Source code for cars.applications.dense_match_filling.plane

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
this module contains the fill_disp application class.
"""


import copy

# Standard imports
import logging

# Third party imports
from json_checker import Checker, Or
from shapely.geometry import Polygon

# CARS imports
import cars.orchestrator.orchestrator as ocht
from cars.applications import application_constants
from cars.applications.dense_match_filling import fill_disp_constants as fd_cst
from cars.applications.dense_match_filling import fill_disp_tools as fd_tools
from cars.applications.dense_match_filling.dense_match_filling import (
    DenseMatchFilling,
)
from cars.core import constants as cst
from cars.data_structures import cars_dataset, corresponding_tiles_tools


[docs]class PlaneFill( DenseMatchFilling, short_name=["plane"] ): # pylint: disable=R0903 """ Fill invalid area in disparity map using plane method """ # pylint: disable=too-many-instance-attributes def __init__(self, conf=None): """ Init function of FillDisp :param conf: configuration for filling :return: a application_to_use object """ super().__init__(conf=conf) # get conf self.used_method = self.used_config["method"] self.interpolation_type = self.used_config[fd_cst.INTERP_TYPE] self.interpolation_method = self.used_config[fd_cst.INTERP_METHOD] self.max_search_distance = self.used_config[fd_cst.MAX_DIST] self.smoothing_iterations = self.used_config[fd_cst.SMOOTH_IT] self.ignore_nodata_at_disp_mask_borders = self.used_config[ fd_cst.IGNORE_NODATA ] self.ignore_zero_fill_disp_mask_values = self.used_config[ fd_cst.IGNORE_ZERO ] self.ignore_extrema_disp_values = self.used_config[ fd_cst.IGNORE_EXTREMA ] self.nb_pix = self.used_config["nb_pix"] self.percent_to_erode = self.used_config["percent_to_erode"] self.classification = self.used_config["classification"] # Saving files self.save_intermediate_data = self.used_config["save_intermediate_data"]
[docs] def check_conf(self, conf): """ Check configuration :param conf: configuration to check :type conf: dict :return: overloaded configuration :rtype: dict """ # init conf if conf is not None: overloaded_conf = conf.copy() else: conf = {} overloaded_conf = {} # Overload conf overloaded_conf["method"] = conf.get("method", "plane") overloaded_conf[fd_cst.INTERP_TYPE] = conf.get( fd_cst.INTERP_TYPE, "pandora" ) overloaded_conf[fd_cst.INTERP_METHOD] = conf.get( fd_cst.INTERP_METHOD, "mc_cnn" ) overloaded_conf[fd_cst.MAX_DIST] = conf.get(fd_cst.MAX_DIST, 100) overloaded_conf[fd_cst.SMOOTH_IT] = conf.get(fd_cst.SMOOTH_IT, 1) overloaded_conf[fd_cst.IGNORE_NODATA] = conf.get( fd_cst.IGNORE_NODATA, False ) overloaded_conf[fd_cst.IGNORE_ZERO] = conf.get(fd_cst.IGNORE_ZERO, True) overloaded_conf[fd_cst.IGNORE_EXTREMA] = conf.get( fd_cst.IGNORE_EXTREMA, True ) overloaded_conf["nb_pix"] = conf.get("nb_pix", 20) overloaded_conf["percent_to_erode"] = conf.get("percent_to_erode", 0.2) overloaded_conf["classification"] = conf.get("classification", None) # Saving files overloaded_conf["save_intermediate_data"] = conf.get( "save_intermediate_data", False ) application_schema = { "method": str, "save_intermediate_data": bool, "interpolation_type": Or(None, str), "interpolation_method": Or(None, str), "max_search_distance": Or(None, int), "smoothing_iterations": Or(None, int), "ignore_nodata_at_disp_mask_borders": bool, "ignore_zero_fill_disp_mask_values": bool, "ignore_extrema_disp_values": bool, "nb_pix": Or(None, int), "percent_to_erode": Or(None, float), "classification": Or(None, [str]), } # Check conf checker = Checker(application_schema) checker.validate(overloaded_conf) return overloaded_conf
[docs] def get_poly_margin(self): """ Get the margin used for polygon :return: self.nb_pix :rtype: int """ return self.nb_pix
[docs] def run( self, epipolar_disparity_map, holes_bbox_left=None, holes_bbox_right=None, disp_min=0, disp_max=0, orchestrator=None, pair_folder=None, pair_key="PAIR_0", ): """ Run Refill application using plane method. :param epipolar_disparity_map: left to right disparity :type epipolar_disparity_map: CarsDataset :param holes_bbox_left: left holes :type holes_bbox_left: CarsDataset :param holes_bbox_right: right holes :type holes_bbox_right: CarsDataset :param disp_min: minimum disparity :type disp_min: int :param disp_max: maximum disparity :type disp_max: int :param orchestrator: orchestrator used :param pair_folder: folder used for current pair :type pair_folder: str :param pair_key: pair id :type pair_key: str :return: filled disparity map: \ Each CarsDataset contains: - N x M Delayed tiles.\ Each tile will be a future xarray Dataset containing: - data with keys : "disp", "disp_msk" - attrs with keys: profile, window, overlaps - attributes containing: "largest_epipolar_region","opt_epipolar_tile_size", "epipolar_regions_grid" :rtype: CarsDataset """ if holes_bbox_left is None or holes_bbox_right is None: raise RuntimeError("Disparity holes bbox are inconsistent.") res = None if not self.classification: logging.info("Disparity holes filling was not activated") res = epipolar_disparity_map else: # Default orchestrator if orchestrator is None: # Create defaut sequential orchestrator for current application # be awere, no out_json will be shared between orchestrators # No files saved self.orchestrator = ocht.Orchestrator( orchestrator_conf={"mode": "sequential"} ) else: self.orchestrator = orchestrator interp_options = { "type": self.interpolation_type, "method": self.interpolation_method, "smoothing_iterations": self.smoothing_iterations, "max_search_distance": self.max_search_distance, } if epipolar_disparity_map.dataset_type == "arrays": new_epipolar_disparity_map = self.__register_dataset__( epipolar_disparity_map, self.save_intermediate_data, pair_folder, pair_key, app_name="plane", ) # Get saving infos in order to save tiles when they are computed [saving_info] = self.orchestrator.get_saving_infos( [new_epipolar_disparity_map] ) # Add infos to orchestrator.out_json updating_dict = { application_constants.APPLICATION_TAG: { fd_cst.FILL_DISP_WITH_PLAN_RUN_TAG: { pair_key: {}, } } } self.orchestrator.update_out_info(updating_dict) logging.info( "Fill missing disparity with plan model" ": number tiles: {}".format( epipolar_disparity_map.shape[1] * epipolar_disparity_map.shape[0] ) ) # get all polygones poly_list_left = fd_tools.get_polygons_from_cars_ds( holes_bbox_left ) poly_list_right = fd_tools.get_polygons_from_cars_ds( holes_bbox_right ) # Estimate right poly on left # using disparity range poly_list_right_on_left = [ fd_tools.estimate_poly_with_disp( p, dmin=-disp_max, dmax=-disp_min ) for p in poly_list_right ] # Merge polygones merged_poly_list = fd_tools.merge_intersecting_polygones( poly_list_left + poly_list_right_on_left ) logging.info( "Disparity filling: {} holes on" " left to fill".format(len(merged_poly_list)) ) # Generate polygones for tiles tiles_polygones = {} for col in range(epipolar_disparity_map.shape[1]): for row in range(epipolar_disparity_map.shape[0]): tile = epipolar_disparity_map.tiling_grid[row, col] tiles_polygones[(row, col)] = Polygon( [ [tile[0], tile[2]], [tile[0], tile[3]], [tile[1], tile[3]], [tile[1], tile[2]], [tile[0], tile[2]], ] ) # Generate disparity maps for col in range(epipolar_disparity_map.shape[1]): for row in range(epipolar_disparity_map.shape[0]): if epipolar_disparity_map[row, col] is not None: tile_poly = tiles_polygones[(row, col)] # Get intersecting holes poly corresponding_holes = ( fd_tools.get_corresponding_holes( tile_poly, merged_poly_list ) ) # Get corresponding_tiles # list of (tile_window, tile overlap, xr.Dataset) corresponding_tiles = ( fd_tools.get_corresponding_tiles( tiles_polygones, corresponding_holes, epipolar_disparity_map, ) ) # get tile window and overlap window = new_epipolar_disparity_map.tiling_grid[ row, col ] overlap = new_epipolar_disparity_map.overlaps[ row, col ] # update saving infos for potential replacement full_saving_info = ocht.update_saving_infos( saving_info, row=row, col=col ) if len(corresponding_tiles) == 0: # copy dataset ( new_epipolar_disparity_map[row, col] ) = self.orchestrator.cluster.create_task( wrapper_copy_disparity )( epipolar_disparity_map[row, col], window, overlap, saving_info=full_saving_info, ) else: # Fill holes ( new_epipolar_disparity_map[row, col] ) = self.orchestrator.cluster.create_task( fill_disparity_plane_wrapper )( corresponding_tiles, corresponding_holes, window, overlap, self.classification, ignore_nodata_at_disp_mask_borders=( self.ignore_nodata_at_disp_mask_borders ), ignore_zero_fill_disp_mask_values=( self.ignore_zero_fill_disp_mask_values ), ignore_extrema_disp_values=( self.ignore_extrema_disp_values ), nb_pix=self.nb_pix, percent_to_erode=self.percent_to_erode, interp_options=interp_options, saving_info=full_saving_info, ) res = new_epipolar_disparity_map else: logging.error( "FillDisp application doesn't support " "this input data format" ) return res
[docs]def fill_disparity_plane_wrapper( corresponding_tiles, corresponding_poly, window, overlap, classification, ignore_nodata_at_disp_mask_borders=True, ignore_zero_fill_disp_mask_values=True, ignore_extrema_disp_values=True, nb_pix=20, percent_to_erode=0.3, interp_options=None, saving_info=None, ): """ Wrapper to Fill disparity map holes :param corresponding_tiles: disparity map tiles :type corresponding_tiles: list(tuple(list, list, xr.Dataset)) :param corresponding_poly: holes polygons :type corresponding_poly: list(Polygon) :param window: window of base tile [row min, row max, col min col max] :type window: list :param overlap: overlap [row min, row max, col min col max] :type overlap: list :param left_epi_image: left epipolar image :type left_epi_image: xr.Dataset :param classification: list of tag to use :type classification: list(str) :param ignore_nodata_at_disp_mask_borders: ingore nodata :type ignore_nodata_at_disp_mask_borders: bool :param ignore_zero_fill_disp_mask_values: ingnore zero fill :type ignore_zero_fill_disp_mask_values: bool :param ignore_extrema_disp_values: ignore extrema :type ignore_extrema_disp_values: bool :param nb_pix: margin to use :type nb_pix: int :param percent_to_erode: percent to erode :type percent_to_erode: float :param interp_options: interp_options :type interp_options: dict :param saving_info: saving infos :type saving_info: dict :return: disp map :rtype: xr.Dataset """ # Create combined xarray Dataset ( combined_dataset, row_min, col_min, ) = corresponding_tiles_tools.reconstruct_data( corresponding_tiles, window, overlap ) # Add a band to disparity dataset to memorize which pixels are filled combined_dataset = fd_tools.add_empty_filling_band( combined_dataset, ["plane.hole_center", "plane.hole_border"] ) # Fill disparity fd_tools.fill_disp_using_plane( combined_dataset, corresponding_poly, row_min, col_min, ignore_nodata_at_disp_mask_borders, ignore_zero_fill_disp_mask_values, ignore_extrema_disp_values, nb_pix, percent_to_erode, interp_options, classification, ) # Find xarray Dataset corresponding to current tile input_disp = copy.deepcopy( corresponding_tiles_tools.find_tile_dataset(corresponding_tiles, window) ) # Add additional attribute "filling" to dataset template input_disp[cst.EPI_FILLING] = combined_dataset[cst.EPI_FILLING] # Crop Dataset to get tile disparity cropped_disp = corresponding_tiles_tools.crop_dataset( combined_dataset, input_disp, window, overlap, row_min, col_min, ) # Fill with attributes cars_dataset.fill_dataset( cropped_disp, saving_info=saving_info, window=cars_dataset.window_array_to_dict(window), profile=None, attributes=None, overlaps=cars_dataset.overlap_array_to_dict(overlap), ) return cropped_disp
[docs]def wrapper_copy_disparity( disp, window, overlap, saving_info=None, ): """ Wrapper to copy previous disparity with additional filling band with zeros :param disp: disparity map :type disp: xr.Dataset :param window: window of base tile [row min, row max, col min col max] :type window: list :param overlap: overlap [row min, row max, col min col max] :type overlap: list :param saving_info: saving infos :type saving_info: dict :return: disp map :rtype: xr.Dataset """ # Fill band named filling with zeros res = fd_tools.add_empty_filling_band( disp, ["plane.hole_center", "plane.hole_border"] ) attributes = { cst.CROPPED_DISPARITY_RANGE: (ocht.get_disparity_range_cropped(disp)) } # Fill with attributes cars_dataset.fill_dataset( res, saving_info=saving_info, window=cars_dataset.window_array_to_dict(window), profile=None, attributes=attributes, overlaps=cars_dataset.overlap_array_to_dict(overlap), ) return res