#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2026 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains the Pandora dense matching method implementation
"""
import collections
# Standard imports
import copy
import logging
# Third party imports
import numpy as np
import pandora
import xarray as xr
from json_checker import And, Checker, Or
from pandora.check_configuration import check_pipeline_section
from pandora.img_tools import add_global_disparity
from pandora.state_machine import PandoraMachine
from scipy.ndimage import maximum_filter, minimum_filter
# CARS imports
from cars.applications.dense_matching import dense_matching_algo as dm_algo
from cars.applications.dense_matching import (
dense_matching_wrappers as dm_wrappers,
)
from cars.applications.dense_matching.loaders.pandora_loader import (
PandoraLoader,
)
from cars.applications.dense_matching.methods import (
abstract_dense_matching_method as adm,
)
from cars.core import constants as cst
from cars.core import inputs, tiling
from cars.data_structures import cars_dataset
from cars.orchestrator.cluster.log_wrapper import cars_profile
AbstractDenseMatchingMethod = adm.AbstractDenseMatchingMethod
# pylint:disable=C0302,R0902,R0917
[docs]
def is_valid_perf_method(x):
"""
Helper function
Returns whether the performance map method x is valid
:param x: performance map method
:type x: str
:return: true or false
:rtype: bool
"""
return all(y in ["risk", "intervals"] for y in x)
[docs]
def is_valid_classification_3sgm_value(x):
"""
Helper function
Returns whether the classification value x is valid
:param x: classification value
:type x: any
:return: true or false
:rtype: bool
"""
return isinstance(x, int) and not isinstance(x, bool)
[docs]
class PandoraMethod(
AbstractDenseMatchingMethod,
short_name=[
"pandora_custom",
"pandora_mccnn_sgm",
"pandora_census_sgm_urban",
"pandora_census_sgm_shadow",
"pandora_census_sgm_mountain_and_vegetation",
"pandora_census_sgm_homogeneous",
"pandora_census_sgm_default",
"pandora_census_sgm_sparse",
"pandora_auto",
],
):
"""
The implementation of Pandora as a Dense matching method
"""
def __init__(self, conf):
super().__init__(conf=conf)
# non blocking check conf
self.schema = {
"method": str,
"generate_ambiguity": bool,
"performance_map_method": And(
list,
is_valid_perf_method,
),
"perf_eta_max_ambiguity": float,
"perf_eta_max_risk": float,
"perf_eta_step": float,
"perf_ambiguity_threshold": float,
"classification_fusion_margin": int,
"use_cross_validation": Or(bool, str),
"denoise_disparity_map": bool,
"used_band": str,
"loader_conf": Or(dict, collections.OrderedDict, str, None),
"loader": str,
"confidence_filtering": dict,
"threshold_disp_range_to_borders": bool,
"filter_incomplete_disparity_range": bool,
"edges_3sgm": bool,
"classification_3sgm": Or(
[is_valid_classification_3sgm_value],
None,
),
}
# those will be defined in check_conf
self.loader = None
self.margins = None
self.corr_config = None
self.used_config = self.check_conf(conf)
self.method = self.used_config["method"]
self.generate_ambiguity = self.used_config["generate_ambiguity"]
self.performance_map_method = self.used_config["performance_map_method"]
self.perf_eta_max_ambiguity = self.used_config["perf_eta_max_ambiguity"]
self.perf_eta_max_risk = self.used_config["perf_eta_max_risk"]
self.perf_eta_step = self.used_config["perf_eta_step"]
self.perf_ambiguity_threshold = self.used_config[
"perf_ambiguity_threshold"
]
self.classification_fusion_margin = self.used_config[
"classification_fusion_margin"
]
self.use_cross_validation = self.used_config["use_cross_validation"]
self.denoise_disparity_map = self.used_config["denoise_disparity_map"]
self.used_band = self.used_config["used_band"]
self.loader_conf = self.used_config["loader_conf"]
self.confidence_filtering = self.used_config["confidence_filtering"]
self.threshold_disp_range_to_borders = self.used_config[
"threshold_disp_range_to_borders"
]
self.filter_incomplete_disparity_range = self.used_config[
"filter_incomplete_disparity_range"
]
self.edges_3sgm = self.used_config["edges_3sgm"]
self.classification_3sgm = self.used_config["classification_3sgm"]
[docs]
def check_conf(self, conf):
"""
Merge user configuration with default values and validate schema.
Extra keys in conf do not raise errors.
"""
# Initialize conf if None
if conf is None:
conf = {}
save_intermediate_data = conf.get("save_intermediate_data", False)
default_perf_map_method = "risk" if save_intermediate_data else None
# Default configuration
default_conf = {
"save_intermediate_data": False,
"generate_ambiguity": save_intermediate_data,
"performance_map_method": default_perf_map_method,
"perf_eta_max_ambiguity": 0.99,
"perf_eta_max_risk": 0.25,
"perf_eta_step": 0.04,
"perf_ambiguity_threshold": 0.6,
"classification_fusion_margin": 5,
"use_cross_validation": True,
"denoise_disparity_map": False,
"used_band": "b0",
"loader_conf": None,
"loader": "pandora",
"confidence_filtering": {},
"threshold_disp_range_to_borders": False,
"filter_incomplete_disparity_range": True,
"edges_3sgm": True,
"classification_3sgm": None,
}
# Merge defaults with user conf
used_conf = default_conf.copy()
used_conf.update(conf)
# --- Update parameters
if not used_conf["classification_3sgm"]:
used_conf["classification_3sgm"] = None
if used_conf["use_cross_validation"] is True:
used_conf["use_cross_validation"] = "fast"
# Get/update perf map method as list
perf_map_method = used_conf["performance_map_method"]
if isinstance(perf_map_method, str):
used_conf["performance_map_method"] = [perf_map_method]
elif perf_map_method is None:
used_conf["performance_map_method"] = []
perf_map_method = used_conf["performance_map_method"]
# Loader initialization
# this overrides conf[loader], and sets:
# self.loader
# self.margins
# self.corr_config
self.check_conf_pandora_loader(used_conf, perf_map_method)
# Validate only keys defined in schema
conf_to_check = {k: used_conf[k] for k in self.schema if k in used_conf}
checker = Checker(self.schema)
checker.validate(conf_to_check)
# additional checks: confidence_filtering
conf_to_check["confidence_filtering"] = (
self.check_conf_confidence_filtering(
conf_to_check["confidence_filtering"]
)
)
# return the conf without unvalidated keys
return conf_to_check
[docs]
def check_conf_pandora_loader(self, conf, perf_map_method):
"""
Check the pandora loader conf
"""
loader = conf.get("loader")
loader_conf = conf.get("loader_conf")
default_method = "pandora_custom" if loader_conf else "pandora_auto"
method = conf.get("method", default_method)
if method == "pandora_auto" and loader_conf:
raise RuntimeError(
"Cannot use 'pandora_auto' method with a custom loader "
"configuration"
)
logger = logging.getLogger("transitions.core")
logger.addFilter(
lambda record: "to model due to model override policy"
not in record.getMessage()
)
# classification_3sgm is values, we want their band names
# (str of the values)
classification_3sgm = conf["classification_3sgm"]
if classification_3sgm is not None:
classification_3sgm = [str(value) for value in classification_3sgm]
pandora_loader = PandoraLoader(
conf=loader_conf,
method_name=method,
generate_performance_map_from_risk="risk" in perf_map_method,
generate_performance_map_from_intervals="intervals"
in perf_map_method,
generate_ambiguity=conf["generate_ambiguity"],
perf_eta_max_ambiguity=conf["perf_eta_max_ambiguity"],
perf_eta_max_risk=conf["perf_eta_max_risk"],
perf_eta_step=conf["perf_eta_step"],
use_cross_validation=conf["use_cross_validation"],
denoise_disparity_map=conf["denoise_disparity_map"],
used_band=conf["used_band"],
classification_3sgm=classification_3sgm,
)
self.loader = pandora_loader
self.corr_config = collections.OrderedDict(pandora_loader.get_conf())
conf["loader"] = loader
# create the dataset
classif_bands = pandora_loader.get_classif_bands()
fake_dataset = xr.Dataset(
data_vars={
"image": (["row", "col"], np.zeros((10, 10))),
"classif": (
["row", "col", "band_classif"],
np.zeros((10, 10, len(classif_bands)), dtype=np.int32),
),
},
coords={
"band_im": [conf["used_band"]],
"band_classif": classif_bands,
"row": np.arange(10),
"col": np.arange(10),
},
attrs={"disparity_source": [-1, 1]},
)
# Import plugins before checking configuration
pandora.import_plugin()
pandora_machine = PandoraMachine()
corr_config_pipeline = {"pipeline": dict(self.corr_config["pipeline"])}
saved_schema = copy.deepcopy(
pandora.matching_cost.matching_cost.AbstractMatchingCost.schema
)
_ = check_pipeline_section(
corr_config_pipeline, fake_dataset, fake_dataset, pandora_machine
)
# quick fix to remove when the problem is solved in pandora
pandora.matching_cost.matching_cost.AbstractMatchingCost.schema = (
saved_schema
)
self.margins = pandora_machine.margins.global_margins
[docs]
def check_conf_confidence_filtering(self, overloaded_conf):
"""
Check the confidence filtering conf
"""
default_conf = {
"activated": True,
"bounds_ratio_threshold": 0.2,
"risk_ratio_threshold": 0.75,
"bounds_range_threshold": 3,
"risk_range_threshold": 9,
"nan_threshold": 0.2,
"win_nanratio": 20,
}
confidence_filtering_schema = {
"activated": bool,
"bounds_ratio_threshold": float,
"risk_ratio_threshold": float,
"bounds_range_threshold": int,
"risk_range_threshold": int,
"nan_threshold": float,
"win_nanratio": int,
}
used_conf = default_conf.copy()
used_conf.update(overloaded_conf)
checker_confidence_filtering_schema = Checker(
confidence_filtering_schema
)
checker_confidence_filtering_schema.validate(used_conf)
return used_conf
[docs]
def get_method(self):
"""
Returns the method used
"""
return self.method
@cars_profile(name="Get margin fun")
def get_margins_fun(
self,
grid_left,
disp_range_grid,
min_elevation_offset,
max_elevation_offset,
):
"""
Get Margins function that generates margins needed by
matching method, to use during resampling.
:param grid_left: left epipolar grid
:type grid_left: dict
:param disp_range_grid: minimum and maximum disparity grid
:type disp_range_grid: dict
:param min_elevation_offset: minimum elevation offset
:type min_elevation_offset: float
:param max_elevation_offset: maximum elevation offset
:type max_elevation_offset: float
:return: function that generates margin for given roi
:rtype: callable
"""
disp_min_grid_arr, _ = inputs.rasterio_read_as_array(
disp_range_grid["grid_min_path"]
)
disp_max_grid_arr, _ = inputs.rasterio_read_as_array(
disp_range_grid["grid_max_path"]
)
step_row = disp_range_grid["step_row"]
step_col = disp_range_grid["step_col"]
row_range = disp_range_grid["row_range"]
col_range = disp_range_grid["col_range"]
# get disp_to_alt_ratio
disp_to_alt_ratio = grid_left["disp_to_alt_ratio"]
# Check if we need to override disp_min
if min_elevation_offset is not None:
user_disp_min = min_elevation_offset / disp_to_alt_ratio
if np.any(disp_min_grid_arr < user_disp_min):
logging.warning(
(
"Overridden disparity minimum "
"= {:.3f} pix. (= {:.3f} m.) "
"is greater than disparity minimum estimated "
"in prepare step "
"for current pair"
).format(
user_disp_min,
min_elevation_offset,
)
)
disp_min_grid_arr[:, :] = user_disp_min
# Check if we need to override disp_max
if max_elevation_offset is not None:
user_disp_max = max_elevation_offset / disp_to_alt_ratio
if np.any(disp_max_grid_arr > user_disp_max):
logging.warning(
(
"Overridden disparity maximum "
"= {:.3f} pix. (or {:.3f} m.) "
"is lower than disparity maximum estimated "
"in prepare step "
"for current pair"
).format(
user_disp_max,
max_elevation_offset,
)
)
disp_max_grid_arr[:, :] = user_disp_max
# Compute global range of logging
disp_min_global = np.min(disp_min_grid_arr)
disp_max_global = np.max(disp_max_grid_arr)
logging.info(
"Global Disparity range for current pair: "
"[{:.3f} pix., {:.3f} pix.] "
"(or [{:.3f} m., {:.3f} m.])".format(
disp_min_global,
disp_max_global,
disp_min_global * disp_to_alt_ratio,
disp_max_global * disp_to_alt_ratio,
)
)
def margins_wrapper(row_min, row_max, col_min, col_max):
"""
Generates margins Dataset used in resampling
:param row_min: row min
:param row_max: row max
:param col_min: col min
:param col_max: col max
:return: margins
:rtype: xr.Dataset
"""
disp_min, disp_max = tiling.compute_local_disp_range_from_grids(
row_min,
row_max,
col_min,
col_max,
disp_min_grid_arr,
disp_max_grid_arr,
step_row,
step_col,
row_range,
col_range,
)
# Compute margins for the correlator
margins = dm_wrappers.get_margins(self.margins, disp_min, disp_max)
return margins
return margins_wrapper
@cars_profile(name="Optimal size estimation")
def get_optimal_tile_size(
self,
disp_range_grid,
max_ram_per_worker,
min_epi_tile_size,
max_epi_tile_size,
local_disp_grid_step,
epipolar_tile_margin_in_percent,
):
"""
Get the optimal tile size to use during dense matching.
:param disp_range_grid: minimum and maximum disparity grid
:type disp_range_grid: dict
:param max_ram_per_worker: maximum RAM per worker
:type max_ram_per_worker: int
:param min_epi_tile_size: minimum epipolar tile size
:type min_epi_tile_size: int
:param max_epi_tile_size: maximum epipolar tile size
:type max_epi_tile_size: int
:param local_disp_grid_step: disparity grid step
:type local_disp_grid_step: int
:param epipolar_tile_margin_in_percent: margin percentage
:type epipolar_tile_margin_in_percent: float
:return: optimal tile size and local function
:rtype: tuple
"""
disp_min_grids, _ = inputs.rasterio_read_as_array(
disp_range_grid["grid_min_path"]
)
disp_max_grids, _ = inputs.rasterio_read_as_array(
disp_range_grid["grid_max_path"]
)
# use max tile size as overlap for min and max:
# max Point to point diff is less than diff of tile
# use filter of size max_epi_tile_size
overlap = 3 * int(max_epi_tile_size / local_disp_grid_step)
disp_min_grids = minimum_filter(
disp_min_grids, size=[overlap, overlap], mode="nearest"
)
disp_max_grids = maximum_filter(
disp_max_grids, size=[overlap, overlap], mode="nearest"
)
# Worst cases scenario:
# 1: [global max - max diff, global max]
# 2: [global min, global min max diff]
max_diff = np.round(np.nanmax(disp_max_grids - disp_min_grids)) + 1
global_min = np.floor(np.nanmin(disp_min_grids))
global_max = np.ceil(np.nanmax(disp_max_grids))
# Get tiling param
opt_epipolar_tile_size_1 = (
dm_wrappers.optimal_tile_size_pandora_plugin_libsgm(
global_min,
global_min + max_diff,
min_epi_tile_size,
max_epi_tile_size,
max_ram_per_worker,
margin=epipolar_tile_margin_in_percent,
)
)
opt_epipolar_tile_size_2 = (
dm_wrappers.optimal_tile_size_pandora_plugin_libsgm(
global_max - max_diff,
global_max,
min_epi_tile_size,
max_epi_tile_size,
max_ram_per_worker,
margin=epipolar_tile_margin_in_percent,
)
)
# return worst case
opt_epipolar_tile_size = min(
opt_epipolar_tile_size_1, opt_epipolar_tile_size_2
)
# Define function to compute local optimal size for each tile
def local_tile_optimal_size_fun(local_disp_min, local_disp_max):
"""
Compute optimal tile size for tile
:return: local tile size, global optimal tile sizes
"""
local_opt_tile_size = (
dm_wrappers.optimal_tile_size_pandora_plugin_libsgm(
local_disp_min,
local_disp_max,
0,
20000, # arbitrary
max_ram_per_worker,
margin=epipolar_tile_margin_in_percent,
)
)
# Get max range to use with current optimal size
max_range = dm_wrappers.get_max_disp_from_opt_tile_size(
opt_epipolar_tile_size,
max_ram_per_worker,
margin=epipolar_tile_margin_in_percent,
used_disparity_range=(local_disp_max - local_disp_min),
)
return local_opt_tile_size, opt_epipolar_tile_size, max_range
return opt_epipolar_tile_size, local_tile_optimal_size_fun
[docs]
def run(
self,
left_image_object: xr.Dataset,
right_image_object: xr.Dataset,
disp_range_grid,
saving_info=None,
compute_disparity_masks=False,
crop_with_range=None,
left_overlaps=None,
margins_to_keep=0,
texture_bands=None,
classif_bands_to_mask=None,
):
"""
Run dense matching on a pair of epipolar images.
Compute the disparity map between left and right images using the
configured dense matching method.
:param left_image_object: left epipolar image as xarray Dataset with:
- data with keys: "im", optionally "msk", "texture", "classif"
- attrs with keys: "margins" with "disp_min" and "disp_max",
"transform", "crs", "valid_pixels", "no_data_mask",
"no_data_img"
:type left_image_object: xr.Dataset
:param right_image_object: right epipolar image as xarray Dataset with:
- data with keys: "im", optionally "msk", "texture", "classif"
- attrs with keys: "margins" with "disp_min" and "disp_max",
"transform", "crs", "valid_pixels", "no_data_mask",
"no_data_img"
:type right_image_object: xr.Dataset
:param disp_range_grid: minimum and maximum disparity grid
:type disp_range_grid: dict
:param saving_info: information for saving outputs
:type saving_info: dict
:param compute_disparity_masks: activate computation of disparity masks
:type compute_disparity_masks: bool
:param crop_with_range: crop disparity map using provided range
:type crop_with_range: int or None
:param left_overlaps: overlaps associated to left image tiles
:type left_overlaps: dict or None
:param margins_to_keep: margins to keep after dense matching
:type margins_to_keep: int
:param texture_bands: indices of bands from left image used for texture
:type texture_bands: list
:param classif_bands_to_mask: bands from classification to mask
:type classif_bands_to_mask: list of str or int
:return: disparity map.
The CarsDataset contains:
- N x M delayed tiles.
Each tile is an xarray Dataset containing:
- data with keys: "disp", "disp_msk"
- attrs with keys: "profile", "window", "overlaps"
- attributes containing:
- "largest_epipolar_region"
- "opt_epipolar_tile_size"
- "disp_min_tiling"
- "disp_max_tiling"
:rtype: CarsDataset
"""
# ensure classification_3sgm is used over edges_3sgm if both are true
if (
self.edges_3sgm
and "edges_mask" in left_image_object
and not self.classification_3sgm
):
self.corr_config["pipeline"]["optimization"][
"optimization_method"
] = "3sgm"
self.corr_config["pipeline"]["optimization"]["geometric_prior"] = {
"source": "edges"
}
# convert to 2D array for sgm
left_image_object["edges"] = left_image_object[
"edges_mask"
].squeeze()
# transform disp_range_grid back to dict
disp_range_grid = disp_range_grid.data
# Generate disparity grids
(
disp_min_grid,
disp_max_grid,
) = dm_algo.compute_disparity_grid(
disp_range_grid,
left_image_object,
right_image_object,
self.used_band,
self.threshold_disp_range_to_borders,
)
global_disp_min = disp_range_grid["global_min"]
global_disp_max = disp_range_grid["global_max"]
# add global disparity in case of ambiguity normalization
left_image_object = add_global_disparity(
left_image_object, global_disp_min, global_disp_max
)
# Crop interval if needed
mask_crop = np.zeros(disp_min_grid.shape, dtype=int)
is_cropped = False
if crop_with_range is not None:
current_min = np.min(disp_min_grid)
current_max = np.max(disp_max_grid)
if (current_max - current_min) > crop_with_range:
is_cropped = True
logging.warning("disparity range for current tile is cropped")
# crop
new_min = (
current_min * crop_with_range / (current_max - current_min)
)
new_max = (
current_max * crop_with_range / (current_max - current_min)
)
mask_crop = np.logical_or(
disp_min_grid < new_min, disp_max_grid > new_max
)
mask_crop = mask_crop.astype(bool)
disp_min_grid[mask_crop] = new_min
disp_max_grid[mask_crop] = new_max
# Compute disparity
# TODO : remove overwriting of EPI_MSK
disp_dataset = dm_algo.compute_disparity(
left_image_object,
right_image_object,
self.corr_config,
self.used_band,
disp_min_grid=disp_min_grid,
disp_max_grid=disp_max_grid,
compute_disparity_masks=compute_disparity_masks,
cropped_range=mask_crop,
margins_to_keep=margins_to_keep,
classification_fusion_margin=self.classification_fusion_margin,
texture_bands=texture_bands,
filter_incomplete_disparity_range=(
self.filter_incomplete_disparity_range
),
classif_bands_to_mask=classif_bands_to_mask,
)
mask = disp_dataset["disp_msk"].values
disp_map = disp_dataset["disp"].values
disp_map[mask == 0] = np.nan
# Filtering by using the confidence
requested_confidence = [
"confidence_from_risk_min.cars_2",
"confidence_from_risk_max.cars_2",
"confidence_from_interval_bounds_inf.cars_3",
"confidence_from_interval_bounds_sup.cars_3",
]
if (
all(key in disp_dataset for key in requested_confidence)
and self.confidence_filtering["activated"] is True
):
dm_wrappers.confidence_filtering(
disp_dataset,
requested_confidence,
self.confidence_filtering,
)
# Fill with attributes
cars_dataset.fill_dataset(
disp_dataset,
saving_info=saving_info,
window=cars_dataset.get_window_dataset(left_image_object),
profile=cars_dataset.get_profile_rasterio(left_image_object),
attributes={cst.CROPPED_DISPARITY_RANGE: is_cropped},
overlaps=left_overlaps,
)
return disp_dataset