Python quickly implements ground point filtering

Use the traditional algorithm to filter out the ground points. The specific idea is to divide the point cloud into segments and bins on the BEV, and take a z_min point in each bin as a representative.

import numpy as np

class Processor:
    '''
    Module: Processor

    Args:
        n_segments(int): The number of fan-shaped regions divided by 360 degrees
        n_bins(int): The number of bins divided in a segment.
        r_max(float): The max boundary of lidar point.(meters)
        r_min(float): The min boundary of lidar point.(meters)
        line_search_angle(float): The angle for relative search in nearby segments.
        max_dist_to_line(float): The distance threshold of the non-ground object to the ground.(meters)

        max_slope(float): Local maximum slope of the ground.
        max_error(float): The max MSE to fit the ground.(meters)
        long_threshold(int): The max threshold of ground wave interval.
        max_start_height(float): The max height difference between hillside and ground.(meters)
        sensor_height(float): The distance from the lidar sensor to the ground.(meters)

    Call:
        Arg:
            vel_msg(numpy.ndarray): The raw local LiDAR cloud points in 3D(x,y,z).

            For example:
                vel_msg shapes [n_point, 3], with `n_point` refers to the number of cloud points,
                    while `3` is the number of 3D(x,y,z) axis.
                vel_msg = array([[0.3, 0.1, 0.7],
                                    [0.6, 0.6, 0.5],
                                    [0.1, 0.4, 0.8],
                                    ... ... ...
                                    [0.5, 0.3, 0.6],
                                    [0.6, 0.3, 0.4]]
        Returns:
            vel_non_ground(numpy.ndarray): The local LiDAR cloud points after filter out ground info.
    '''

    def __init__(self, n_segments=60, n_bins=80, r_max=150, r_min=0.3,
                    line_search_angle=0.2, max_dist_to_line=0.25,
                    max_slope=2.0, max_error=0.1, long_threshold=3,
                    max_start_height=0.2, sensor_height=1.73):
        self.n_segments = n_segments # number of segments
        self.n_bins = n_bins # number of bins in a segment
        self.r_max = r_max
        self.r_min = r_min
        self.line_search_angle = line_search_angle
        self.max_dist_to_line = max_dist_to_line

        self.max_slope = max_slope
        self.max_error = max_error
        self. long_threshold = long_threshold #int
        self.max_start_height = max_start_height
        self.sensor_height = sensor_height

        self.segment_step = 2 * np.pi / self.n_segments
        self.bin_step = (self.r_max - self.r_min) / self.n_bins

        self.segments = []
        self.seg_list = []

    def __call__(self, vel_msg):
        point5D = self.Model_Ground(vel_msg)
        vel_non_ground = self.Segment_Vel(point5D)

        return vel_non_ground

    def Model_Ground(self, vel_msg):
        point5D = self. project_5D(vel_msg)
        point5D = self. filter_out_range(point5D)
        # Sort by seg module.
        point5D = point5D[np. argsort(point5D[:, 3])]
        # np.unique removes duplicate elements in the array and returns a new array from small to large
        self.seg_list = np.int16(np.unique(point5D[:, 3]))
        # Each seg is processed separately
        for seg_idx in self.seg_list:
            # Create an instance of segmentation
            segment = Segmentation(self.max_slope, self.max_error, self.long_threshold,
                                    self.max_start_height, self.sensor_height)
            # Take out the point of the current segment for processing
            point5D_seg = point5D[point5D[:, 3] == seg_idx]
            # min_z indicates the minimum value of z for each bin in the current seg
            min_z = segment.get_min_z(point5D_seg) # checked
            segment.fitSegmentLines(min_z) # checked
            # Corresponding to the current segment instance, there is an array property of self.lines in a segment instance
            self. segments. append(segment)

        return point5D

    def Segment_Vel(self, point5D):
        #point5D is arranged in seg order, and the points that are not in the range of r are filtered
        # Initial label for each point
        label = np. zeros([point5D. shape[0]])
        #np.diff means that the adjacent elements in the array are subtracted, such as a 10-column vector, and 9 columns are obtained after the operation. np.r_ means splicing two matrices up and down, and np.nonzero is used to get the index of the non-zero element in the array
        #This slice_list is to create a list of the start and end indexes of each segment in point5D, and the points in each segment can be separated according to the list
        slice_list = np.r_[np.nonzero(np.r_[1, np.diff(point5D[:, 3])])[0], len(point5D)]

        for i, seg_idx in enumerate(self. seg_list):
            #Get the instance of the current seg and the points in the seg
            segment = self. segments[i]
            point5D_seg = point5D[point5D[:, 3] == seg_idx]
            #Get the label of the point in the current seg
            non_ground = segment.verticalDistanceToLine(point5D_seg[:, [4, 2]]) # x,y -> d,z
            #Set the points whose calculated distance is greater than the threshold to 0
            non_ground[non_ground > self.max_dist_to_line] = 0

            step = 1
            #A function is defined below, and a subscript value can be obtained according to i and step
            idx_search = lambda i, step: (i % len(self. seg_list) + step) % len(self. seg_list)
            #Judging several segments around the current seg, if the bin and z of the current seg meet the segline curve next door, it is also defined as a ground point.
            while step * self.segment_step < self.line_search_angle:
                segment_f = self. segments[idx_search(i, -step)]
                segment_b = self. segments[idx_search(i, step)]

                non_ground_b = segment_f.verticalDistanceToLine(point5D_seg[:, [4, 2]])
                non_ground_b[non_ground_b > self.max_dist_to_line] = 0
                non_ground_f = segment_b.verticalDistanceToLine(point5D_seg[:, [4, 2]])
                non_ground_f[non_ground_f > self.max_dist_to_line] = 0

                non_ground += non_ground_b + non_ground_f

                step + = 1
            #The point assignment label of the current seg, the label is 1 means it is a ground point
            label[slice_list[i]:slice_list[i + 1]] = non_ground == 0

        # vel_non_ground = point5D[label == 1][:, :3]
        vel_non_ground = point5D[label != 1][:, :3]

        return vel_non_ground

    def project_5D(self, point3D):
        '''
        Args:
            point3D: shapes (n_row, 3), while 3 represent x,y,z axis in order.
        Returns:
            point5D: shapes (n_row, 3 + 2), while 5 represent x,y,z,seg,bin axis in order.
        '''
        x = point3D[:,0]
        y = point3D[:, 1]
        z = point3D[:, 2]

        # index mapping
        # The input of arctan2 is two points, and the output range is positive and negative. And arctan is one input, and the output range is half positive and negative
        angle = np. arctan2(y, x)
        # Because the output is positive and negative pies, the range of adding pai is 0-2 pies. Divide by the angle per share. where do you get it
        segment_index = np.int32(np.floor((angle + np.pi) / self.segment_step)) # segment
        # Find the plane distance of each point, that is, the radius
        radius = np. sqrt(x ** 2 + y ** 2)
        # Divide into n parts between rmin and rmax, see which part this point falls in
        bin_index = np.int32(np.floor((radius - self.r_min) / self.bin_step)) # bin
        # segment_index is a row of n columns, so it must be transposed first and then superimposed by row
        point5D = np.vstack([point3D.T, segment_index, bin_index]).T

        return point5D

    def filter_out_range(self, point5D):
        '''
        Args:
            point5D: shapes (n_row, 3 + 2), while 5 represent x,y,z,seg,bin axis in order.
        Returns:
            point5D: shapes (n_row_filtered, 5), while 5 represent x,y,z,seg,bin axis in order.
        '''
        radius = point5D[:, 4] # [x,y,z,seg,bin]
        # np.logical_and filters out points that are not within the radius.
        condition = np.logical_and(radius < self.r_max, radius > self.r_min)
        point5D = point5D[condition]

        return point5D


class Segmentation:
    '''
    Args:
        max_slope(float): Local maximum slope of the ground.
        max_error(float): The max MSE to fit the ground.
        long_threshold(int): The max threshold of ground wave interval.
        max_start_height(float): The max height difference between hillside and ground.
        sensor_height(float): The distance from the lidar sensor to the ground.
    '''

    def __init__(self, max_slope=2.0, max_error=0.1, long_threshold=3,
                    max_start_height=0.2, sensor_height=1.73):
        self.max_slope_ = max_slope
        self.max_error_ = max_error
        self. long_threshold_ = long_threshold #int
        self.max_start_height_ = max_start_height
        self.sensor_height_ = sensor_height

        self.matrix_new = np.array([[1, 0, 0], [0, 1, 0]])
        self.matrix_one = np.array([[0, 0, 1]])

        self.lines = []

    def get_min_z(self, point5D_seg):
        '''
        Args:
            point5D: shapes (n_row, 5), while 5 represent x,y,z,seg,bin axis in order.
        Returns:
            pointSBZ: shapes (n_row, 2), while 3 represent bin,z axis in order. Bin order sorted.
        '''
        # The bin value in the current seg
        bin_ = point5D_seg[:, 4]
        # Get the minimum value of z for each bin in the current seg, and return [num_bin,2]2 are bin_idx and z_min respectively
        pointBZ = np.array([point5D_seg[bin_ == bin_idx].min(axis=0)[2:] for bin_idx in np.unique(bin_)])[:, [2, 0]]
        return point BZ

    def fitLocalLine(self, cur_line_points, error=False):
        # @Indicates matrix multiplication in numpy, if the second element is a vector, it will be automatically transposed
        xy1 = np.array(cur_line_points) @ self.matrix_new + self.matrix_one
        # At this time, xy1 means (:(bin,z_min,1))
        A = xy1[:, [0, 2]]
        y = xy1[:, [1]]
        # The independent variable is bin, and the dependent variable is z_min. np.linalg.lstsqyong'zui'xiao'er'chneg
        [[m], [b]] = np.linalg.lstsq(A, y, rcond=None)[0]
        if error:
            mse = (A @ np.array([[m], [b]]) - y) ** 2
            return [m, b], mse
        else:
            return [m, b]

    def verticalDistanceToLine(self, xy): # checked
        #The incoming xy is the point in the current seg, including the attributes of bin and z
        kMargin = 0.1
        #Create the label of the current seg point
        label = np.zeros(len(xy))
        # Traverse the lines in the current seg instance
        for d_l, d_r, m, b in self.lines:
            #Calculate the distance from the point in the current seg to the fitted line
            distance = np.abs(m * xy[:, 0] + b - xy[:, 1])
            #Get the bin in the current line range
            con = (xy[:, 0] > d_l - kMargin) & amp; (xy[:, 0] < d_r + kMargin)
            #Assign the distance of these bins to the label
            label[con] = distance[con]

        return label. flatten()

    def fitSegmentLines(self, min_z):
        # The point of the smallest bin
        cur_line_points = [min_z[0]]
        long_line = False
        # the height of the sensor
        cur_ground_height = self.sensor_height_
        d_i = 1
        while d_i < len(min_z):
            # The last of the stored bin list
            lst_point = cur_line_points[-1]
            # Loop through the current bin in the loop
            cur_point = min_z[d_i]
            # If the bins of the two are far away, it is a long line
            if cur_point[0] - lst_point[0] > self.long_threshold_:
                long_line = True

            if len(cur_line_points) < 2:
                # If the difference between the two bins is less than the threshold, the z and ground height of the bin saved at the same time are less than the non-ground threshold
                # means that the ground points are continuous, if the two bins are far apart or the first point is far away from the ground, the first point will be saved again
                if (cur_point[0] - lst_point[0] < self.long_threshold_) and abs(
                        lst_point[1] - cur_ground_height) < self.max_start_height_:
                    cur_line_points.append(cur_point)
                else:
                    cur_line_points = [cur_point]
            else:
                # Save the third point directly
                cur_line_points.append(cur_point)
                # Fit the straight line equation between bin and z according to the existing points
                cur_line, mse = self. fitLocalLine(cur_line_points, True)
                # If the error of the least squares fitting is greater than the threshold or the slope is greater than the slope threshold, or if it is a long line
                if (mse.max() > self.max_error_ or cur_line[0] > self.max_slope_ or long_line):
                    # Explain that the point just put in is not in the plane, pop it out
                    cur_line_points. pop()
                    if len(cur_line_points) >= 3:
                        #If there are more than or equal to three points, then directly fit the parameters, put them in self.lines, start bin, end bin, straight line parameters
                        new_line = self. fitLocalLine(cur_line_points)
                        self.lines.append([cur_line_points[0][0], cur_line_points[-1][0], *new_line]) # b boundary
                        #Update the current z, which is the ground height, according to the fitted straight line equation. The independent variable selects the last bin
                        cur_ground_height = new_line[0] * cur_line_points[-1][0] + new_line[1] # m*x + b
                    long_line = False
                    cur_line_points = [cur_line_points[-1]]
                    #Because it popped up before, d_i did not participate in this calculation, so subtract one, let d_i judge again with the last last point reserved
                    d_i -= 1
            d_i += 1
        #After exiting the loop, if the number of saved points is greater than 2, it means that it is also a line, so keep it.
        if len(cur_line_points) > 2:
            new_line = self. fitLocalLine(cur_line_points)
            self.lines.append([cur_line_points[0][0], cur_line_points[-1][0], *new_line])

Original link:
yGitHub – SilvesterHsu/LiDAR_ground_removal: This is an implementation of Fast Segmentation of 3D Point Clouds for Ground Vehicles