Source code for cars.applications.point_cloud_outlier_removal.outlier_removal_algo

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains functions used in outlier removal
"""

# Standard imports
from typing import List, Tuple, Union

# Third party imports
import numpy as np
import outlier_filter  # pylint:disable=E0401
import pandas
from scipy.spatial import cKDTree  # pylint: disable=no-name-in-module

from cars.applications.triangulation import pc_transform

# CARS imports
from cars.core import constants as cst
from cars.core import projection

# ##### Small component filtering ######


[docs] def small_component_filtering( cloud: pandas.DataFrame, connection_val: float, nb_pts_threshold: int, clusters_distance_threshold: float = None, filtered_elt_pos: bool = False, ) -> Tuple[pandas.DataFrame, Union[None, pandas.DataFrame]]: """ Filter point cloud to remove small clusters of points (see the detect_small_components function). :param cloud: combined cloud as returned by the create_combined_cloud function :param connection_val: distance to use to consider that two points are connected :param nb_pts_threshold: number of points to use to identify small clusters to filter :param clusters_distance_threshold: distance to use to consider if two points clusters are far from each other or not (set to None to deactivate this level of filtering) :param filtered_elt_pos: if filtered_elt_pos is set to True, the removed points positions in their original epipolar images are returned, otherwise it is set to None :return: Tuple made of the filtered cloud and the removed elements positions in their epipolar images """ clusters_distance_threshold_float = ( np.nan if clusters_distance_threshold is None else clusters_distance_threshold ) index_elt_to_remove = outlier_filter.pc_small_component_outlier_filtering( cloud.loc[:, cst.X].values, cloud.loc[:, cst.Y].values, cloud.loc[:, cst.Z].values, radius=connection_val, min_cluster_size=nb_pts_threshold, clusters_distance_threshold=clusters_distance_threshold_float, ) return pc_transform.filter_cloud( cloud, index_elt_to_remove, filtered_elt_pos )
[docs] def detect_small_components( cloud_xyz: np.ndarray, connection_val: float, nb_pts_threshold: int, clusters_distance_threshold: float = None, ) -> List[int]: """ Determine the indexes of the points of cloud_xyz to filter. The clusters are made of 'connected' points (2 connected points have a distance smaller than connection_val) The removed clusters are composed of less than nb_pts_threshold points and are also far from other clusters (points are further than clusters_distance_threshold). If clusters_distance_threshold is set to None, all the clusters that are composed of less than nb_pts_threshold points are filtered. :param cloud_xyz: points kdTree :param connection_val: distance to use to consider that two points are connected :param nb_pts_threshold: number of points to use to identify small clusters to filter :param clusters_distance_threshold: distance to use to consider if two points clusters are far from each other or not (set to None to deactivate this level of filtering) :return: list of the points to filter indexes """ cloud_tree = cKDTree(cloud_xyz) # extract connected components processed = [False] * len(cloud_xyz) connected_components = [] for idx, xyz_point in enumerate(cloud_xyz): # if point has already been added to a cluster if processed[idx]: continue # get point neighbors neighbors_list = cloud_tree.query_ball_point(xyz_point, connection_val) # add them to the current cluster seed = [] seed.extend(neighbors_list) for neigh_idx in neighbors_list: processed[neigh_idx] = True # iteratively add all the neighbors of the points # which were added to the current cluster (if there are some) while len(neighbors_list) != 0: all_neighbors = cloud_tree.query_ball_point( cloud_xyz[neighbors_list], connection_val ) # flatten neighbors new_neighbors = [] for neighbor_item in all_neighbors: new_neighbors.extend(neighbor_item) # retrieve only new neighbors neighbors_list = list(set(new_neighbors) - set(seed)) # add them to the current cluster seed.extend(neighbors_list) for neigh_idx in neighbors_list: processed[neigh_idx] = True connected_components.append(seed) # determine clusters to remove cluster_to_remove = [] for _, connected_components_item in enumerate(connected_components): if len(connected_components_item) < nb_pts_threshold: if clusters_distance_threshold is not None: # search if the current cluster has any neighbors # in the clusters_distance_threshold radius all_neighbors = cloud_tree.query_ball_point( cloud_xyz[connected_components_item], clusters_distance_threshold, ) # flatten neighbors new_neighbors = [] for neighbor_item in all_neighbors: new_neighbors.extend(neighbor_item) # retrieve only new neighbors neighbors_list = list( set(new_neighbors) - set(connected_components_item) ) # if there are no new neighbors, the cluster will be removed if len(neighbors_list) == 0: cluster_to_remove.extend(connected_components_item) else: cluster_to_remove.extend(connected_components_item) return cluster_to_remove
# ##### statistical filtering ###### # pylint: disable=too-many-positional-arguments
[docs] def statistical_outlier_filtering( cloud: pandas.DataFrame, k: int, filtering_constant: float, mean_factor: float, dev_factor: float, use_median: bool = False, filtered_elt_pos: bool = False, ) -> Tuple[pandas.DataFrame, Union[None, pandas.DataFrame]]: """ Filter point cloud to remove statistical outliers (see the detect_statistical_outliers function). :param cloud: combined cloud as returned by the create_combined_cloud function :param k: number of neighbors :param filtering_constant: constant added to the distance threshold :param mean_factor: multiplication factor of mean used to compute the distance threshold :param dev_factor: multiplication factor of deviation used to compute the distance threshold :param use_median: choice of statistical measure used to filter :param filtered_elt_pos: if filtered_elt_pos is set to True, the removed points positions in their original epipolar images are returned, otherwise it is set to None :return: Tuple made of the filtered cloud and the removed elements positions in their epipolar images """ index_elt_to_remove = outlier_filter.pc_statistical_outlier_filtering( cloud.loc[:, cst.X].values, cloud.loc[:, cst.Y].values, cloud.loc[:, cst.Z].values, filtering_constant=filtering_constant, mean_factor=mean_factor, dev_factor=dev_factor, k=k, use_median=use_median, ) return pc_transform.filter_cloud( cloud, index_elt_to_remove, filtered_elt_pos )
[docs] def detect_statistical_outliers( cloud_xyz: np.ndarray, k: int, dev_factor: float, use_median: bool ) -> List[int]: """ Determine the indexes of the points of cloud_xyz to filter. The removed points have mean distances with their k nearest neighbors that are greater than a distance threshold (dist_thresh). This threshold is computed from the mean (or median) and standard deviation (or interquartile range) of all the points mean distances with their k nearest neighbors: (1) dist_thresh = mean_distances + dev_factor * std_distances or (2) dist_thresh = median_distances + dev_factor * iqr_distances :param cloud_xyz: points kdTree :param k: number of neighbors :param dev_factor: multiplication factor of deviation used to compute the distance threshold :param use_median: if True formula (2) is used for threshold, else formula (1) :return: list of the points to filter indexes """ # compute for each points, all the distances to their k neighbors cloud_tree = cKDTree(cloud_xyz) neighbors_distances, _ = cloud_tree.query(cloud_xyz, k + 1) # Compute the mean of those distances for each point # Mean is not used directly as each line # contained the distance value to the point itself mean_neighbors_distances = np.sum(neighbors_distances, axis=1) mean_neighbors_distances /= k if use_median: # compute median and interquartile range of those mean distances # for the whole point cloud median_distances = np.median(mean_neighbors_distances) iqr_distances = np.percentile( mean_neighbors_distances, 75 ) - np.percentile(mean_neighbors_distances, 25) # compute distance threshold and # apply it to determine which points will be removed dist_thresh = median_distances + dev_factor * iqr_distances else: # compute median and interquartile range of those mean distances # for the whole point cloud mean_distances = np.mean(mean_neighbors_distances) std_distances = np.std(mean_neighbors_distances) # compute distance threshold and # apply it to determine which points will be removed dist_thresh = mean_distances + dev_factor * std_distances points_to_remove = np.argwhere(mean_neighbors_distances > dist_thresh) # flatten points_to_remove detected_points = [] for removed_point in points_to_remove: detected_points.extend(removed_point) return detected_points
[docs] def epipolar_small_components( cloud, min_cluster_size=15, radius=1.0, half_window_size=5, clusters_distance_threshold=np.nan, ): """ Filter outliers using the small components method in epipolar geometry :param epipolar_ds: epipolar dataset to filter :type epipolar_ds: xr.Dataset :param statistical_k: k :type statistical_k: int :param std_dev_factor: std factor :type std_dev_factor: float :param half_window_size: use median and quartile instead of mean and std :type half_window_size: int :param use_median: use median and quartile instead of mean and std :type use_median: bool :return: filtered dataset :rtype: xr.Dataset """ if clusters_distance_threshold is None: clusters_distance_threshold = np.nan outlier_filter.epipolar_small_component_outlier_filtering( cloud[cst.X], cloud[cst.Y], cloud[cst.Z], min_cluster_size, radius, half_window_size, clusters_distance_threshold, ) projection.point_cloud_conversion_dataset(cloud, 4326) return cloud
# pylint: disable=too-many-positional-arguments
[docs] def epipolar_statistical_filtering( epipolar_ds, k=15, filtering_constant=0.0, mean_factor=1.0, dev_factor=1.0, half_window_size=5, use_median=False, ): """ Filter outliers using the statistical method in epipolar geometry :param epipolar_ds: epipolar dataset to filter :type epipolar_ds: xr.Dataset :param statistical_k: k :type statistical_k: int :param std_dev_factor: std factor :type std_dev_factor: float :param half_window_size: use median and quartile instead of mean and std :type half_window_size: int :param use_median: use median and quartile instead of mean and std :type use_median: bool :return: filtered dataset :rtype: xr.Dataset """ if not np.all(np.isnan(epipolar_ds[cst.Z])): outlier_filter.epipolar_statistical_outlier_filtering( epipolar_ds[cst.X], epipolar_ds[cst.Y], epipolar_ds[cst.Z], k, half_window_size, filtering_constant, mean_factor, dev_factor, use_median, ) projection.point_cloud_conversion_dataset(epipolar_ds, 4326) return epipolar_ds