Source code for cars.applications.dense_matching.loaders.pandora_loader

# !/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
CARS pandora loader file
"""

import copy
import json
import logging
import os
from collections import OrderedDict
from typing import Dict

import numpy as np
import pandora
import rasterio
import xarray as xr
from json_checker import Checker, Or
from pandora.check_configuration import (
    check_pipeline_section,
    concat_conf,
    get_config_pipeline,
    update_conf,
)
from pandora.state_machine import PandoraMachine
from rasterio.mask import mask
from shapely.geometry import mapping

from cars.core.projection import polygon_projection
from cars.orchestrator.cluster.log_wrapper import cars_profile


[docs] class PandoraLoader: """ PandoraLoader """ def __init__( # pylint: disable=too-many-positional-arguments # noqa: C901 self, conf=None, method_name=None, generate_performance_map_from_risk=False, generate_performance_map_from_intervals=False, generate_ambiguity=False, perf_eta_max_ambiguity=0.99, perf_eta_max_risk=0.25, perf_eta_step=0.04, use_cross_validation=True, denoise_disparity_map=False, used_band="b0", classification_3sgm=None, ): """ Init function of PandoraLoader If conf is profided, pandora will use it If not, Pandora will use intern configuration : census or mccnn, depending on method_name :param conf: configuration of pandora to use :type conf: dict :param method_name: name of method to use :param performance_map_conf: true if generate performance maps :param use_cross_validation: true to add crossvalidation :param denoise_disparity_map: true to add the disparity denoiser filter :param used_band: name of band used for correlation :type used_band: str :param classification_3sgm: use 3SGM with classif (list of bands) :type classification_3sgm: list[str] or None """ if method_name is None: method_name = "census_sgm_default" self.pandora_config = None uses_cars_pandora_conf = False if isinstance(conf, str): # load file with open(conf, "r", encoding="utf8") as fstream: conf = json.load(fstream) elif conf is None: uses_cars_pandora_conf = True package_path = os.path.dirname(__file__) config_map = { "pandora_mccnn_sgm": "config_mccnn.json", "pandora_census_sgm_urban": "config_census_sgm_urban.json", "pandora_census_sgm_shadow": "config_census_sgm_shadow.json", "pandora_census_sgm_mountain_and_vegetation": ( "config_census_sgm_mountain_and_vegetation.json" ), "pandora_census_sgm_homogeneous": ( "config_census_sgm_homogeneous.json" ), "pandora_census_sgm_default": "config_census_sgm_default.json", "pandora_census_sgm_sparse": "config_census_sgm_sparse.json", "pandora_auto": "config_census_sgm_default.json", } try: filename = config_map[method_name] except KeyError as err: logging.error( "No method named {} in pandora loader".format(method_name) ) raise NameError( "No method named {} in pandora loader".format(method_name) ) from err conf_file_path = os.path.join(package_path, filename) with open(conf_file_path, "r", encoding="utf8") as fstream: conf = json.load(fstream) perf_ambiguity_conf = { "cost_volume_confidence.cars_1": { "confidence_method": "ambiguity", "eta_max": perf_eta_max_ambiguity, "eta_step": perf_eta_step, } } perf_risk_conf = { "cost_volume_confidence.cars_2": { "confidence_method": "risk", "eta_max": perf_eta_max_risk, "eta_step": perf_eta_step, } } intervals_conf = { "cost_volume_confidence.cars_3": { "confidence_method": "interval_bounds", } } # Cross validation cross_validation_acc_conf = { "validation": { "validation_method": "cross_checking_accurate", "cross_checking_threshold": 1.0, } } cross_validation_fast_conf = { "validation": { "validation_method": "cross_checking_fast", } } disparity_denoiser_conf = { "filter": {"filter_method": "disparity_denoiser"} } confidences = {} if generate_performance_map_from_risk: confidences.update(perf_ambiguity_conf) confidences.update(perf_risk_conf) if generate_performance_map_from_intervals: confidences.update(perf_ambiguity_conf) confidences.update(intervals_conf) if generate_ambiguity: confidences.update(perf_ambiguity_conf) if confidences: conf["pipeline"] = overload_pandora_conf_with_confidence( conf["pipeline"], confidences ) # update with cross validation if "validation" not in conf["pipeline"]: if use_cross_validation in (True, "fast"): conf["pipeline"].update(cross_validation_fast_conf) elif use_cross_validation == "accurate": conf["pipeline"].update(cross_validation_acc_conf) if ( denoise_disparity_map and conf["pipeline"]["filter"]["filter_method"] != "disparity_denoiser" ): conf["pipeline"].update(disparity_denoiser_conf) if classification_3sgm: conf["pipeline"]["optimization"]["optimization_method"] = "3sgm" conf["pipeline"]["optimization"]["geometric_prior"] = { "source": "classif", "classes": classification_3sgm, } if "band" not in conf["pipeline"]["matching_cost"]: conf["pipeline"]["matching_cost"]["band"] = used_band if generate_performance_map_from_intervals: # To ensure the consistency between the disparity map # and the intervals, the median filter for intervals # must be similar to the median filter. The filter is # added at the end of the conf as it is applied during # the disp_map state. try: filter_size = conf["pipeline"]["filter"]["filter_size"] except KeyError: filter_size = 3 conf_filter_interval = { "filter.cars_3": { "filter_method": "median_for_intervals", "filter_size": filter_size, "interval_indicator": "cars_3", "regularization": True, "ambiguity_indicator": "cars_1", } } pipeline_dict = OrderedDict() pipeline_dict.update(conf["pipeline"]) # Filter is placed after validation in config # and should be placed before. # However it does not have any incidence on operation if uses_cars_pandora_conf: pipeline_dict.update(conf_filter_interval) conf["pipeline"] = pipeline_dict if "filter" in conf["pipeline"]: filter_conf = conf["pipeline"]["filter"] if filter_conf["filter_method"] == "disparity_denoiser": if "band" not in filter_conf: conf["pipeline"]["filter"]["band"] = used_band # Check conf self.pandora_config = conf
[docs] def get_conf(self): """ Get pandora configuration used :return: pandora configuration :rtype: dict """ return self.pandora_config
[docs] def get_classif_bands(self): """ Get the classification bands used in the pandora configuration :return: list of classification bands """ classif_bands = [] def search_classes_recursive(obj): """ Recursive search of keys containing 'classes' in the configuration """ if isinstance(obj, dict): for key, value in obj.items(): if "classes" in key.lower(): if isinstance(value, list): classif_bands.extend(value) elif value is not None: classif_bands.append(value) # Continue recursive search search_classes_recursive(value) search_classes_recursive(self.pandora_config) # Remove duplicates and return the list return list(set(classif_bands))
@cars_profile(name="Find auto conf") def find_auto_conf( self, intersection_poly, land_cover_map, classif_to_config_mapping, epsg ): """ Find the configuration that suits the most on the land cover map based on the roi """ package_path = os.path.dirname(__file__) # construct the path to the land_cover_map if os.path.dirname(land_cover_map) == "": land_cover_map_path = os.path.join(package_path, land_cover_map) else: land_cover_map_path = land_cover_map with rasterio.open(land_cover_map_path) as src: # Project the polygon to the right epsg if src.crs != epsg: poly = polygon_projection( intersection_poly, epsg, src.crs.to_epsg() ) else: poly = intersection_poly # Use a buffer because the land_cover_map resolution is coarse data_land_cover, _ = mask( src, [mapping(poly)], crop=True, all_touched=True ) # Find the most common class in the roi data_squeeze = data_land_cover.squeeze() valid_data = data_squeeze[data_squeeze != src.nodata] most_common_class = None if valid_data.size > 0: classes, counts = np.unique(valid_data, return_counts=True) max_index = np.argmax(counts) most_common_class = classes[max_index] # Construct the path to the classification to configuration mapping if os.path.dirname(classif_to_config_mapping) == "": conf_file_path = os.path.join( package_path, classif_to_config_mapping ) else: conf_file_path = classif_to_config_mapping # read conf with open(conf_file_path, "r", encoding="utf8") as fstream: conf_mapping = json.load(fstream) # Find the configuration that corresponds to the most common class corresponding_conf_name = conf_mapping.get(str(most_common_class), None) # If no equivalence has been found, we use the default configuration if corresponding_conf_name is None: corresponding_conf_name = "census_sgm_default" logging.info( "The conf that has been chosen regarding the " "world classification map is {}".format(corresponding_conf_name) ) # We return the corresponding configuration json_conf_name = os.path.join( package_path, "config_" + corresponding_conf_name + ".json" ) with open(json_conf_name, "r", encoding="utf8") as fstream: conf = json.load(fstream) return conf
[docs] def check_conf( # pylint: disable=too-many-positional-arguments self, user_cfg, nodata_left, nodata_right, bands_left, bands_right, bands_classif_left=None, bands_classif_right=None, ): """ Check configuration :param user_cfg: configuration :type user_cfg: dict :return: pandora configuration :rtype: dict """ # Import plugins before checking configuration pandora.import_plugin() # Check configuration and update the configuration with default values # Instantiate pandora state machine pandora_machine = PandoraMachine() # check pipeline metadata_left = overide_pandora_get_metadata( bands_left, classif_bands=bands_classif_left ) metadata_right = overide_pandora_get_metadata( bands_right, classif_bands=bands_classif_right ) metadata_left = metadata_left.assign_coords(band_im=bands_left) metadata_right = metadata_right.assign_coords(band_im=bands_right) user_cfg_pipeline = get_config_pipeline(user_cfg) # check that if validation is cross_checking_*, both # images have a classification band for 3SGM validation_method = ( user_cfg_pipeline["pipeline"] .get("validation", {}) .get("validation_method", None) ) sgm_optimization_method = ( user_cfg_pipeline["pipeline"] .get("optimization", {}) .get("optimization_method", None) ) if validation_method is not None and validation_method.startswith( "cross_checking" ): if sgm_optimization_method == "3sgm": has_classif_left = bands_classif_left is not None has_classif_right = bands_classif_right is not None if not (has_classif_left and has_classif_right): raise ValueError( "When using 3SGM and cross-validation at the same time," " both of the images should have a classification." ) saved_schema = copy.deepcopy( pandora.matching_cost.matching_cost.AbstractMatchingCost.schema ) cfg_pipeline = check_pipeline_section( user_cfg_pipeline, metadata_left, metadata_right, pandora_machine ) # quick fix to remove when the problem is solved in pandora pandora.matching_cost.matching_cost.AbstractMatchingCost.schema = ( saved_schema ) # check a part of input section user_cfg_input = get_config_input_custom_cars( user_cfg, nodata_left, nodata_right ) cfg_input = check_input_section_custom_cars(user_cfg_input) # concatenate updated config cfg = concat_conf([cfg_input, cfg_pipeline]) return cfg
input_configuration_schema_custom_cars = { "nodata_left": Or( int, lambda x: np.isnan(x) # pylint: disable=unnecessary-lambda ), "nodata_right": Or( int, lambda x: np.isnan(x) # pylint: disable=unnecessary-lambda ), } default_short_configuration_input_custom_cars = { "input": { "nodata_left": -9999, "nodata_right": -9999, } }
[docs] def overide_pandora_get_metadata( im_bands: list, classif_bands: list = None ) -> xr.Dataset: """ Read metadata from image, and return the corresponding xarray.DataSet :param im_bands: list of band names :param classif_bands: list of classification band names :return: partial xarray.DataSet (attributes and coordinates) :rtype: xarray.DataSet """ coords = { "band_im": list(im_bands), "row": np.arange(10), "col": np.arange(10), } data_vars = { "image": (["row", "col"], np.zeros((10, 10))), } if classif_bands is not None: coords["band_classif"] = list(classif_bands) data_vars["classif"] = ( ["row", "col", "band_classif"], np.zeros((10, 10, len(classif_bands)), dtype=np.int32), ) # create the dataset dataset = xr.Dataset(data_vars=data_vars, coords=coords) dataset.attrs["disparity_source"] = None return dataset
[docs] def get_config_input_custom_cars( user_cfg: Dict[str, dict], nodata_left, nodata_right ) -> Dict[str, dict]: """ Get the input configuration :param user_cfg: user configuration :type user_cfg: dict :return cfg: partial configuration :rtype cfg: dict """ cfg = {} if "input" in user_cfg: cfg["input"] = {} if "nodata_left" in user_cfg["input"]: cfg["input"]["nodata_left"] = user_cfg["input"]["nodata_left"] else: cfg["input"]["nodata_left"] = nodata_left if "nodata_right" in user_cfg["input"]: cfg["input"]["nodata_right"] = user_cfg["input"]["nodata_right"] else: cfg["input"]["nodata_right"] = nodata_right return cfg
[docs] def check_input_section_custom_cars( user_cfg: Dict[str, dict], ) -> Dict[str, dict]: """ Complete and check if the dictionary is correct :param user_cfg: user configuration :type user_cfg: dict :return: cfg: global configuration :rtype: cfg: dict """ # Add missing steps and inputs defaults values in user_cfg cfg = update_conf(default_short_configuration_input_custom_cars, user_cfg) # check schema configuration_schema = {"input": input_configuration_schema_custom_cars} checker = Checker(configuration_schema) checker.validate(cfg) return cfg
[docs] def overload_pandora_conf_with_confidence(conf, confidence_conf): """ Overload pandora pipeline configuration with given confidence to add just before disparity computation. :param conf: current pandora configuration :type conf: OrderedDict :param confidence_conf: confidence applications config :type confidence_conf: OrderedDict :return: updated pandora pipeline conf :rtype: OrderedDict """ out_dict = OrderedDict() out_dict.update(conf) conf_keys = list(conf.keys()) confidence_conf_keys = list(confidence_conf.keys()) for key in confidence_conf_keys: if key in conf_keys: logging.info("{} pandora key already in configuration".format(key)) # update confidence out_dict.update(confidence_conf) # move confidence keys right before disparity computation # get position of key "disparity" if "disparity" not in conf_keys: raise RuntimeError("disparity key not in pandora configuration") disp_index = conf_keys.index("disparity") # move to end every key from disparity for ind in range(disp_index, len(conf_keys)): out_dict.move_to_end(conf_keys[ind]) return out_dict