Source code for cars.applications.point_cloud_outlier_removal.statistical_app

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
this module contains the statistical point removal application class.
"""


import copy

# Standard imports
import logging
import os

import numpy as np

# Third party imports
from json_checker import And, Checker, Or
from pyproj import CRS

# CARS imports
import cars.orchestrator.orchestrator as ocht
from cars.applications import application_constants
from cars.applications.point_cloud_outlier_removal import (
    abstract_outlier_removal_app as pc_removal,
)
from cars.applications.point_cloud_outlier_removal import (
    outlier_removal_algo,
)
from cars.applications.triangulation import pc_transform
from cars.applications.triangulation.triangulation_wrappers import (
    generate_point_cloud_file_names,
)
from cars.core import constants as cst
from cars.core import projection
from cars.data_structures import cars_dataset

# R0903  temporary disabled for error "Too few public methods"
# œgoing to be corrected by adding new methods as check_conf


[docs] class Statistical( pc_removal.PointCloudOutlierRemoval, short_name="statistical" ): # pylint: disable=R0903 """ Statistical """ # pylint: disable=too-many-instance-attributes def __init__(self, scaling_coeff, conf=None): """ Init function of Statistical :param scaling_coeff: scaling factor for resolution :type scaling_coeff: float :param conf: configuration for points outlier removal :return: a application_to_use object """ super().__init__(scaling_coeff, conf=conf) self.used_method = self.used_config["method"] # statistical outliers self.k = self.used_config["k"] self.filtering_constant = self.used_config["filtering_constant"] self.mean_factor = self.used_config["mean_factor"] self.std_dev_factor = self.used_config["std_dev_factor"] self.use_median = self.used_config["use_median"] self.half_epipolar_size = self.used_config["half_epipolar_size"] # Saving files self.save_intermediate_data = self.used_config["save_intermediate_data"] # Init orchestrator self.orchestrator = None
[docs] def check_conf(self, conf): """ Check configuration :param conf: configuration to check :type conf: dict :return: overloaded configuration :rtype: dict """ # init conf if conf is not None: overloaded_conf = conf.copy() else: conf = {} overloaded_conf = {} # Overload conf overloaded_conf["method"] = conf.get("method", "statistical") overloaded_conf[application_constants.SAVE_INTERMEDIATE_DATA] = ( conf.get(application_constants.SAVE_INTERMEDIATE_DATA, False) ) overloaded_conf["use_median"] = conf.get("use_median", True) # statistical outlier filtering # k: number of neighbors overloaded_conf["k"] = conf.get("k", 50) # filtering_constant: constant to apply in the distance threshold # computation overloaded_conf["filtering_constant"] = conf.get( "filtering_constant", 0.0 ) # mean_factor: factor to apply to the mean in the distance threshold # computation overloaded_conf["mean_factor"] = conf.get("mean_factor", 1.3) # mean_factor: factor to apply to the standard deviation in the # distance threshold overloaded_conf["std_dev_factor"] = conf.get("std_dev_factor", 3.0) # half_epipolar_size: # Half size of the epipolar window used for neighobr search (depth map # input only) overloaded_conf["half_epipolar_size"] = conf.get( "half_epipolar_size", 5 ) point_cloud_outlier_removal_schema = { "method": str, "k": And(int, lambda x: x > 0), "filtering_constant": And(Or(float, int), lambda x: x >= 0), "mean_factor": And(Or(float, int), lambda x: x >= 0), "std_dev_factor": And(Or(float, int), lambda x: x >= 0), "use_median": bool, "half_epipolar_size": int, application_constants.SAVE_INTERMEDIATE_DATA: bool, } # Check conf checker = Checker(point_cloud_outlier_removal_schema) checker.validate(overloaded_conf) return overloaded_conf
[docs] def get_optimal_tile_size( self, max_ram_per_worker, superposing_point_clouds=1, point_cloud_resolution=0.5, ): """ Get the optimal tile size to use, depending on memory available :param max_ram_per_worker: maximum ram available :type max_ram_per_worker: int :param superposing_point_clouds: number of point clouds superposing :type superposing_point_clouds: int :param point_cloud_resolution: resolution of point cloud :type point_cloud_resolution: float :return: optimal tile size in meter :rtype: float """ tot = 10000 * superposing_point_clouds / point_cloud_resolution import_ = 200 # MiB tile_size = int( np.sqrt(float(((max_ram_per_worker - import_) * 2**23)) / tot) ) logging.info( "Estimated optimal tile size for statistical " "removal: {} meters".format(tile_size) ) return tile_size
[docs] def get_method(self): """ Get margins to use during point clouds fusion :return: algorithm method :rtype: string """ return self.used_method
[docs] def get_epipolar_margin(self): """ Get epipolar margin to use :return: margin :rtype: int """ margin = self.half_epipolar_size return margin
[docs] def get_on_ground_margin(self, resolution=0.5): """ Get margins to use during point clouds fusion :return: margin :rtype: float """ return 0
[docs] def run( # pylint: disable=too-many-positional-arguments self, merged_point_cloud, orchestrator=None, depth_map_dir=None, point_cloud_dir=None, dump_dir=None, epsg=None, ): """ Run PointCloudOutlierRemoval application. Creates a CarsDataset filled with new point cloud tiles. :param merged_point_cloud: merged point cloud. CarsDataset contains: - Z x W Delayed tiles. \ Each tile will be a future pandas DataFrame containing: - data : with keys : "x", "y", "z", "corr_msk" \ optional: "clr", "msk", "data_valid","coord_epi_geom_i",\ "coord_epi_geom_j", "idx_im_epi" - attrs with keys: "epsg" - attributes containing "bounds", "ysize", "xsize", "epsg" :type merged_point_cloud: CarsDataset filled with pandas.DataFrame :param orchestrator: orchestrator used :param depth_map_dir: output depth map directory. If None output will be written in dump_dir if intermediate data is requested :type depth_map_dir: str :param point_cloud_dir: output depth map directory. If None output will be written in dump_dir if intermediate data is requested :type point_cloud_dir: str :param dump_dir: dump dir for output (except depth map) if intermediate data is requested :type dump_dir: str :param epsg: cartographic reference for the point cloud (array input) :type epsg: int :return: filtered merged point cloud. CarsDataset contains: - Z x W Delayed tiles. \ Each tile will be a future pandas DataFrame containing: - data : with keys "x", "y", "z", "corr_msk" \ optional: "clr", "msk", "data_valid", "coord_epi_geom_i",\ "coord_epi_geom_j", "idx_im_epi" - attrs with keys: "epsg" - attributes containing "bounds", "ysize", "xsize", "epsg" :rtype : CarsDataset filled with xr.Dataset """ # Default orchestrator if orchestrator is None: # Create default sequential orchestrator for current application # be awere, no out_json will be shared between orchestrators # No files saved self.orchestrator = ocht.Orchestrator( orchestrator_conf={"mode": "sequential"} ) else: self.orchestrator = orchestrator if dump_dir is None: dump_dir = self.generate_unknown_dump_dir(self.orchestrator) if merged_point_cloud.dataset_type != "arrays": raise RuntimeError( "Only arrays is supported in statistical removal" ) prefix = os.path.basename(dump_dir) # Save as depth map filtered_point_cloud, saving_info_epipolar = ( self.__register_epipolar_dataset__( merged_point_cloud, depth_map_dir, dump_dir, app_name="statistical", pair_key=prefix, ) ) # Save as point cloud ( flatten_filtered_point_cloud, laz_pc_dir_name, csv_pc_dir_name, saving_info_flatten, ) = self.__register_pc_dataset__( merged_point_cloud, point_cloud_dir, dump_dir, app_name="statistical", ) # initialize empty index file for point cloud product if official # product is requested pc_index = None if point_cloud_dir: pc_index = {} # Generate rasters for col in range(filtered_point_cloud.shape[1]): for row in range(filtered_point_cloud.shape[0]): # update saving infos for potential replacement full_saving_info_epipolar = ocht.update_saving_infos( saving_info_epipolar, row=row, col=col ) full_saving_info_flatten = None if saving_info_flatten is not None: full_saving_info_flatten = ocht.update_saving_infos( saving_info_flatten, row=row, col=col ) if merged_point_cloud[row][col] is not None: csv_pc_file_name, laz_pc_file_name = ( generate_point_cloud_file_names( csv_pc_dir_name, laz_pc_dir_name, row, col, pc_index, pair_key=prefix, ) ) window = merged_point_cloud.tiling_grid[row, col] overlap = filtered_point_cloud.overlaps[row, col] # Delayed call to cloud filtering ( filtered_point_cloud[row, col], flatten_filtered_point_cloud[row, col], ) = self.orchestrator.cluster.create_task( epipolar_statistical_removal_wrapper, nout=2 )( merged_point_cloud[row, col], self.k, self.filtering_constant, self.mean_factor, self.std_dev_factor, self.use_median, self.half_epipolar_size, window, overlap, epsg=epsg, point_cloud_csv_file_name=csv_pc_file_name, point_cloud_laz_file_name=laz_pc_file_name, saving_info_epipolar=full_saving_info_epipolar, saving_info_flatten=full_saving_info_flatten, ) # update point cloud index if point_cloud_dir: self.orchestrator.update_index(pc_index) return filtered_point_cloud
# pylint: disable=too-many-positional-arguments
[docs] def epipolar_statistical_removal_wrapper( epipolar_ds, statistical_k, filtering_constant, mean_factor, std_dev_factor, use_median, half_epipolar_size, window, overlap, epsg, point_cloud_csv_file_name=None, point_cloud_laz_file_name=None, saving_info_epipolar=None, saving_info_flatten=None, ): """ Statistical outlier removal in epipolar geometry :param epipolar_ds: epipolar dataset to filter :type epipolar_ds: xr.Dataset :param statistical_k: k :type statistical_k: int :param filtering_constant: constant applied to the threshold :type filtering_constant: float :param mean_factor: mean factor :type mean_factor: float :param std_dev_factor: std factor :type std_dev_factor: float :param use_median: use median and quartile instead of mean and std :type use median: bool :param half_epipolar_size: half size of the window used to search neighbors :type half_epipolar_size: int :param window: window of base tile [row min, row max, col min col max] :type window: list :param overlap: overlap [row min, row max, col min col max] :type overlap: list :param epsg: epsg code of the CRS used to compute distances :type epsg: int :return: filtered dataset :rtype: xr.Dataset """ # Copy input cloud filtered_cloud = copy.copy(epipolar_ds) # Get current epsg cloud_epsg = filtered_cloud.attrs["epsg"] current_epsg = cloud_epsg # Check if can be used to filter spatial_ref = CRS.from_epsg(cloud_epsg) if spatial_ref.is_geographic: logging.debug( "The point cloud to filter is not in a cartographic system. " "The filter's default parameters might not be adapted " "to this referential. Please, convert the point " "cloud to ECEF to ensure a proper point_cloud." ) # Convert to epsg = 4978 cartographic_epsg = 4978 projection.point_cloud_conversion_dataset( filtered_cloud, cartographic_epsg ) current_epsg = cartographic_epsg outlier_removal_algo.epipolar_statistical_filtering( filtered_cloud, k=statistical_k, filtering_constant=filtering_constant, mean_factor=mean_factor, dev_factor=std_dev_factor, use_median=use_median, half_window_size=half_epipolar_size, ) # Fill with attributes cars_dataset.fill_dataset( filtered_cloud, saving_info=saving_info_epipolar, window=cars_dataset.window_array_to_dict(window), profile=None, attributes=None, overlaps=cars_dataset.overlap_array_to_dict(overlap), ) # Flatten point cloud to save it as LAZ flatten_filtered_cloud = None if point_cloud_csv_file_name or point_cloud_laz_file_name: # Convert epipolar array into point cloud flatten_filtered_cloud, cloud_epsg = ( pc_transform.depth_map_dataset_to_dataframe( filtered_cloud, current_epsg ) ) # Convert to wanted epsg if epsg is not None and cloud_epsg != epsg: projection.point_cloud_conversion_dataframe( flatten_filtered_cloud, cloud_epsg, epsg ) cloud_epsg = epsg # Fill attributes for LAZ saving color_type = pc_transform.get_color_type([filtered_cloud]) attributes = { "epsg": cloud_epsg, "color_type": color_type, cst.CROPPED_DISPARITY_RANGE: ocht.get_disparity_range_cropped( epipolar_ds ), } cars_dataset.fill_dataframe( flatten_filtered_cloud, saving_info=saving_info_flatten, attributes=attributes, ) # Save point cloud in worker if point_cloud_csv_file_name: cars_dataset.run_save_points( flatten_filtered_cloud, point_cloud_csv_file_name, overwrite=True, point_cloud_format="csv", overwrite_file_name=False, ) if point_cloud_laz_file_name: cars_dataset.run_save_points( flatten_filtered_cloud, point_cloud_laz_file_name, overwrite=True, point_cloud_format="laz", overwrite_file_name=False, ) return filtered_cloud, flatten_filtered_cloud