Source code for cars.applications.dense_matching.dense_matching_wrappers

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module is responsible for the dense matching algorithms:
- thus it creates a disparity map from a pair of images
"""
# pylint: disable=C0302

# Standard imports
import logging
import math
import warnings
from typing import Dict

import numpy as np
import xarray as xr

# Third party imports
from pandora import constants as p_cst
from scipy.ndimage import generic_filter

from cars.applications.dense_match_filling import fill_disp_wrappers

# CARS imports
from cars.applications.dense_matching import (
    dense_matching_constants as dense_match_cst,
)
from cars.conf import mask_cst as msk_cst
from cars.core import constants as cst
from cars.core import constants_disparity as cst_disp

from .cpp import dense_matching_cpp


[docs] def get_margins(margin, disp_min, disp_max): """ Get margins for the dense matching steps :param margin: margins object :type margin: Margins :param disp_min: Minimum disparity :type disp_min: int :param disp_max: Maximum disparity :type disp_max: int :return: margins of the matching algorithm used """ corner = ["left", "up", "right", "down"] col = np.arange(len(corner)) left_margins = [ margin.left + disp_max, margin.up, margin.right - disp_min, margin.down, ] right_margins = [ margin.left - disp_min, margin.up, margin.right + disp_max, margin.down, ] same_margins = [ max(left, right) for left, right in zip(left_margins, right_margins) # noqa: B905 ] margins = xr.Dataset( { "left_margin": ( ["col"], same_margins, ) }, coords={"col": col}, ) margins["right_margin"] = xr.DataArray(same_margins, dims=["col"]) margins.attrs["disp_min"] = disp_min margins.attrs["disp_max"] = disp_max return margins
[docs] def get_masks_from_pandora( disp: xr.Dataset, compute_disparity_masks: bool, filter_incomplete_disparity_range: bool, ) -> Dict[str, np.ndarray]: """ Get masks dictionary from the disparity map in output of pandora. :param disp: disparity map (pandora output) :param compute_disparity_masks: compute_disparity_masks activation status :return: masks dictionary """ masks = {} # Retrieve validity mask from pandora # Invalid pixels in validity mask are: # * Bit 0: Edge of the reference image or nodata in reference image # * Bit 1: Disparity interval to explore is missing or nodata in the # secondary image # * Bit 6: Pixel is masked on the mask of the reference image # * Bit 7: Disparity to explore is masked on the mask of the secondary # image # * Bit 8: Pixel located in an occlusion region # * Bit 9: Fake match validity_mask_cropped = disp.validity_mask.values # Mask initialization to false (all is invalid) masks[cst_disp.VALID] = np.full(validity_mask_cropped.shape, False) invalid_value = p_cst.PANDORA_MSK_PIXEL_INVALID if filter_incomplete_disparity_range: invalid_value |= ( p_cst.PANDORA_MSK_PIXEL_INCOMPLETE_VARIABLE_DISPARITY_RANGE ) # Identify valid points masks[cst_disp.VALID][ np.where((validity_mask_cropped & invalid_value) == 0) ] = True # With compute_disparity_masks, produce one mask for each invalid flag in if compute_disparity_masks: msk_table = dense_match_cst.MASK_HASH_TABLE for key, val in msk_table.items(): masks[key] = np.full(validity_mask_cropped.shape, False) masks[key][np.where((validity_mask_cropped & val) == 0)] = True # Build final mask with 255 for valid points and 0 for invalid points # The mask is used by rasterize method (non zero are valid points) for key, mask in masks.items(): final_msk = np.ndarray(mask.shape, dtype=np.int16) final_msk[mask] = 255 final_msk[np.equal(mask, False)] = 0 masks[key] = final_msk return masks
[docs] def add_disparity_grids( output_dataset: xr.Dataset, disp_min_grid: np.ndarray = None, disp_max_grid: np.ndarray = None, ): """ Add disparity min and max grids to dataset :param output_dataset: output dataset :param disp_min_grid: dense disparity map grid min :param disp_max_grid: dense disparity map grid max """ if disp_min_grid is not None: output_dataset[cst_disp.EPI_DISP_MIN_GRID] = xr.DataArray( disp_min_grid, dims=[cst.ROW, cst.COL], ) # Add color mask if disp_max_grid is not None: output_dataset[cst_disp.EPI_DISP_MAX_GRID] = xr.DataArray( disp_max_grid, dims=[cst.ROW, cst.COL], )
[docs] def add_texture( output_dataset: xr.Dataset, texture: np.ndarray = None, color_type=None, band_im: list = None, texture_bands: list = None, ): """ Add image and image mask to dataset :param output_dataset: output dataset :param image: image array :param image_type: data type of pixels :param band_im: list of band names """ if texture_bands: output_dataset.coords[cst.BAND_IM] = band_im[texture_bands] output_dataset[cst.EPI_TEXTURE] = xr.DataArray( texture[texture_bands], dims=[cst.BAND_IM, cst.ROW, cst.COL], ) if color_type is not None: # Add input image type output_dataset[cst.EPI_TEXTURE].attrs["color_type"] = color_type
[docs] def add_edges( output_dataset: xr.Dataset, ref_dataset: xr.Dataset, ): """ Add image and image mask to dataset :param output_dataset: output dataset :param image: image array :param image_type: data type of pixels :param band_im: list of band names """ if cst.EPI_EDGES_NORMALS in ref_dataset: output_dataset[cst.EPI_EDGES_NORMALS] = ref_dataset[ cst.EPI_EDGES_NORMALS ].copy() if cst.EPI_EDGES_MASK in ref_dataset: output_dataset[cst.EPI_EDGES_MASK] = ref_dataset[ cst.EPI_EDGES_MASK ].copy() if cst.EPI_EDGES_DEPTH_MAP in ref_dataset: output_dataset[cst.EPI_EDGES_DEPTH_MAP] = ref_dataset[ cst.EPI_EDGES_DEPTH_MAP ].copy() if cst.EPI_EDGES_TILE_ID in ref_dataset: output_dataset[cst.EPI_EDGES_TILE_ID] = ref_dataset[ cst.EPI_EDGES_TILE_ID ].copy()
[docs] def add_classification( output_dataset: xr.Dataset, classif: np.ndarray = None, band_classif: list = None, ): """ Add classification to dataset :param output_dataset: output dataset :param classif: classif array :param band_im: list of band names """ if classif is not None: if ( band_classif is not None and cst.BAND_CLASSIF not in output_dataset.dims ): output_dataset.coords[cst.BAND_CLASSIF] = band_classif output_dataset[cst.EPI_CLASSIFICATION] = xr.DataArray( classif, dims=[cst.BAND_CLASSIF, cst.ROW, cst.COL], )
[docs] def compute_cropped_roi( current_margins, margins_to_keep, tile_roi, nb_rows, nb_cols ): """ Compute cropped roi, with associated margins and :param current_margins: current dataset margins :type current_margins: list[int] :param margins_to_keep: margin to keep :type margins_to_keep: int :param nb_rows: number of current rows :type margins_to_keep: int :param tile_roi: roi without margin of tile :type tile_roi: list :param nb_cols: number of current cols :type nb_cols: int :return: (borders to use as roi, new dataset roi with margin, margin associated to roi) :rtype: tuple(list, list, list) """ def cast_int_list(current_list): """ Apply int cast to list """ new_list = [] for obj in current_list: new_list.append(int(obj)) return new_list new_margin_neg = list( np.clip(np.array(current_margins[0:2]), -margins_to_keep, 0) ) new_margin_pos = list( np.clip(np.array(current_margins[2:4]), 0, margins_to_keep) ) new_margin = list(np.array(new_margin_neg + new_margin_pos).astype(int)) new_roi = list(np.array(tile_roi) + np.array(new_margin)) ref_roi = [ int(-current_margins[0] + new_margin[0]), int(-current_margins[1] + new_margin[1]), int(nb_cols - current_margins[2] + new_margin[2]), int(nb_rows - current_margins[3] + new_margin[3]), ] return ( cast_int_list(ref_roi), cast_int_list(new_roi), cast_int_list(new_margin), )
# pylint: disable=too-many-positional-arguments
[docs] def create_disp_dataset( # noqa: C901 disp: xr.Dataset, ref_dataset: xr.Dataset, secondary_dataset: xr.Dataset, compute_disparity_masks: bool = False, disp_min_grid=None, disp_max_grid=None, cropped_range=None, margins_to_keep=0, classification_fusion_margin=-1, texture_bands=None, filter_incomplete_disparity_range=True, ) -> xr.Dataset: """ Create the disparity dataset. :param disp: disparity map (result of pandora) :param ref_dataset: reference dataset for the considered disparity map :param secondary_dataset: secondary dataset for the considered disparity map :param compute_disparity_masks: compute_disparity_masks activation status :param disp_min_grid: disparity min grid :param disp_max_grid: disparity max grid :param cropped_range: true if disparity range was cropped :type cropped_range: numpy array :param margins_to_keep: margin to keep after dense matching :type margins_to_keep: int :return: disparity dataset as used in cars """ # Crop disparity to ROI ref_roi, new_roi, new_margin = compute_cropped_roi( ref_dataset.attrs[cst.EPI_MARGINS], margins_to_keep, ref_dataset.attrs[cst.ROI], ref_dataset.sizes[cst.ROW], ref_dataset.sizes[cst.COL], ) # Crop datasets disp = disp.isel( row=slice(ref_roi[1], ref_roi[3]), col=slice(ref_roi[0], ref_roi[2]) ) ref_dataset = ref_dataset.isel( row=slice(ref_roi[1], ref_roi[3]), col=slice(ref_roi[0], ref_roi[2]) ) secondary_dataset = secondary_dataset.isel( row=slice(ref_roi[1], ref_roi[3]), col=slice(ref_roi[0], ref_roi[2]) ) # Crop disparity min max grids if disp_min_grid is not None: disp_min_grid = disp_min_grid[ ref_roi[1] : ref_roi[3], ref_roi[0] : ref_roi[2] ] if disp_max_grid is not None: disp_max_grid = disp_max_grid[ ref_roi[1] : ref_roi[3], ref_roi[0] : ref_roi[2] ] if cropped_range is not None: cropped_range = cropped_range[ ref_roi[1] : ref_roi[3], ref_roi[0] : ref_roi[2] ] # Retrieve disparity values disp_map = disp.disparity_map.values # Transform image to texture epi_image = ref_dataset[cst.EPI_IMAGE].values band_im = ref_dataset.coords[cst.BAND_IM] image_type = ref_dataset.attrs.get("image_type", "float32") if isinstance(image_type, list): if texture_bands is not None: image_type = image_type[texture_bands[0]] else: image_type = image_type[0] # Cast image if np.issubdtype(image_type, np.floating): min_value_clr = np.finfo(image_type).min max_value_clr = np.finfo(image_type).max else: min_value_clr = np.iinfo(image_type).min max_value_clr = np.iinfo(image_type).max epi_image = np.clip(epi_image, min_value_clr, max_value_clr).astype( image_type ) # Retrieve original mask of panchromatic image epi_msk = None if cst.EPI_MSK in ref_dataset: epi_msk = ref_dataset[cst.EPI_MSK].values epi_msk_right = None if cst.EPI_MSK in secondary_dataset: epi_msk_right = secondary_dataset[cst.EPI_MSK].values # Retrieve masks from pandora pandora_masks = get_masks_from_pandora( disp, compute_disparity_masks, filter_incomplete_disparity_range ) pandora_masks[cst_disp.VALID][np.isnan(disp_map)] = 0 # retrieve classif left_classif = None left_band_classif = None if cst.EPI_CLASSIFICATION in ref_dataset: left_classif = ref_dataset[cst.EPI_CLASSIFICATION].values left_band_classif = ref_dataset.coords[cst.BAND_CLASSIF].values # mask left classif outside right sensor if epi_msk_right is not None: left_classif = mask_left_classif_from_right_mask( # here left_classif, epi_msk_right == msk_cst.NO_DATA_IN_EPIPOLAR_RECTIFICATION, np.floor(disp_min_grid).astype(np.int16), np.ceil(disp_max_grid).astype(np.int16), classification_fusion_margin, ) left_from_right_classif = None right_band_classif = None if cst.EPI_CLASSIFICATION in secondary_dataset: right_classif = secondary_dataset[cst.EPI_CLASSIFICATION].values right_band_classif = secondary_dataset.coords[cst.BAND_CLASSIF].values left_from_right_classif = estimate_right_classif_on_left( right_classif, disp_map, pandora_masks[cst_disp.VALID], int(np.floor(np.min(disp_min_grid))), int(np.ceil(np.max(disp_max_grid))), classification_fusion_margin, ) # mask outside left sensor left_mask = ( ref_dataset[cst.EPI_MSK].values == msk_cst.NO_DATA_IN_EPIPOLAR_RECTIFICATION ) left_mask_stacked = np.repeat( np.expand_dims(left_mask, axis=0), left_from_right_classif.shape[0], axis=0, ) # pylint: disable=unsupported-assignment-operation left_from_right_classif[left_mask_stacked] = 0 # Merge right classif classif, band_classif = merge_classif_left_right( left_classif, left_band_classif, left_from_right_classif, right_band_classif, ) # Fill disparity array with 0 value for invalid points disp_map[pandora_masks[cst_disp.VALID] == 0] = np.nan # Build output dataset row = np.array(range(new_roi[1], new_roi[3])) col = np.array(range(new_roi[0], new_roi[2])) disp_ds = xr.Dataset( { cst_disp.MAP: ([cst.ROW, cst.COL], np.copy(disp_map)), cst_disp.VALID: ( [cst.ROW, cst.COL], pandora_masks[cst_disp.VALID].astype("uint8"), ), }, coords={cst.ROW: row, cst.COL: col}, ) # add left texture add_texture( disp_ds, texture=epi_image, color_type=image_type, band_im=band_im, texture_bands=texture_bands, ) # add original mask if epi_msk is not None: disp_ds[cst.EPI_MSK] = xr.DataArray( epi_msk.astype("uint8"), dims=[cst.ROW, cst.COL] ) # add confidence if "confidence_measure" in disp: add_confidence(disp_ds, disp) # add classif add_classification(disp_ds, classif=classif, band_classif=band_classif) # Add disparity grids add_disparity_grids( disp_ds, disp_min_grid=disp_min_grid, disp_max_grid=disp_max_grid ) # add edges add_edges(disp_ds, ref_dataset) # Add filling infos if cropped_range is not None: disp_ds = add_crop_info(disp_ds, cropped_range) if compute_disparity_masks: for key, val in pandora_masks.items(): disp_ds[key] = xr.DataArray(np.copy(val), dims=[cst.ROW, cst.COL]) disp_ds.attrs[cst.ROI] = ref_dataset.attrs[cst.ROI] disp_ds.attrs[cst.ROI_WITH_MARGINS] = new_roi disp_ds.attrs[cst.EPI_MARGINS] = new_margin disp_ds.attrs[cst.EPI_FULL_SIZE] = ref_dataset.attrs[cst.EPI_FULL_SIZE] return disp_ds
[docs] def add_crop_info(disp_ds, cropped_range): """ Add crop info :param disp: disp xarray :param cropped_range: was cropped range, bool :return updated dataset """ disp_ds = fill_disp_wrappers.add_empty_filling_band( disp_ds, ["cropped_disp_range"] ) fill_disp_wrappers.update_filling( disp_ds, cropped_range, "cropped_disp_range" ) return disp_ds
# pylint: disable=too-many-positional-arguments
[docs] def estimate_right_classif_on_left( right_classif, disp_map, disp_mask, disp_min, disp_max, classifiation_fusion_margin, ): """ Estimate right classif on left image :param right_classif: right classification :type right_classif: np ndarray :param disp_map: disparity map :type disp_map: np ndarray :param disp_mask: disparity mask :type disp_mask: np ndarray :param disp_min: disparity min :type disp_min: int :param disp_max: disparity max :type disp_max: int :return: right classif on left image :rtype: np nadarray """ return dense_matching_cpp.estimate_right_classif_on_left( right_classif, disp_map, disp_mask, disp_min, disp_max, classifiation_fusion_margin, )
[docs] def mask_left_classif_from_right_mask( left_classif, right_mask, disp_min, disp_max, classifiation_fusion_margin, ): """ Mask left classif with right mask. :param left_classif: right classification :type left_classif: np ndarray :param right_mask: right mask :type right_mask: np ndarray :param disp_min: disparity min :type disp_min: np.array type int :param disp_max: disparity max :type disp_max: np.array type int :return: masked left classif :rtype: np nadarray """ return dense_matching_cpp.mask_left_classif_from_right_mask( left_classif, right_mask, disp_min, disp_max, classifiation_fusion_margin, )
[docs] def merge_classif_left_right( left_classif, left_band_classif, right_classif, right_band_classif ): """ Merge left and right classif :param left_classif: left classif :type left_classif: np nadarray :param left_band_classif: list of tag :type left_band_classif: list :param right_classif: left classif :type right_classif: np nadarray :param right_band_classif: list of tag :type right_band_classif: list :return: merged classif, merged tag list :rtype: np ndarray, list """ classif = None band_classif = None if left_classif is None and right_classif is not None: classif = right_classif band_classif = right_band_classif elif left_classif is not None and right_classif is None: classif = left_classif band_classif = left_band_classif elif left_classif is not None and right_classif is not None: list_classif = [] band_classif = [] seen_tag = [] # search in left for ind_left, tag_left in enumerate(left_band_classif): seen_tag.append(tag_left) band_classif.append(tag_left) if tag_left in right_band_classif: # merge left and right ind_right = list(right_band_classif).index(tag_left) list_classif.append( np.logical_or( left_classif[ind_left, :, :], right_classif[ind_right, :, :], ) ) else: list_classif.append(left_classif[ind_left, :, :]) # search in right not already seen for ind_right, tag_right in enumerate(right_band_classif): if tag_right not in seen_tag: band_classif.append(tag_right) list_classif.append(right_classif[ind_right, :, :]) # Stack classif classif = np.stack(list_classif, axis=0) return classif, band_classif
[docs] def add_confidence( output_dataset: xr.Dataset, disp: xr.Dataset, ): """ Add confidences to dataset :param output_dataset: output dataset :param disp: disp xarray """ confidence_measure_indicator_list = np.array( disp.confidence_measure.indicator ) for key in confidence_measure_indicator_list: if key == "confidence_from_ambiguity.cars_1": confidence_idx = list(disp.confidence_measure.indicator).index(key) output_dataset["ambiguity"] = xr.DataArray( 1 - np.copy( disp.confidence_measure.data[ :, :, confidence_idx, ] ), dims=[cst.ROW, cst.COL], ) else: confidence_idx = list(disp.confidence_measure.indicator).index(key) output_dataset[str(key)] = xr.DataArray( np.copy( disp.confidence_measure.data[ :, :, confidence_idx, ] ), dims=[cst.ROW, cst.COL], )
[docs] def to_safe_disp_grid(grid_disp_min, grid_disp_max): """ Generate safe grids, with min < max for each point :param grid_disp_min: min disp grid :param grid_disp_max: max disp grid :return: grid_disp_min, grid_disp_max """ stacked_disp_range = np.dstack([grid_disp_min, grid_disp_max]) grid_disp_min = np.nanmin(stacked_disp_range, axis=2) grid_disp_max = np.nanmax(stacked_disp_range, axis=2) # convert nan grid_disp_min[np.isnan(grid_disp_min)] = 0 grid_disp_max[np.isnan(grid_disp_max)] = 0 if (grid_disp_min > grid_disp_max).any(): raise RuntimeError("grid min > max") return grid_disp_min, grid_disp_max
[docs] def estimate_right_grid_disp(disp_min_grid, disp_max_grid): """ Estimate right grid min and max. Correspond to the range of pixels that can be correlated from left -> right. If no left pixels can be associated to right, use global values :param disp_min_grid: left disp min grid :type disp_min_grid: numpy ndarray :param disp_max_grid: left disp max grid :type disp_max_grid: numpy ndarray :return: disp_min_right_grid, disp_max_right_grid :rtype: numpy ndarray, numpy ndarray """ float_types = [np.float16, np.float32, np.float64, np.float128] int_types = [ int, np.int8, np.uint8, np.int16, np.uint16, np.int32, np.uint32, np.int64, np.uint64, ] if disp_min_grid.dtype in float_types: return dense_matching_cpp.estimate_right_grid_disp_float( disp_min_grid, disp_max_grid ) if disp_min_grid.dtype in int_types: return dense_matching_cpp.estimate_right_grid_disp_int( disp_min_grid, disp_max_grid ) raise TypeError( "estimate_right_grid_disp does not support" f"{disp_min_grid.dtype} as an input type" )
# pylint: disable=too-many-positional-arguments
[docs] def optimal_tile_size_pandora_plugin_libsgm( disp_min: int, disp_max: int, min_tile_size: int, max_tile_size: int, max_ram_per_worker: int, tile_size_rounding: int = 50, margin: int = 0, ) -> int: """ Compute optimal tile size according to estimated memory usage (pandora_plugin_libsgm) Returned optimal tile size will be at least equal to tile_size_rounding. :param disp_min: Minimum disparity to explore :param disp_max: Maximum disparity to explore :param min_tile_size: Minimal tile size :param max_tile_size: Maximal tile size :param max_ram_per_worker: amount of RAM allocated per worker :param tile_size_rounding: Optimal tile size will be aligned to multiples\ of tile_size_rounding :param margin: margin to remove to the computed tile size (as a percent of the computed tile size) :returns: Optimal tile size according to benchmarked memory usage """ memory = max_ram_per_worker disp = max(3, abs(disp_max - disp_min)) image = 32 * 2 disp_ref = 32 validity_mask_ref = 16 confidence = 32 cv_ = disp * 32 nan_ = disp * 8 cv_uint = disp * 8 penal = 8 * 32 * 2 img_crop = 32 * 2 tot = image + disp_ref + validity_mask_ref tot += confidence + 2 * cv_ + nan_ + cv_uint + penal + img_crop import_ = 200 # MiB row_or_col = float(((memory - import_) * 2**23)) / tot if row_or_col <= 0: logging.warning( "Optimal tile size is null, " "forcing it to {} pixels".format(tile_size_rounding) ) tile_size = tile_size_rounding else: # nb_pixels = tile_size x (tile_size + disp) # hence tile_size not equal to sqrt(nb_pixels) # but sqrt(nb_pixels + (disp/2)**2) - disp/2 tile_size = np.sqrt(row_or_col + (disp / 2) ** 2) - disp / 2 tile_size = (1.0 - margin / 100.0) * tile_size if math.isinf(tile_size): logging.warning("Tile size infinite") if tile_size > max_tile_size: tile_size = max_tile_size elif tile_size < min_tile_size: tile_size = min_tile_size tile_size = tile_size_rounding * int(tile_size / tile_size_rounding) return tile_size
[docs] def get_max_disp_from_opt_tile_size( opt_epipolar_tile_size, max_ram_per_worker, margin=0, used_disparity_range=0 ): """ Compute max range possible depending on max ram per worker Return max range usable :param opt_epipolar_tile_size: used tile size :param max_ram_per_worker: amount of RAM allocated per worker :param tile_size_rounding: Optimal tile size will be aligned to multiples\ of tile_size_rounding :param margin: margin to remove to the computed tile size (as a percent of the computed tile size) :returns: max disp range to use """ import_ = 200 memory = max_ram_per_worker # not depending on disp image = 32 * 2 disp_ref = 32 validity_mask_ref = 16 confidence = 32 penal = 8 * 32 * 2 img_crop = 32 * 2 # depending on disp : data * disp cv_ = 32 nan_ = 8 cv_uint = 8 row = opt_epipolar_tile_size / (1.0 - margin / 100.0) col = row + used_disparity_range row_or_col = row * col tot = float(((memory - import_) * 2**23)) / row_or_col disp_tot = tot - ( image + disp_ref + validity_mask_ref + confidence + penal + img_crop ) # disp tot = disp x (2 x cv_ + nan_ + cv uint) max_range = int(disp_tot / (2 * cv_ + nan_ + cv_uint) + 1) return max_range
[docs] def nan_ratio_func(window): """ " Calculate the number of nan in the window :param window: the window in the image """ total_pixels = window.size nan_count = np.isnan(window).sum() return nan_count / total_pixels
[docs] def confidence_filtering( dataset, requested_confidence, conf_filtering, ): """ Filter the disparity map by using the confidence :param dataset: the epipolar disparity map dataset :type dataset: cars dataset :param disp_map: the disparity map :type disp_map: numpy darray :param requested_confidence: the confidence to use :type requested_confidence: list :param conf_filtering: the confidence_filtering parameters :type conf_filtering: dict """ data_risk_inf = dataset[requested_confidence[0]].values data_risk_sup = dataset[requested_confidence[1]].values risk_range = data_risk_sup - data_risk_inf data_bounds_inf = dataset[requested_confidence[2]].values data_bounds_sup = dataset[requested_confidence[3]].values bounds_range = data_bounds_sup - data_bounds_inf disp_min = dataset["disp_min_grid"].values disp_max = dataset["disp_max_grid"].values risk_ratio = risk_range / (disp_max - disp_min) bounds_ratio = bounds_range / (disp_max - disp_min) bounds_mask = (bounds_ratio > conf_filtering["bounds_ratio_threshold"]) & ( bounds_range > conf_filtering["bounds_range_threshold"] ) risk_mask = (risk_ratio > conf_filtering["risk_ratio_threshold"]) & ( risk_range > conf_filtering["risk_range_threshold"] ) mask = bounds_mask | risk_mask dataset["disp"].values[mask] = np.nan dataset["disp_msk"].values[mask] = 0 with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) nan_ratio = generic_filter( dataset["disp"], nan_ratio_func, size=conf_filtering["win_nanratio"] ) mask = (nan_ratio > conf_filtering["nan_threshold"]) & ( (bounds_range > conf_filtering["bounds_range_threshold"]) | (risk_range > conf_filtering["risk_range_threshold"]) ) dataset["disp"].values[mask] = np.nan dataset["disp_msk"].values[mask] = 0