# Source code for cars.applications.point_cloud_fusion.mapping_to_terrain_tiles

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
this module contains the epipolar cloud fusion application class.
"""


# Standard imports
import logging
import os
from collections import Counter

# Third party imports
import numpy as np
from json_checker import Checker
from shapely.geometry import Polygon

import cars.orchestrator.orchestrator as ocht
from cars.applications import application_constants
from cars.applications.point_cloud_fusion import (
    cloud_fusion_constants,
    pc_tif_tools,
    point_cloud_tools,
)
from cars.applications.point_cloud_fusion.point_cloud_fusion import (
    PointCloudFusion,
)
from cars.applications.triangulation.triangulation_tools import (
    generate_point_cloud_file_names,
)
from cars.core import constants as cst
from cars.core import inputs, projection, tiling
from cars.core.utils import safe_makedirs
from cars.data_structures import cars_dataset


class MappingToTerrainTiles(
    PointCloudFusion, short_name="mapping_to_terrain_tiles"
):
    """
    EpipolarCloudFusion
    """

    def __init__(self, conf=None):
        """
        Init function of EpipolarCloudFusion

        :param conf: configuration for fusion
        :return: an application_to_use object
        """
        super().__init__(conf=conf)

        # Orchestrator is only bound later, when run() is called
        self.orchestrator = None

        # Fusion method selected through the (checked) configuration
        self.used_method = self.used_config["method"]
[docs] def check_conf(self, conf): """ Check configuration :param conf: configuration to check :type conf: dict :return: overloaded configuration :rtype: dict """ # init conf if conf is not None: overloaded_conf = conf.copy() else: conf = {} overloaded_conf = {} # Overload conf overloaded_conf["method"] = conf.get( "method", "mapping_to_terrain_tiles" ) overloaded_conf["save_by_pair"] = conf.get("save_by_pair", False) overloaded_conf[application_constants.SAVE_INTERMEDIATE_DATA] = ( conf.get(application_constants.SAVE_INTERMEDIATE_DATA, False) ) point_cloud_fusion_schema = { "method": str, "save_by_pair": bool, application_constants.SAVE_INTERMEDIATE_DATA: bool, } # Check conf checker = Checker(point_cloud_fusion_schema) checker.validate(overloaded_conf) return overloaded_conf
[docs] def run( # noqa: C901 self, list_epipolar_point_clouds, bounds, epsg, source_pc_names=None, orchestrator=None, margins=0, optimal_terrain_tile_width=500, roi=None, save_laz_output=False, ): """ Run EpipolarCloudFusion application. Creates a CarsDataset corresponding to the merged point clouds, tiled with the terrain grid used during rasterization. :param list_epipolar_point_clouds: list with point clouds\ Each CarsDataset contains: - N x M Delayed tiles. \ Each tile will be a future xarray Dataset containing: - data : with keys : "x", "y", "z", "corr_msk" \ optional: "color", "msk", - attrs with keys: "margins", "epi_full_size", "epsg" - attributes containing: "disp_lower_bound", "disp_upper_bound" \ "elevation_delta_lower_bound", "elevation_delta_upper_bound" :type list_epipolar_point_clouds: list(CarsDataset) filled with xr.Dataset :param bounds: terrain bounds :type bounds: list :param epsg: epsg to use :type epsg: str :param source_pc_names: source pc names :type source_pc_names: list[str] :param orchestrator: orchestrator used :type orchestrator: Orchestrator :param margins: margins needed for tiles, meter or degree :type margins: float :param optimal_terrain_tile_width: optimal terrain tile width :type optimal_terrain_tile_width: int :param save_laz_output: save output point cloud as laz :type save_laz_output: bool :return: Merged point clouds CarsDataset contains: - Z x W Delayed tiles\ Each tile will be a future pandas DataFrame containing: - data : with keys : "x", "y", "z", "corr_msk" \ optional: "clr", "msk", "data_valid","coord_epi_geom_i",\ "coord_epi_geom_j","idx_im_epi" - attrs with keys: "epsg" - attributes containing: "bounds", "epsg" :rtype: CarsDataset filled with pandas.DataFrame """ # Default orchestrator if orchestrator is None: # Create default sequential orchestrator for current application # be awere, no out_json will be shared between orchestrators # No files saved self.orchestrator = ocht.Orchestrator( orchestrator_conf={"mode": 
"sequential"} ) else: self.orchestrator = orchestrator save_point_cloud_as_csv = self.used_config.get( application_constants.SAVE_INTERMEDIATE_DATA, False ) save_point_cloud_as_laz = ( self.used_config.get( application_constants.SAVE_INTERMEDIATE_DATA, False ) or save_laz_output ) save_by_pair = self.used_config.get("save_by_pair", False) if source_pc_names is None: source_pc_names = ["PAIR_0"] # Compute bounds and terrain grid [xmin, ymin, xmax, ymax] = bounds # Split terrain bounding box in pieces terrain_tiling_grid = tiling.generate_tiling_grid( xmin, ymin, xmax, ymax, optimal_terrain_tile_width, optimal_terrain_tile_width, ) source_pc_names = [] for point_cloud in list_epipolar_point_clouds: if "source_pc_name" in point_cloud.attributes: source_pc_names.append(point_cloud.attributes["source_pc_name"]) # Get dataset type of first item in list_epipolar_point_clouds pc_dataset_type = list_epipolar_point_clouds[0].dataset_type if pc_dataset_type in ( "arrays", "dict", "points", ): # Create CarsDataset merged_point_cloud = cars_dataset.CarsDataset( "points", name="point_cloud_fusion" ) # Compute tiling grid merged_point_cloud.tiling_grid = terrain_tiling_grid # update attributes merged_point_cloud.attributes["bounds"] = bounds merged_point_cloud.attributes["epsg"] = epsg merged_point_cloud.attributes["source_pc_names"] = source_pc_names number_of_terrain_tiles = ( merged_point_cloud.tiling_grid.shape[1] * merged_point_cloud.tiling_grid.shape[0] ) logging.info( "Number of tiles in cloud fusion :" "row : {} " "col : {}".format( merged_point_cloud.shape[0], merged_point_cloud.shape[1], ) ) number_of_epipolar_tiles_per_terrain_tiles = [] if pc_dataset_type in ( "arrays", "points", ): # deal with delayed tiles, with a priori disp min and max # Add epipolar_points_min and epipolar_points_max used # in point_cloud_fusion # , to get corresponding tiles (terrain) # TODO change method for corresponding tiles list_points_min = [] list_points_max = [] for point_cloud in 
list_epipolar_point_clouds: points_min, points_max = tiling.terrain_grid_to_epipolar( terrain_tiling_grid, point_cloud.tiling_grid, point_cloud.attributes["epipolar_grid_min"], point_cloud.attributes["epipolar_grid_max"], epsg, ) list_points_min.append(points_min) list_points_max.append(points_max) # Add infos to orchestrator.out_json updating_dict = { application_constants.APPLICATION_TAG: { cloud_fusion_constants.CLOUD_FUSION_RUN_TAG: { cloud_fusion_constants.EPSG_TAG: epsg, cloud_fusion_constants.MARGINS_TAG: margins, cloud_fusion_constants.NUMBER_TERRAIN_TILES: ( number_of_terrain_tiles ), cloud_fusion_constants.BOUNDS: bounds, }, } } orchestrator.update_out_info(updating_dict) # Generate merged point clouds logging.info( "Point clouds: Merged points number: {}".format( merged_point_cloud.shape[1] * merged_point_cloud.shape[0] ) ) # Compute corresponing tiles in parallel if from tif files color_type = None if pc_dataset_type == "dict": corresponding_tiles_cars_ds = ( pc_tif_tools.get_corresponding_tiles_tif( terrain_tiling_grid, list_epipolar_point_clouds, margins=margins, orchestrator=self.orchestrator, ) ) color_file = list_epipolar_point_clouds[0].tiles[0][0]["data"][ "color" ] if color_file is not None: color_type = inputs.rasterio_get_image_type(color_file) merged_point_cloud.attributes["color_type"] = color_type # Save objects csv_pc_dir_name = None if save_point_cloud_as_csv: # Point cloud file name csv_pc_dir_name = os.path.join( self.orchestrator.out_dir, "dump_dir", "point_cloud_fusion", "csv", ) safe_makedirs(csv_pc_dir_name) self.orchestrator.add_to_compute_lists( merged_point_cloud, cars_ds_name="merged_point_cloud_csv" ) laz_pc_dir_name = None if save_point_cloud_as_laz: # Point cloud file name if save_laz_output: laz_pc_dir_name = os.path.join( self.orchestrator.out_dir, "point_cloud" ) else: laz_pc_dir_name = os.path.join( self.orchestrator.out_dir, "dump_dir", "point_cloud_fusion", "laz", ) safe_makedirs(laz_pc_dir_name) 
self.orchestrator.add_to_compute_lists( merged_point_cloud, cars_ds_name="merged_point_cloud" ) # Get saving infos in order to save tiles when they are computed [saving_info] = self.orchestrator.get_saving_infos( [merged_point_cloud] ) pc_index = {} for col in range(merged_point_cloud.shape[1]): for row in range(merged_point_cloud.shape[0]): # update saving infos for potential replacement full_saving_info = ocht.update_saving_infos( saving_info, row=row, col=col ) if pc_dataset_type in ( "arrays", "points", ): # Get required point clouds ( terrain_region, required_point_clouds, _rank, _pos, ) = tiling.get_corresponding_tiles_row_col( terrain_tiling_grid, row, col, list_epipolar_point_clouds, list_points_min, list_points_max, ) else: # Get correspondances previously computed terrain_region = corresponding_tiles_cars_ds[row, col][ "terrain_region" ] required_point_clouds = corresponding_tiles_cars_ds[ row, col ]["required_point_clouds"] terrain_region_poly = Polygon( [ [terrain_region[0], terrain_region[1]], [terrain_region[0], terrain_region[3]], [terrain_region[2], terrain_region[3]], [terrain_region[2], terrain_region[1]], [terrain_region[0], terrain_region[1]], ] ) if len( [ value for value, _ in required_point_clouds if not isinstance(value, type(None)) ] ) > 0 and ( roi is None or terrain_region_poly.intersects(roi) ): logging.debug( "Number of clouds to process for this terrain" " tile: {}".format(len(required_point_clouds)) ) number_of_epipolar_tiles_per_terrain_tiles.append( len(required_point_clouds) ) csv_pc_file_name, laz_pc_file_name = ( generate_point_cloud_file_names( csv_pc_dir_name, laz_pc_dir_name, row, col, pc_index, pair_key=( source_pc_names if save_by_pair else None ), ) ) # Delayed call to rasterization operations using all # required point clouds merged_point_cloud[ row, col ] = self.orchestrator.cluster.create_task( compute_point_cloud_wrapper )( required_point_clouds, epsg, xmin=terrain_region[0], ymin=terrain_region[1], 
xmax=terrain_region[2], ymax=terrain_region[3], margins=margins, save_by_pair=save_by_pair, point_cloud_csv_file_name=csv_pc_file_name, point_cloud_laz_file_name=laz_pc_file_name, saving_info=full_saving_info, source_pc_names=source_pc_names, ) # update point cloud index if save_laz_output: self.orchestrator.update_index(pc_index) # Sort tiles according to rank TODO remove or implement it ? # Raise an error if no tiles has been found if len(number_of_epipolar_tiles_per_terrain_tiles) < 1: raise RuntimeError( "No epipolar tiles has been found inside the ROI! " "Please try with an other ROI." ) # Add delayed_dsm_tiles to orchestrator logging.info( "Submitting {} tasks to dask".format(number_of_terrain_tiles) ) logging.info( "Number of epipolar tiles " "for each terrain tile (counter): {}".format( sorted( Counter( number_of_epipolar_tiles_per_terrain_tiles ).items() ) ) ) logging.info( "Average number of epipolar tiles " "for each terrain tile: {}".format( int( np.round( np.mean(number_of_epipolar_tiles_per_terrain_tiles) ) ) ) ) logging.info( "Max number of epipolar tiles " "for each terrain tile: {}".format( np.max(number_of_epipolar_tiles_per_terrain_tiles) ) ) else: logging.error( "PointCloudRasterisation application doesn't " "support this input data format" ) return merged_point_cloud
def compute_point_cloud_wrapper(
    point_clouds,
    epsg,
    xmin: float = None,
    ymin: float = None,
    xmax: float = None,
    ymax: float = None,
    margins: float = 0,
    save_by_pair: bool = False,
    point_cloud_csv_file_name=None,
    point_cloud_laz_file_name=None,
    saving_info=None,
    source_pc_names=None,
):
    """
    Wrapper for point clouds fusion step :
    - Convert a list of clouds to correct epsg

    :param point_clouds: list of clouds, list of (dataset, dataset_id) with :

            - cst.X
            - cst.Y
            - cst.Z
            - cst.EPI_COLOR
    :type point_clouds: list((xr.Dataset, int))
    :param epsg: epsg code for the CRS of the output DSM
    :type epsg: int
    :param xmin: xmin of the rasterization grid
        (if None, will be estimated by the function)
    :param ymin: ymin of the rasterization grid
        (if None, will be estimated by the function)
    :param xmax: xmax of the rasterization grid
        (if None, will be estimated by the function)
    :param ymax: ymax of the rasterization grid
        (if None, will be estimated by the function)
    :param margins: margins needed for tiles, meter or degree
    :type margins: float
    :param save_by_pair: save point cloud as pair
    :type save_by_pair: bool
    :param point_cloud_csv_file_name: write point cloud as CSV in filename
        (if None, the point cloud is not written as csv)
    :type point_cloud_csv_file_name: str
    :param point_cloud_laz_file_name: write point cloud as laz in filename
        (if None, the point cloud is not written as laz)
    :type point_cloud_laz_file_name: str
    :param saving_info: informations about CarsDataset ID.
    :type saving_info: dict
    :param source_pc_names: source point cloud name (correspond to pair_key)
    :type source_pc_names: list str
    :raises RuntimeError: if every input cloud is None

    :return: merged point cloud dataframe with:

            - cst.X
            - cst.Y
            - cst.Z
            - cst.EPI_COLOR
            - attrs : xmin, xmax, ymin, ymax, saving_info
    :rtype: pandas.DataFrame
    """
    # Remove None tiles
    clouds = []
    clouds_ids = []
    disparity_range_is_cropped = False
    for value, pc_id in point_clouds:
        if value is not None:
            clouds.append(value)
            clouds_ids.append(pc_id)
            # Check if disparity range was cropped during process
            if ocht.get_disparity_range_cropped(value):
                disparity_range_is_cropped = True
    if len(clouds) == 0:
        raise RuntimeError("All clouds are None")

    # combine clouds
    if not isinstance(clouds[0], dict):
        pc_pandas, cloud_epsg = point_cloud_tools.create_combined_cloud(
            clouds,
            clouds_ids,
            epsg,
            xmin=xmin,
            xmax=xmax,
            ymin=ymin,
            ymax=ymax,
            margin=margins,
            with_coords=True,
        )
        # get color type list
        color_type = point_cloud_tools.get_color_type(clouds)
    else:
        # combined pc from tif files
        (
            pc_pandas,
            cloud_epsg,
            color_type,
        ) = pc_tif_tools.create_combined_cloud_from_tif(
            clouds,
            clouds_ids,
            epsg,
            xmin=xmin,
            xmax=xmax,
            ymin=ymin,
            ymax=ymax,
            margin=margins,
        )

    # Conversion to UTM
    if cloud_epsg != epsg:
        projection.point_cloud_conversion_dataframe(pc_pandas, cloud_epsg, epsg)
        cloud_epsg = epsg

    # Fill attributes for rasterization
    attributes = {
        "epsg": cloud_epsg,
        "xmin": xmin,
        "xmax": xmax,
        "ymin": ymin,
        "ymax": ymax,
        "color_type": color_type,
        "source_pc_names": source_pc_names,
        # Robustness fix: source_pc_names defaults to None, which
        # previously crashed len() here with a TypeError.
        "number_of_pc": 0 if source_pc_names is None else len(source_pc_names),
        cst.CROPPED_DISPARITY_RANGE: disparity_range_is_cropped,
    }
    cars_dataset.fill_dataframe(
        pc_pandas, saving_info=saving_info, attributes=attributes
    )

    # save point cloud in worker
    if point_cloud_csv_file_name:
        cars_dataset.run_save_points(
            pc_pandas,
            point_cloud_csv_file_name,
            save_by_pair=save_by_pair,
            overwrite=True,
            point_cloud_format="csv",
            overwrite_file_name=False,
        )
    if point_cloud_laz_file_name:
        cars_dataset.run_save_points(
            pc_pandas,
            point_cloud_laz_file_name,
            save_by_pair=save_by_pair,
            overwrite=True,
            point_cloud_format="laz",
            overwrite_file_name=False,
        )

    return pc_pandas