#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
CARS pandora loader file
"""
import copy
import json
import logging
import os
from collections import OrderedDict
from typing import Dict
import numpy as np
import pandora
from json_checker import Checker, Or
from pandora.check_configuration import (
check_pipeline_section,
concat_conf,
get_config_pipeline,
update_conf,
)
from pandora.img_tools import get_metadata
from pandora.state_machine import PandoraMachine
class PandoraLoader:
    """
    PandoraLoader

    Builds the pandora configuration used by CARS, either from a user
    supplied configuration (dict or path to a json file) or from one of
    the json configurations located next to this module, then completes
    it with confidence, validation and filter steps.
    """

    # File name of the packaged pandora configuration for each method name
    _BUILTIN_CONF_FILES = {
        "mccnn_sgm": "config_mccnn.json",
        "census_sgm_urban": "config_census_sgm_urban.json",
        "census_sgm_shadow": "config_census_sgm_shadow.json",
        "census_sgm_mountain_and_vegetation": (
            "config_census_sgm_mountain_and_vegetation.json"
        ),
        "census_sgm_homogeneous": "config_census_sgm_homogeneous.json",
        "census_sgm_default": "config_census_sgm_default.json",
        "census_sgm_sparse": "config_census_sgm_sparse.json",
    }

    def __init__(  # noqa: C901
        self,
        conf=None,
        method_name=None,
        generate_performance_map_from_risk=False,
        generate_performance_map_from_intervals=False,
        generate_ambiguity=False,
        perf_eta_max_ambiguity=0.99,
        perf_eta_max_risk=0.25,
        perf_eta_step=0.04,
        use_cross_validation=True,
        denoise_disparity_map=False,
    ):
        """
        Init function of PandoraLoader

        If conf is provided, pandora will use it.
        If not, pandora will use the internal configuration selected
        by method_name.

        Note: when conf is given as a dict, its "pipeline" section is
        completed in place.

        :param conf: configuration of pandora to use, or path to a json
            file containing it
        :type conf: dict or str
        :param method_name: name of the internal configuration to use
            when conf is None ("census_sgm_default" if None)
        :type method_name: str
        :param generate_performance_map_from_risk: true to add the
            ambiguity and risk confidence steps
        :param generate_performance_map_from_intervals: true to add the
            ambiguity and interval bounds confidence steps
        :param generate_ambiguity: true to add the ambiguity confidence
            step
        :param perf_eta_max_ambiguity: eta_max of the ambiguity step
        :param perf_eta_max_risk: eta_max of the risk step
        :param perf_eta_step: eta_step of the ambiguity and risk steps
        :param use_cross_validation: True or "fast" to add the fast
            cross checking, "accurate" to add the accurate one. Only
            applied when the pipeline has no validation step
        :param denoise_disparity_map: true to add the disparity denoiser
            filter
        :raises NameError: if conf is None and method_name is unknown
        """

        if method_name is None:
            method_name = "census_sgm_default"

        self.pandora_config = None

        uses_cars_pandora_conf = False

        if isinstance(conf, str):
            # load file
            with open(conf, "r", encoding="utf8") as fstream:
                conf = json.load(fstream)
        elif conf is None:
            uses_cars_pandora_conf = True
            conf = self._load_builtin_conf(method_name)

        perf_ambiguity_conf = {
            "cost_volume_confidence.cars_1": {
                "confidence_method": "ambiguity",
                "eta_max": perf_eta_max_ambiguity,
                "eta_step": perf_eta_step,
            }
        }

        perf_risk_conf = {
            "cost_volume_confidence.cars_2": {
                "confidence_method": "risk",
                "eta_max": perf_eta_max_risk,
                "eta_step": perf_eta_step,
            }
        }

        intervals_conf = {
            "cost_volume_confidence.cars_3": {
                "confidence_method": "interval_bounds",
            }
        }

        # Cross validation
        cross_validation_acc_conf = {
            "validation": {
                "validation_method": "cross_checking_accurate",
                "cross_checking_threshold": 1.0,
            }
        }

        cross_validation_fast_conf = {
            "validation": {
                "validation_method": "cross_checking_fast",
            }
        }

        disparity_denoiser_conf = {
            "filter": {"filter_method": "disparity_denoiser"}
        }

        # Both performance maps rely on the ambiguity step
        confidences = {}
        if generate_performance_map_from_risk:
            confidences.update(perf_ambiguity_conf)
            confidences.update(perf_risk_conf)
        if generate_performance_map_from_intervals:
            confidences.update(perf_ambiguity_conf)
            confidences.update(intervals_conf)
        if generate_ambiguity:
            confidences.update(perf_ambiguity_conf)

        if confidences:
            conf["pipeline"] = overload_pandora_conf_with_confidence(
                conf["pipeline"], confidences
            )

        # update with cross validation
        if "validation" not in conf["pipeline"]:
            if use_cross_validation in (True, "fast"):
                conf["pipeline"].update(cross_validation_fast_conf)
            elif use_cross_validation == "accurate":
                conf["pipeline"].update(cross_validation_acc_conf)

        if denoise_disparity_map:
            # The pipeline may have no filter step at all: read it
            # defensively instead of failing with a KeyError
            current_filter = conf["pipeline"].get("filter") or {}
            if current_filter.get("filter_method") != "disparity_denoiser":
                conf["pipeline"].update(disparity_denoiser_conf)

        if generate_performance_map_from_intervals:
            # To ensure the consistency between the disparity map
            # and the intervals, the median filter for intervals
            # must be similar to the median filter. The filter is
            # added at the end of the conf as it is applied during
            # the disp_map state.
            try:
                filter_size = conf["pipeline"]["filter"]["filter_size"]
            except KeyError:
                filter_size = 3

            conf_filter_interval = {
                "filter.cars_3": {
                    "filter_method": "median_for_intervals",
                    "filter_size": filter_size,
                    "interval_indicator": "cars_3",
                    "regularization": True,
                    "ambiguity_indicator": "cars_1",
                }
            }
            pipeline_dict = OrderedDict()
            pipeline_dict.update(conf["pipeline"])
            # Filter is placed after validation in config
            # and should be placed before.
            # However it does not have any incidence on operation
            if uses_cars_pandora_conf:
                pipeline_dict.update(conf_filter_interval)

            conf["pipeline"] = pipeline_dict

        if "filter" in conf["pipeline"]:
            filter_conf = conf["pipeline"]["filter"]
            if (
                filter_conf["filter_method"] == "disparity_denoiser"
                and filter_conf.get("band") is not None
            ):
                logging.warning(
                    "As multiband support is not "
                    "yet implemented in CARS, "
                    "the filter will be applied to "
                    "the panchromatic band."
                )
                conf["pipeline"]["filter"]["band"] = None

        # median_for_intervals is not compatible with the fast cross
        # checking: switch the validation to the accurate method
        for key in list(conf.get("pipeline")):
            if not key.startswith("filter"):
                continue
            if (
                conf["pipeline"][key]["filter_method"]
                == "median_for_intervals"
                and "validation" in conf["pipeline"]
                and conf["pipeline"]["validation"]["validation_method"]
                == "cross_checking_fast"
            ):
                conf["pipeline"]["validation"][
                    "validation_method"
                ] = "cross_checking_accurate"
                logging.warning(
                    "You can not use median_for_intervals with "
                    "the fast cross checking validation for now. "
                    "It therefore has been overrided to accurate"
                )

        # Check conf
        self.pandora_config = conf

    @classmethod
    def _load_builtin_conf(cls, method_name):
        """
        Load the json configuration located next to this module for the
        given method name

        :param method_name: name of the internal configuration
        :type method_name: str
        :return: pandora configuration
        :rtype: dict
        :raises NameError: if method_name is not a known method
        """
        try:
            conf_file_name = cls._BUILTIN_CONF_FILES[method_name]
        except (KeyError, TypeError):
            logging.error("No method named %s in pandora loader", method_name)
            raise NameError(
                "No method named {} in pandora loader".format(method_name)
            ) from None

        conf_file_path = os.path.join(
            os.path.dirname(__file__), conf_file_name
        )
        with open(conf_file_path, "r", encoding="utf8") as fstream:
            return json.load(fstream)

    def get_conf(self):
        """
        Get pandora configuration used

        :return: pandora configuration
        :rtype: dict
        """

        return self.pandora_config

    def check_conf(
        self,
        user_cfg,
        img_left,
        img_right,
        classif_left=None,
        classif_right=None,
    ):
        """
        Check configuration

        :param user_cfg: configuration
        :type user_cfg: dict
        :param img_left: left image, given to pandora get_metadata
        :param img_right: right image, given to pandora get_metadata
        :param classif_left: left classification, given to pandora
            get_metadata
        :param classif_right: right classification, given to pandora
            get_metadata

        :return: pandora configuration
        :rtype: dict
        """

        # Import plugins before checking configuration
        pandora.import_plugin()
        # Check configuration and update the configuration with default values
        # Instantiate pandora state machine
        pandora_machine = PandoraMachine()
        # check pipeline
        metadata_left = get_metadata(img_left, classif=classif_left)
        metadata_right = get_metadata(img_right, classif=classif_right)

        user_cfg_pipeline = get_config_pipeline(user_cfg)
        saved_schema = copy.deepcopy(
            pandora.matching_cost.matching_cost.AbstractMatchingCost.schema
        )
        try:
            cfg_pipeline = check_pipeline_section(
                user_cfg_pipeline,
                metadata_left,
                metadata_right,
                pandora_machine,
            )
        finally:
            # quick fix to remove when the problem is solved in pandora.
            # The class level schema is restored even when the check
            # fails, so that a failure does not leak into later checks
            pandora.matching_cost.matching_cost.AbstractMatchingCost.schema = (
                saved_schema
            )
        # check a part of input section
        user_cfg_input = get_config_input_custom_cars(user_cfg)
        cfg_input = check_input_section_custom_cars(user_cfg_input)
        # concatenate updated config
        cfg = concat_conf([cfg_input, cfg_pipeline])
        return cfg
# Schema of the "nodata_left" and "nodata_right" entries: each one is
# declared as either an int or a value for which np.isnan returns True.
# NOTE(review): presumably validated with json_checker against the "input"
# section of the configuration - confirm against the code using it.
input_configuration_schema_custom_cars = {
"nodata_left": Or(
int, lambda x: np.isnan(x) # pylint: disable=unnecessary-lambda
),
"nodata_right": Or(
int, lambda x: np.isnan(x) # pylint: disable=unnecessary-lambda
),
}
# Default "input" section: both nodata values are set to -9999.
# NOTE(review): presumably merged with the user "input" section before
# validation - confirm against the code using it.
default_short_configuration_input_custom_cars = {
"input": {
"nodata_left": -9999,
"nodata_right": -9999,
}
}
def overload_pandora_conf_with_confidence(conf, confidence_conf):
    """
    Overload pandora pipeline configuration with given confidence to add
    just before disparity computation.

    The input configuration is not modified. A confidence key already
    present in conf is reported with an error log and its content is
    replaced by the one of confidence_conf.

    :param conf: current pandora configuration
    :type conf: OrderedDict
    :param confidence_conf: confidence applications config
    :type confidence_conf: OrderedDict

    :return: updated pandora pipeline conf
    :rtype: OrderedDict
    :raises RuntimeError: if conf has no "disparity" key
    """

    conf_keys = list(conf)

    # Fail before doing any work: the confidence steps are positioned
    # relatively to the "disparity" step
    if "disparity" not in conf_keys:
        raise RuntimeError("disparity key not in pandora configuration")

    for key in confidence_conf:
        if key in conf:
            logging.error("%s pandora key already in configuration", key)

    # update confidence: new keys are appended at the end
    out_dict = OrderedDict(conf)
    out_dict.update(confidence_conf)

    # move to end every key from disparity, so that the appended
    # confidence keys end up right before disparity computation.
    # A confidence key that was already in conf after "disparity" is
    # left in place, so it ends up before disparity as well.
    disp_index = conf_keys.index("disparity")
    for key in conf_keys[disp_index:]:
        if key == "disparity" or key not in confidence_conf:
            out_dict.move_to_end(key)

    return out_dict