# !/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
CARS pandora loader file
"""
import copy
import json
import logging
import os
from collections import OrderedDict
from typing import Dict
import numpy as np
import pandora
import rasterio
import xarray as xr
from json_checker import Checker, Or
from pandora.check_configuration import (
check_pipeline_section,
concat_conf,
get_config_pipeline,
update_conf,
)
from pandora.state_machine import PandoraMachine
from rasterio.mask import mask
from shapely.geometry import mapping
from cars.core.projection import polygon_projection
from cars.orchestrator.cluster.log_wrapper import cars_profile
[docs]
class PandoraLoader:
"""
PandoraLoader
"""
def __init__( # pylint: disable=too-many-positional-arguments # noqa: C901
self,
conf=None,
method_name=None,
generate_performance_map_from_risk=False,
generate_performance_map_from_intervals=False,
generate_ambiguity=False,
perf_eta_max_ambiguity=0.99,
perf_eta_max_risk=0.25,
perf_eta_step=0.04,
use_cross_validation=True,
denoise_disparity_map=False,
used_band="b0",
classification_3sgm=None,
):
"""
Init function of PandoraLoader
If conf is profided, pandora will use it
If not, Pandora will use intern configuration :
census or mccnn, depending on method_name
:param conf: configuration of pandora to use
:type conf: dict
:param method_name: name of method to use
:param performance_map_conf: true if generate performance maps
:param use_cross_validation: true to add crossvalidation
:param denoise_disparity_map: true to add the disparity denoiser filter
:param used_band: name of band used for correlation
:type used_band: str
:param classification_3sgm: use 3SGM with classif (list of bands)
:type classification_3sgm: list[str] or None
"""
if method_name is None:
method_name = "census_sgm_default"
self.pandora_config = None
uses_cars_pandora_conf = False
if isinstance(conf, str):
# load file
with open(conf, "r", encoding="utf8") as fstream:
conf = json.load(fstream)
elif conf is None:
uses_cars_pandora_conf = True
package_path = os.path.dirname(__file__)
config_map = {
"pandora_mccnn_sgm": "config_mccnn.json",
"pandora_census_sgm_urban": "config_census_sgm_urban.json",
"pandora_census_sgm_shadow": "config_census_sgm_shadow.json",
"pandora_census_sgm_mountain_and_vegetation": (
"config_census_sgm_mountain_and_vegetation.json"
),
"pandora_census_sgm_homogeneous": (
"config_census_sgm_homogeneous.json"
),
"pandora_census_sgm_default": "config_census_sgm_default.json",
"pandora_census_sgm_sparse": "config_census_sgm_sparse.json",
"pandora_auto": "config_census_sgm_default.json",
}
try:
filename = config_map[method_name]
except KeyError as err:
logging.error(
"No method named {} in pandora loader".format(method_name)
)
raise NameError(
"No method named {} in pandora loader".format(method_name)
) from err
conf_file_path = os.path.join(package_path, filename)
with open(conf_file_path, "r", encoding="utf8") as fstream:
conf = json.load(fstream)
perf_ambiguity_conf = {
"cost_volume_confidence.cars_1": {
"confidence_method": "ambiguity",
"eta_max": perf_eta_max_ambiguity,
"eta_step": perf_eta_step,
}
}
perf_risk_conf = {
"cost_volume_confidence.cars_2": {
"confidence_method": "risk",
"eta_max": perf_eta_max_risk,
"eta_step": perf_eta_step,
}
}
intervals_conf = {
"cost_volume_confidence.cars_3": {
"confidence_method": "interval_bounds",
}
}
# Cross validation
cross_validation_acc_conf = {
"validation": {
"validation_method": "cross_checking_accurate",
"cross_checking_threshold": 1.0,
}
}
cross_validation_fast_conf = {
"validation": {
"validation_method": "cross_checking_fast",
}
}
disparity_denoiser_conf = {
"filter": {"filter_method": "disparity_denoiser"}
}
confidences = {}
if generate_performance_map_from_risk:
confidences.update(perf_ambiguity_conf)
confidences.update(perf_risk_conf)
if generate_performance_map_from_intervals:
confidences.update(perf_ambiguity_conf)
confidences.update(intervals_conf)
if generate_ambiguity:
confidences.update(perf_ambiguity_conf)
if confidences:
conf["pipeline"] = overload_pandora_conf_with_confidence(
conf["pipeline"], confidences
)
# update with cross validation
if "validation" not in conf["pipeline"]:
if use_cross_validation in (True, "fast"):
conf["pipeline"].update(cross_validation_fast_conf)
elif use_cross_validation == "accurate":
conf["pipeline"].update(cross_validation_acc_conf)
if (
denoise_disparity_map
and conf["pipeline"]["filter"]["filter_method"]
!= "disparity_denoiser"
):
conf["pipeline"].update(disparity_denoiser_conf)
if classification_3sgm:
conf["pipeline"]["optimization"]["optimization_method"] = "3sgm"
conf["pipeline"]["optimization"]["geometric_prior"] = {
"source": "classif",
"classes": classification_3sgm,
}
if "band" not in conf["pipeline"]["matching_cost"]:
conf["pipeline"]["matching_cost"]["band"] = used_band
if generate_performance_map_from_intervals:
# To ensure the consistency between the disparity map
# and the intervals, the median filter for intervals
# must be similar to the median filter. The filter is
# added at the end of the conf as it is applied during
# the disp_map state.
try:
filter_size = conf["pipeline"]["filter"]["filter_size"]
except KeyError:
filter_size = 3
conf_filter_interval = {
"filter.cars_3": {
"filter_method": "median_for_intervals",
"filter_size": filter_size,
"interval_indicator": "cars_3",
"regularization": True,
"ambiguity_indicator": "cars_1",
}
}
pipeline_dict = OrderedDict()
pipeline_dict.update(conf["pipeline"])
# Filter is placed after validation in config
# and should be placed before.
# However it does not have any incidence on operation
if uses_cars_pandora_conf:
pipeline_dict.update(conf_filter_interval)
conf["pipeline"] = pipeline_dict
if "filter" in conf["pipeline"]:
filter_conf = conf["pipeline"]["filter"]
if filter_conf["filter_method"] == "disparity_denoiser":
if "band" not in filter_conf:
conf["pipeline"]["filter"]["band"] = used_band
# Check conf
self.pandora_config = conf
[docs]
def get_conf(self):
"""
Get pandora configuration used
:return: pandora configuration
:rtype: dict
"""
return self.pandora_config
[docs]
def get_classif_bands(self):
"""
Get the classification bands used in the pandora configuration
:return: list of classification bands
"""
classif_bands = []
def search_classes_recursive(obj):
"""
Recursive search of keys containing 'classes' in the configuration
"""
if isinstance(obj, dict):
for key, value in obj.items():
if "classes" in key.lower():
if isinstance(value, list):
classif_bands.extend(value)
elif value is not None:
classif_bands.append(value)
# Continue recursive search
search_classes_recursive(value)
search_classes_recursive(self.pandora_config)
# Remove duplicates and return the list
return list(set(classif_bands))
@cars_profile(name="Find auto conf")
def find_auto_conf(
self, intersection_poly, land_cover_map, classif_to_config_mapping, epsg
):
"""
Find the configuration that suits the most on the
land cover map based on the roi
"""
package_path = os.path.dirname(__file__)
# construct the path to the land_cover_map
if os.path.dirname(land_cover_map) == "":
land_cover_map_path = os.path.join(package_path, land_cover_map)
else:
land_cover_map_path = land_cover_map
with rasterio.open(land_cover_map_path) as src:
# Project the polygon to the right epsg
if src.crs != epsg:
poly = polygon_projection(
intersection_poly, epsg, src.crs.to_epsg()
)
else:
poly = intersection_poly
# Use a buffer because the land_cover_map resolution is coarse
data_land_cover, _ = mask(
src, [mapping(poly)], crop=True, all_touched=True
)
# Find the most common class in the roi
data_squeeze = data_land_cover.squeeze()
valid_data = data_squeeze[data_squeeze != src.nodata]
most_common_class = None
if valid_data.size > 0:
classes, counts = np.unique(valid_data, return_counts=True)
max_index = np.argmax(counts)
most_common_class = classes[max_index]
# Construct the path to the classification to configuration mapping
if os.path.dirname(classif_to_config_mapping) == "":
conf_file_path = os.path.join(
package_path, classif_to_config_mapping
)
else:
conf_file_path = classif_to_config_mapping
# read conf
with open(conf_file_path, "r", encoding="utf8") as fstream:
conf_mapping = json.load(fstream)
# Find the configuration that corresponds to the most common class
corresponding_conf_name = conf_mapping.get(str(most_common_class), None)
# If no equivalence has been found, we use the default configuration
if corresponding_conf_name is None:
corresponding_conf_name = "census_sgm_default"
logging.info(
"The conf that has been chosen regarding the "
"world classification map is {}".format(corresponding_conf_name)
)
# We return the corresponding configuration
json_conf_name = os.path.join(
package_path, "config_" + corresponding_conf_name + ".json"
)
with open(json_conf_name, "r", encoding="utf8") as fstream:
conf = json.load(fstream)
return conf
[docs]
def check_conf( # pylint: disable=too-many-positional-arguments
self,
user_cfg,
nodata_left,
nodata_right,
bands_left,
bands_right,
bands_classif_left=None,
bands_classif_right=None,
):
"""
Check configuration
:param user_cfg: configuration
:type user_cfg: dict
:return: pandora configuration
:rtype: dict
"""
# Import plugins before checking configuration
pandora.import_plugin()
# Check configuration and update the configuration with default values
# Instantiate pandora state machine
pandora_machine = PandoraMachine()
# check pipeline
metadata_left = overide_pandora_get_metadata(
bands_left, classif_bands=bands_classif_left
)
metadata_right = overide_pandora_get_metadata(
bands_right, classif_bands=bands_classif_right
)
metadata_left = metadata_left.assign_coords(band_im=bands_left)
metadata_right = metadata_right.assign_coords(band_im=bands_right)
user_cfg_pipeline = get_config_pipeline(user_cfg)
# check that if validation is cross_checking_*, both
# images have a classification band for 3SGM
validation_method = (
user_cfg_pipeline["pipeline"]
.get("validation", {})
.get("validation_method", None)
)
sgm_optimization_method = (
user_cfg_pipeline["pipeline"]
.get("optimization", {})
.get("optimization_method", None)
)
if validation_method is not None and validation_method.startswith(
"cross_checking"
):
if sgm_optimization_method == "3sgm":
has_classif_left = bands_classif_left is not None
has_classif_right = bands_classif_right is not None
if not (has_classif_left and has_classif_right):
raise ValueError(
"When using 3SGM and cross-validation at the same time,"
" both of the images should have a classification."
)
saved_schema = copy.deepcopy(
pandora.matching_cost.matching_cost.AbstractMatchingCost.schema
)
cfg_pipeline = check_pipeline_section(
user_cfg_pipeline, metadata_left, metadata_right, pandora_machine
)
# quick fix to remove when the problem is solved in pandora
pandora.matching_cost.matching_cost.AbstractMatchingCost.schema = (
saved_schema
)
# check a part of input section
user_cfg_input = get_config_input_custom_cars(
user_cfg, nodata_left, nodata_right
)
cfg_input = check_input_section_custom_cars(user_cfg_input)
# concatenate updated config
cfg = concat_conf([cfg_input, cfg_pipeline])
return cfg
input_configuration_schema_custom_cars = {
"nodata_left": Or(
int, lambda x: np.isnan(x) # pylint: disable=unnecessary-lambda
),
"nodata_right": Or(
int, lambda x: np.isnan(x) # pylint: disable=unnecessary-lambda
),
}
default_short_configuration_input_custom_cars = {
"input": {
"nodata_left": -9999,
"nodata_right": -9999,
}
}
[docs]
def overload_pandora_conf_with_confidence(conf, confidence_conf):
"""
Overload pandora pipeline configuration with given confidence to add
just before disparity computation.
:param conf: current pandora configuration
:type conf: OrderedDict
:param confidence_conf: confidence applications config
:type confidence_conf: OrderedDict
:return: updated pandora pipeline conf
:rtype: OrderedDict
"""
out_dict = OrderedDict()
out_dict.update(conf)
conf_keys = list(conf.keys())
confidence_conf_keys = list(confidence_conf.keys())
for key in confidence_conf_keys:
if key in conf_keys:
logging.info("{} pandora key already in configuration".format(key))
# update confidence
out_dict.update(confidence_conf)
# move confidence keys right before disparity computation
# get position of key "disparity"
if "disparity" not in conf_keys:
raise RuntimeError("disparity key not in pandora configuration")
disp_index = conf_keys.index("disparity")
# move to end every key from disparity
for ind in range(disp_index, len(conf_keys)):
out_dict.move_to_end(conf_keys[ind])
return out_dict