Source code for cars.applications.point_cloud_outlier_removal.small_components_app

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
this module contains the small_components point removal application class.
"""


import copy

# Standard imports
import logging
import os

# Third party imports
import numpy as np
from json_checker import And, Checker, Or
from pyproj import CRS

# CARS imports
import cars.orchestrator.orchestrator as ocht
from cars.applications import application_constants
from cars.applications.point_cloud_outlier_removal import (
    abstract_outlier_removal_app as pc_removal,
)
from cars.applications.point_cloud_outlier_removal import (
    outlier_removal_algo,
)
from cars.applications.triangulation import pc_transform
from cars.applications.triangulation.triangulation_wrappers import (
    generate_point_cloud_file_names,
)
from cars.core import constants as cst
from cars.core import projection
from cars.data_structures import cars_dataset


[docs] class SmallComponents( pc_removal.PointCloudOutlierRemoval, short_name="small_components" ): # pylint: disable=R0903 """ SmallComponents """ # pylint: disable=too-many-instance-attributes def __init__(self, scaling_coeff, conf=None): """ Init function of SmallComponents :param scaling_coeff: scaling factor for resolution :type scaling_coeff: float :param conf: configuration for points outlier removal :return: an application_to_use object """ super().__init__(scaling_coeff, conf=conf) self.used_method = self.used_config["method"] # small components self.on_ground_margin = self.used_config["on_ground_margin"] self.connection_distance = self.used_config["connection_distance"] self.nb_points_threshold = self.used_config["nb_points_threshold"] self.clusters_distance_threshold = self.used_config[ "clusters_distance_threshold" ] self.half_epipolar_size = self.used_config["half_epipolar_size"] # Init orchestrator self.orchestrator = None
[docs] def check_conf(self, conf): """ Check configuration :param conf: configuration to check :type conf: dict :return: overloaded configuration :rtype: dict """ # init conf if conf is not None: overloaded_conf = conf.copy() else: conf = {} overloaded_conf = {} # Overload conf overloaded_conf["method"] = conf.get("method", "small_components") overloaded_conf[application_constants.SAVE_INTERMEDIATE_DATA] = ( conf.get(application_constants.SAVE_INTERMEDIATE_DATA, False) ) # small components # on_ground_margin: # margin added to the on ground region to not filter # points clusters # that were incomplete because they were on the edges overloaded_conf["on_ground_margin"] = conf.get("on_ground_margin", 11) # pts_connection_dist: # distance to use to consider that two points are connected overloaded_conf["connection_distance"] = conf.get( "connection_distance", self.scaling_coeff * 3.0 ) # nb_pts_threshold: # points clusters that have less than this number of points # will be filtered overloaded_conf["nb_points_threshold"] = conf.get( "nb_points_threshold", 50 ) # dist_between_clusters: # distance to use to consider that two points clusters # are far from each other or not. # If a small points cluster is near to another one, it won't # be filtered. # (None = deactivated) overloaded_conf["clusters_distance_threshold"] = conf.get( "clusters_distance_threshold", None ) # half_epipolar_size: # Half size of the epipolar window used for neighobr search (depth map # input only) overloaded_conf["half_epipolar_size"] = conf.get( "half_epipolar_size", 5 ) point_cloud_fusion_schema = { "method": str, "on_ground_margin": int, "connection_distance": And(float, lambda x: x > 0), "nb_points_threshold": And(int, lambda x: x > 0), "clusters_distance_threshold": Or(None, float), "half_epipolar_size": int, application_constants.SAVE_INTERMEDIATE_DATA: bool, } # Check conf checker = Checker(point_cloud_fusion_schema) checker.validate(overloaded_conf) return overloaded_conf
[docs] def get_optimal_tile_size( self, max_ram_per_worker, superposing_point_clouds=1, point_cloud_resolution=0.5, ): """ Get the optimal tile size to use, depending on memory available :param max_ram_per_worker: maximum ram available :type max_ram_per_worker: int :param superposing_point_clouds: number of point clouds superposing :type superposing_point_clouds: int :param point_cloud_resolution: resolution of point cloud :type point_cloud_resolution: float :return: optimal tile size in meter :rtype: float """ tot = 10000 * superposing_point_clouds / point_cloud_resolution import_ = 200 # MiB tile_size = int( np.sqrt(float(((max_ram_per_worker - import_) * 2**23)) / tot) ) logging.info( "Estimated optimal tile size for small" "component removal: {} meters".format(tile_size) ) return tile_size
[docs] def get_method(self): """ Get margins to use during point clouds fusion :return: algorithm method :rtype: string """ return self.used_method
[docs] def get_epipolar_margin(self): """ Get epipolar margin to use :return: margin :rtype: int """ margin = self.half_epipolar_size return margin
[docs] def get_on_ground_margin(self, resolution=0.5): """ Get margins to use during point clouds fusion :return: margin :rtype: float """ on_ground_margin = self.on_ground_margin * resolution return on_ground_margin
[docs] def run( # pylint: disable=too-many-positional-arguments self, merged_point_cloud, orchestrator=None, depth_map_dir=None, point_cloud_dir=None, dump_dir=None, epsg=None, ): """ Run PointCloudOutlierRemoval application. Creates a CarsDataset filled with new point cloud tiles. :param merged_point_cloud: merged point cloud. CarsDataset contains: - Z x W Delayed tiles. \ Each tile will be a future pandas DataFrame containing: - data : with keys : "x", "y", "z", "corr_msk"\ optional: "clr", "msk", "data_valid","coord_epi_geom_i",\ "coord_epi_geom_j","idx_im_epi" \ - attrs with keys: "epsg" - attributes containing "bounds", "ysize", "xsize", "epsg" :type merged_point_cloud: CarsDataset filled with pandas.DataFrame :param orchestrator: orchestrator used :param depth_map_dir: output depth map directory. If None output will be written in dump_dir if intermediate data is requested :type depth_map_dir: str :param point_cloud_dir: output depth map directory. If None output will be written in dump_dir if intermediate data is requested :type point_cloud_dir: str :type output_dir: str :param dump_dir: dump dir for output (except depth map) if intermediate data is requested :type dump_dir: str :param epsg: cartographic reference for the point cloud (array input) :type epsg: int :return: filtered merged point cloud. CarsDataset contains: - Z x W Delayed tiles.\ Each tile will be a future pandas DataFrame containing: - data : with keys : "x", "y", "z", "corr_msk"\ optional: "clr", "msk", "data_valid","coord_epi_geom_i",\ "coord_epi_geom_j","idx_im_epi" - attrs with keys: "epsg" - attributes containing "bounds", "ysize", "xsize", "epsg" :rtype: CarsDataset filled with xr.Dataset """ # Default orchestrator if orchestrator is None: # Create default sequential orchestrator for current application # be awere, no out_json will be shared between orchestrators # No files saved self.orchestrator = ocht.Orchestrator( orchestrator_conf={"mode": "sequential"} ) else: self.orchestrator = orchestrator if dump_dir is None: dump_dir = self.generate_unknown_dump_dir(self.orchestrator) if merged_point_cloud.dataset_type != "arrays": raise RuntimeError( "Only arrays is supported in small components removal" ) prefix = os.path.basename(dump_dir) # Save as depth map filtered_point_cloud, saving_info_epipolar = ( self.__register_epipolar_dataset__( merged_point_cloud, depth_map_dir, dump_dir, app_name="small_components", pair_key=prefix, ) ) # Save as point cloud ( flatten_filtered_point_cloud, laz_pc_dir_name, csv_pc_dir_name, saving_info_flatten, ) = self.__register_pc_dataset__( merged_point_cloud, point_cloud_dir, dump_dir, app_name="small_components", ) # initialize empty index file for point cloud product if official # product is requested pc_index = None if point_cloud_dir: pc_index = {} # Generate rasters for col in range(filtered_point_cloud.shape[1]): for row in range(filtered_point_cloud.shape[0]): # update saving infos for potential replacement full_saving_info_epipolar = ocht.update_saving_infos( saving_info_epipolar, row=row, col=col ) full_saving_info_flatten = None if saving_info_flatten is not None: full_saving_info_flatten = ocht.update_saving_infos( saving_info_flatten, row=row, col=col ) if merged_point_cloud[row][col] is not None: csv_pc_file_name, laz_pc_file_name = ( generate_point_cloud_file_names( csv_pc_dir_name, laz_pc_dir_name, row, col, pc_index, pair_key=prefix, ) ) window = merged_point_cloud.tiling_grid[row, col] overlap = filtered_point_cloud.overlaps[row, col] # Delayed call to cloud filtering ( filtered_point_cloud[row, col], flatten_filtered_point_cloud[row, col], ) = self.orchestrator.cluster.create_task( epipolar_small_component_removal_wrapper, nout=2 )( merged_point_cloud[row, col], self.connection_distance, self.nb_points_threshold, self.clusters_distance_threshold, self.half_epipolar_size, window, overlap, epsg=epsg, point_cloud_csv_file_name=csv_pc_file_name, point_cloud_laz_file_name=laz_pc_file_name, saving_info_epipolar=full_saving_info_epipolar, saving_info_flatten=full_saving_info_flatten, ) # update point cloud index if point_cloud_dir: self.orchestrator.update_index(pc_index) return filtered_point_cloud
# pylint: disable=too-many-positional-arguments
[docs] def epipolar_small_component_removal_wrapper( cloud, connection_distance, nb_points_threshold, clusters_distance_threshold, half_epipolar_size, window, overlap, epsg=None, point_cloud_csv_file_name=None, point_cloud_laz_file_name=None, saving_info_epipolar=None, saving_info_flatten=None, ): """ Small component outlier removal in epipolar geometry :param epipolar_ds: epipolar dataset to filter :type epipolar_ds: xr.Dataset :param connection_distance: minimum distance of two connected points :type connection_distance: float :param nb_points_threshold: minimum valid cluster size :type nb_points_threshold: int :param clusters_distance_threshold: max distance between an outlier cluster and other points :type clusters_distance_threshold: float :param half_epipolar_size: half size of the window used to search neighbors :type half_epipolar_size: int :param window: window of base tile [row min, row max, col min col max] :type window: list :param overlap: overlap [row min, row max, col min col max] :type overlap: list :return: filtered dataset :rtype: xr.Dataset """ # Copy input cloud filtered_cloud = copy.copy(cloud) # Get current epsg cloud_epsg = filtered_cloud.attrs["epsg"] current_epsg = cloud_epsg # Check if can be used to filter spatial_ref = CRS.from_epsg(cloud_epsg) if spatial_ref.is_geographic: logging.debug( "The points cloud to filter is not in a cartographic system. " "The filter's default parameters might not be adapted " "to this referential. Please, convert the points " "cloud to ECEF to ensure a proper point_cloud." ) # Convert to epsg = 4978 cartographic_epsg = 4978 projection.point_cloud_conversion_dataset( filtered_cloud, cartographic_epsg ) current_epsg = cartographic_epsg outlier_removal_algo.epipolar_small_components( filtered_cloud, min_cluster_size=nb_points_threshold, radius=connection_distance, half_window_size=half_epipolar_size, clusters_distance_threshold=clusters_distance_threshold, ) # Fill with attributes cars_dataset.fill_dataset( filtered_cloud, saving_info=saving_info_epipolar, window=cars_dataset.window_array_to_dict(window), profile=None, attributes=None, overlaps=cars_dataset.overlap_array_to_dict(overlap), ) # Flatten point cloud to save it as LAZ flatten_filtered_cloud = None if point_cloud_csv_file_name or point_cloud_laz_file_name: # Convert epipolar array into point cloud flatten_filtered_cloud, cloud_epsg = ( pc_transform.depth_map_dataset_to_dataframe( filtered_cloud, current_epsg ) ) # Convert to UTM if epsg is not None and cloud_epsg != epsg: projection.point_cloud_conversion_dataframe( flatten_filtered_cloud, cloud_epsg, epsg ) cloud_epsg = epsg # Fill attributes for LAZ saving color_type = pc_transform.get_color_type([filtered_cloud]) attributes = { "epsg": cloud_epsg, "color_type": color_type, cst.CROPPED_DISPARITY_RANGE: ocht.get_disparity_range_cropped( cloud ), } cars_dataset.fill_dataframe( flatten_filtered_cloud, saving_info=saving_info_flatten, attributes=attributes, ) # Save point cloud in worker if point_cloud_csv_file_name: cars_dataset.run_save_points( flatten_filtered_cloud, point_cloud_csv_file_name, overwrite=True, point_cloud_format="csv", overwrite_file_name=False, ) if point_cloud_laz_file_name: cars_dataset.run_save_points( flatten_filtered_cloud, point_cloud_laz_file_name, overwrite=True, point_cloud_format="laz", overwrite_file_name=False, ) return filtered_cloud, flatten_filtered_cloud