#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module is responsible for the dense matching algorithms:
- thus it creates a disparity map from a pair of images
"""
import copy
# Standard imports
import logging
from typing import Dict
import numpy as np
import pandora
import xarray as xr
# Third party imports
from pandora.check_configuration import check_datasets
from pandora.state_machine import PandoraMachine
from scipy.interpolate import (
LinearNDInterpolator,
NearestNDInterpolator,
RegularGridInterpolator,
)
from cars.applications.dense_matching import dense_matching_wrappers as dm_wrap
# CARS imports
from cars.core import constants as cst
from cars.core import inputs
# pylint: disable=too-many-lines
[docs]
def compute_disparity_grid(
disp_range_grid,
left_image_object,
right_image_object,
used_band,
threshold_disp_range_to_borders,
):
"""
Compute dense disparity grids min and max for pandora
superposable to left image
:param disp_range_grid: disp range grid with min and max grids
:type disp_range_grid: CarsDataset
:param left_image_object: left image
:type left_image_object: xr.Dataset
:return disp min map, disp_max_map
:rtype np.ndarray, np.ndarray
"""
disp_min_grid_arr, _ = inputs.rasterio_read_as_array(
disp_range_grid["grid_min_path"]
)
disp_max_grid_arr, _ = inputs.rasterio_read_as_array(
disp_range_grid["grid_max_path"]
)
row_range = disp_range_grid["row_range"]
col_range = disp_range_grid["col_range"]
# Create interpolators
interp_min = RegularGridInterpolator(
(
row_range,
col_range,
),
disp_min_grid_arr,
)
interp_max = RegularGridInterpolator(
(
row_range,
col_range,
),
disp_max_grid_arr,
)
# Interpolate disp on grid
roi_with_margins = left_image_object.attrs["roi_with_margins"]
row_range = np.arange(roi_with_margins[1], roi_with_margins[3])
col_range = np.arange(roi_with_margins[0], roi_with_margins[2])
row_grid, col_grid = np.meshgrid(row_range, col_range, indexing="ij")
disp_min_grid = interp_min((row_grid, col_grid)).astype("float32")
disp_max_grid = interp_max((row_grid, col_grid)).astype("float32")
# Compute extremums of disparity considering left image borders
if threshold_disp_range_to_borders:
disp_min_from_borders = np.zeros_like(disp_min_grid)
disp_max_from_borders = np.zeros_like(disp_max_grid)
right_msk = (
np.array(right_image_object[cst.EPI_MSK].loc[used_band]) == 0
)
index_of_first_valid_pixel = np.argmax(right_msk, axis=1)
index_of_last_valid_pixel = np.argmax(
np.flip(right_msk, axis=1), axis=1
)
index_of_last_valid_pixel = (
right_msk.shape[1] - index_of_last_valid_pixel
)
any_valid_pixel_exists = np.any(right_msk, axis=1)
right_msk_indices = zip( # noqa: B905
index_of_first_valid_pixel,
index_of_last_valid_pixel,
any_valid_pixel_exists,
)
for row_id, (first, last, exists) in enumerate(right_msk_indices):
if exists:
disp_min_from_borders[row_id, first:last] = np.flip(
np.arange(first - last, 0)
)
disp_max_from_borders[row_id, first:last] = np.flip(
np.arange(0, last - first)
)
disp_min_from_borders = np.minimum(disp_max_grid, disp_min_from_borders)
disp_max_from_borders = np.maximum(disp_min_grid, disp_max_from_borders)
disp_min_grid = np.maximum(disp_min_from_borders, disp_min_grid)
disp_max_grid = np.minimum(disp_max_from_borders, disp_max_grid)
# Interpolation might create min > max
disp_min_grid, disp_max_grid = dm_wrap.to_safe_disp_grid(
disp_min_grid, disp_max_grid
)
return disp_min_grid, disp_max_grid
[docs]
def compute_disparity( # pylint: disable=too-many-positional-arguments
left_dataset,
right_dataset,
corr_cfg,
used_band=None,
disp_min_grid=None,
disp_max_grid=None,
compute_disparity_masks=False,
cropped_range=None,
margins_to_keep=0,
classification_fusion_margin=-1,
texture_bands=None,
filter_incomplete_disparity_range=True,
classif_bands_to_mask=None,
) -> Dict[str, xr.Dataset]:
"""
This function will compute disparity.
:param left_dataset: Dataset containing left image and mask
:type left_dataset: xarray.Dataset
:param right_dataset: Dataset containing right image and mask
:type right_dataset: xarray.Dataset
:param corr_cfg: Correlator configuration
:type corr_cfg: dict
:param used_band: name of band used for correlation
:type used_band: str
:param disp_min_grid: Minimum disparity grid
(if None, value is taken from left dataset)
:type disp_min_grid: np ndarray
:param disp_max_grid: Maximum disparity grid
(if None, value is taken from left dataset)
:type disp_max_grid: np ndarray
:param compute_disparity_masks: Activation of compute_disparity_masks mode
:type compute_disparity_masks: Boolean
:param cropped_range: true if disparity range was cropped
:type cropped_range: numpy array
:param margins_to_keep: margin to keep after dense matching
:type margins_to_keep: int
:param classification_fusion_margin: the margin to add for the fusion
:type classification_fusion_margin: int
:param classif_bands_to_mask: bands from classif to mask
:type classif_bands_to_mask: list of str / int
:return: Disparity dataset
"""
# Save dataset
saved_left_dataset = copy.deepcopy(left_dataset)
saved_right_dataset = copy.deepcopy(right_dataset)
# Check disp min and max bounds with respect to margin used for
# rectification
# Get tile global disp
disp_min = np.floor(np.min(disp_min_grid))
disp_max = np.ceil(np.max(disp_max_grid))
if disp_min < left_dataset.attrs[cst.EPI_DISP_MIN]:
logging.error(
"disp_min ({}) is lower than disp_min used to determine "
"margin during rectification ({})".format(
disp_min, left_dataset.attrs["disp_min"]
)
)
if disp_max > left_dataset.attrs[cst.EPI_DISP_MAX]:
logging.error(
"disp_max ({}) is greater than disp_max used to determine "
"margin during rectification ({})".format(
disp_max, left_dataset.attrs["disp_max"]
)
)
# Load pandora plugin
pandora.import_plugin()
# Update nodata values
left_dataset.attrs[cst.EPI_NO_DATA_IMG] = corr_cfg["input"]["nodata_left"]
right_dataset.attrs[cst.EPI_NO_DATA_IMG] = corr_cfg["input"]["nodata_right"]
# Put disparity in datasets
left_disparity = xr.DataArray(
data=np.array(
[disp_min_grid, disp_max_grid],
),
dims=["band_disp", "row", "col"],
coords={"band_disp": ["min", "max"]},
)
(disp_min_right_grid, disp_max_right_grid) = (
dm_wrap.estimate_right_grid_disp(disp_min_grid, disp_max_grid)
)
# estimation might create max < min
disp_min_right_grid, disp_max_right_grid = dm_wrap.to_safe_disp_grid(
disp_min_right_grid, disp_max_right_grid
)
right_disparity = xr.DataArray(
data=np.array(
[disp_min_right_grid, disp_max_right_grid],
),
dims=["band_disp", "row", "col"],
coords={"band_disp": ["min", "max"]},
)
left_dataset["disparity"] = left_disparity
right_dataset["disparity"] = right_disparity
# Update invalidity masks: all classes in classification should be 0
if (
cst.EPI_CLASSIFICATION in left_dataset
and classif_bands_to_mask not in (None, [])
):
classif_values = (
left_dataset[cst.EPI_CLASSIFICATION]
.sel(band_classif=classif_bands_to_mask)
.values
)
left_dataset[cst.EPI_MSK] = np.logical_or(
left_dataset[cst.EPI_MSK],
np.repeat(
np.any(classif_values != 0, axis=0)[np.newaxis, ...],
left_dataset.sizes["band_im"],
axis=0,
),
)
if (
cst.EPI_CLASSIFICATION in right_dataset
and classif_bands_to_mask not in (None, [])
):
classif_values = (
right_dataset[cst.EPI_CLASSIFICATION]
.sel(band_classif=classif_bands_to_mask)
.values
)
right_dataset[cst.EPI_MSK] = np.logical_or(
right_dataset[cst.EPI_MSK],
np.repeat(
np.any(classif_values != 0, axis=0)[np.newaxis, ...],
right_dataset.sizes["band_im"],
axis=0,
),
)
if used_band is not None:
def remove_band(current_dataset, used_band):
"""
Remove band_im dimension from mask
"""
current_mask = current_dataset[cst.EPI_MSK]
current_mask = current_mask.loc[used_band]
current_dataset = current_dataset.drop_vars([cst.EPI_MSK])
current_dataset[cst.EPI_MSK] = current_mask
return current_dataset
# remove band_im in masks
[
left_dataset,
saved_left_dataset,
right_dataset,
saved_right_dataset,
] = [
remove_band(current_dataset, used_band)
for current_dataset in [
left_dataset,
saved_left_dataset,
right_dataset,
saved_right_dataset,
]
]
# Check that the datasets are not full of nan (would crash later)
if left_dataset["msk"].all() or right_dataset["msk"].all():
height = left_dataset.sizes["row"]
width = left_dataset.sizes["col"]
ref = xr.Dataset(
coords={
"row": left_dataset.coords["row"],
"col": left_dataset.coords["col"],
"indicator": [
"confidence_from_ambiguity.cars_1",
"confidence_from_risk_max.cars_2",
"confidence_from_risk_min.cars_2",
"confidence_from_disp_sup_from_risk.cars_2",
"confidence_from_disp_inf_from_risk.cars_2",
"confidence_from_interval_bounds_inf.cars_3",
"confidence_from_interval_bounds_sup.cars_3",
"confidence_from_left_right_consistency",
],
},
data_vars={
"disparity_map": (
("row", "col"),
np.full((height, width), np.nan, dtype=np.float32),
),
"validity_mask": (
("row", "col"),
np.ones((height, width), dtype=np.int64),
),
"confidence_measure": (
("row", "col", "indicator"),
np.full((height, width, 8), np.nan, dtype=np.float32),
),
},
)
else:
# Instantiate pandora state machine
pandora_machine = PandoraMachine()
# check datasets
check_datasets(left_dataset, right_dataset)
# Run the Pandora pipeline
ref, _ = pandora.run(
pandora_machine,
left_dataset,
right_dataset,
corr_cfg,
)
disp_dataset = dm_wrap.create_disp_dataset(
ref,
saved_left_dataset,
saved_right_dataset,
compute_disparity_masks=compute_disparity_masks,
disp_min_grid=disp_min_grid,
disp_max_grid=disp_max_grid,
cropped_range=cropped_range,
margins_to_keep=margins_to_keep,
classification_fusion_margin=classification_fusion_margin,
texture_bands=texture_bands,
filter_incomplete_disparity_range=filter_incomplete_disparity_range,
)
return disp_dataset