#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-lines
"""
This module is responsible for the filling disparity algorithms:
thus it fills the disparity map with values estimated according to
their neighbourhood.
"""
import copy
# Standard imports
import logging
import math
from typing import Tuple
# Third party imports
import matplotlib.pyplot as plt
import numpy as np
import xarray as xr
from rasterio.fill import fillnodata
from scipy.linalg import lstsq
from scipy.ndimage import (
binary_dilation,
binary_erosion,
generate_binary_structure,
label,
measurements,
median_filter,
)
from scipy.spatial.distance import cdist
from shapely import affinity
from skimage.segmentation import find_boundaries
# Cars import
from cars.applications.hole_detection import hole_detection_tools
from cars.conf import mask_cst
from cars.core import constants as cst
from .cpp import dense_match_filling_cpp
[docs]def fill_central_area_using_plane( # noqa: C901
disp_map: xr.Dataset,
corresponding_poly,
row_min,
col_min,
ignore_nodata: bool,
ignore_zero_fill: bool,
ignore_extrema: bool,
nb_pix: int,
percent_to_erode: float,
class_index: list,
):
"""
Finds central area of invalid region and estimates disparity values
in this area according to a plan model estimation. The estimation
of this model is done using disparity values at invalid region
borders.
:param disp_map: disparity map with several layers ('disp',
'disp_msk', 'msk_invalid_sec')
:type disp_map: 2D np.array (row, col)
:param corresponding_poly: list of holes polygons
:type corresponding_poly: list(Polygon)
:param row_min: row offset of combined tile
:type row_min: int
:param col_min: col offset of combined tile
:type col_min: int
:param ignore_nodata: option to activate to
ignore nodata values at disp mask borders
:type ignore_nodata: bool
:param ignore_zero_fill: option to activate to
ignore zero values at disp mask borders
:type ignore_zero_fill: bool
:param ignore_extrema: option to activate to ignore
extrema values at disp mask borders
:type ignore_extrema: bool
:param nb_pix: pixel number used to define disparity values band
at invalid region borders that will be considered for disp estimation
:type nb_pix: int
:param percent_to_erode: percentage to define size of central area
:type percent_to_erode: float
:param class_index: list of tag to use
:type class_index: list(str)
:return: mask of invalid region that hasn't been filled yet
(original invalid region - central area)
:rtype: 2D np.array (row, col)
"""
# Generate a structuring element that will consider features
# connected even if they touch diagonally
struct = generate_binary_structure(2, 2)
disp_mask = np.copy(disp_map["disp_msk"].values)
disp_values = np.copy(disp_map["disp"].values)
# Find invalid region of interest in disp data from polygon info
classif_mask = hole_detection_tools.classif_to_stacked_array(
disp_map, class_index
)
classif_mask_arrays, num_features = label(
(classif_mask > 0).astype(int), structure=struct
)
list_roi_msk = []
for segm in range(1, num_features + 1):
roi_msk = classif_mask_arrays == segm
# Create Polygon of current mask
mask_polys = hole_detection_tools.get_roi_coverage_as_poly_with_margins(
roi_msk, row_offset=row_min, col_offset=col_min, margin=0
)
# Clean mask polygons, remove artefacts
cleaned_mask_poly = []
for msk_pol in mask_polys:
# if area > 20 : small groups to remove
if msk_pol.area > 20:
cleaned_mask_poly.append(msk_pol)
if len(cleaned_mask_poly) > 1:
# polygons due to surrounding no data
# use biggest poly
main_poly = None
biggest_area = 0
for curent_poly in cleaned_mask_poly:
current_area = curent_poly.area
if current_area > biggest_area:
main_poly = curent_poly
cleaned_mask_poly = [main_poly]
logging.debug("Not single polygon for current mask")
intersect_holes = False
# Check if main poly intersect found classif polygons
if len(cleaned_mask_poly) > 0:
for hole_poly in corresponding_poly:
if hole_poly.intersects(cleaned_mask_poly[0]):
intersect_holes = True
if intersect_holes:
# is a hole to fill, not nodata in the border
# Option 'ignore_nodata' adds invalid values of disp mask at roi_msk
# invalid region borders
if ignore_nodata:
add_surrounding_nodata_to_roi(roi_msk, disp_values, disp_mask)
# Selected invalid region dilation
dilatation = binary_dilation(
roi_msk, structure=struct, iterations=nb_pix
)
# dilated mask - initial mask = band of 'nb_pix' pix around roi
roi_msk_tmp = np.logical_xor(dilatation, roi_msk)
# do not use nan
roi_msk_tmp = np.logical_and(
roi_msk_tmp,
~np.isnan(disp_values),
)
roi_msk_tmp = np.logical_and(
roi_msk_tmp,
~np.isnan(disp_mask),
)
# do not use zeros
roi_msk_tmp = np.logical_and(
roi_msk_tmp,
disp_values != 0,
)
# Band disp values retrieval
# Optional filter processing n°1 : ignore invalid values in band
if ignore_zero_fill:
initial_len = np.sum(roi_msk_tmp)
roi_msk_tmp = np.logical_and(
roi_msk_tmp,
disp_map["disp"].values.astype(bool),
)
logging.info(
"Zero_fill_disp_mask - Filtering {} \
disparity values, equivalent to {}% of data".format(
initial_len - np.sum(roi_msk_tmp),
100 - (100 * np.sum(roi_msk_tmp)) / initial_len,
)
)
band_disp_values = disp_values[roi_msk_tmp]
# Optional filter processing n°2 : remove extreme values (10%)
if ignore_extrema and len(band_disp_values) != 0:
initial_len = len(band_disp_values)
msk_extrema = np.copy(roi_msk_tmp)
msk_extrema[:] = 0
msk_extrema[
np.where(
abs(disp_values - np.mean(band_disp_values))
< 1.65 * np.std(band_disp_values)
)
] = 1
roi_msk_tmp = np.logical_and(
roi_msk_tmp,
msk_extrema,
)
band_disp_values = disp_values[roi_msk_tmp]
logging.info(
"Extrema values - Filtering {} disparity values,\
equivalent to {}% of data".format(
initial_len - len(band_disp_values),
100 - (100 * len(band_disp_values)) / initial_len,
)
)
if len(band_disp_values) != 0:
disp_moy = np.mean(band_disp_values)
logging.info("Disparity mean comptuted : {}".format(disp_moy))
# roi_msk can be filled with 0 if neighbours have filled mask
if np.sum(~roi_msk) > 0:
# Definition of central area to fill using plane model
erosion_value = define_interpolation_band_width(
roi_msk, percent_to_erode
)
central_area = binary_erosion(
roi_msk, structure=struct, iterations=erosion_value
)
# Exclude pixels outside of epipolar footprint
central_area = np.logical_and(
central_area,
disp_map[cst.EPI_MSK].values
!= mask_cst.NO_DATA_IN_EPIPOLAR_RECTIFICATION,
)
variable_disp = calculate_disp_plane(
band_disp_values,
roi_msk_tmp,
central_area,
)
disp_map["disp"].values[central_area] = variable_disp
disp_map["disp_msk"].values[central_area] = 255
disp_map[cst.EPI_MSK].values[central_area] = 0
update_filling(disp_map, central_area, "plane.hole_center")
# Retrieve borders that weren't filled yet
roi_msk[central_area] = 0
list_roi_msk.append(roi_msk)
return list_roi_msk
[docs]def add_surrounding_nodata_to_roi(
roi_mask,
disp,
disp_mask,
):
"""
Add surounding nodata to invalidity region
:param roi_mask: invalidity mask (values to fill)
:type roi_mask: 2D np.array (row, col)
:param disp: disparity values
:type disp: 2D np.array (row, col)
:param disp_mask: disparity values mask
:type disp_mask: 2D np.array (row, col)
"""
struct = generate_binary_structure(2, 2)
modified_nan_disp_mask = np.nan_to_num(disp_mask, nan=0, posinf=0)
all_mask = np.logical_or(
roi_mask.astype(bool), ~modified_nan_disp_mask.astype(bool)
)
# Added because zero values not included in disp_mask are present
all_mask = np.logical_or(all_mask, disp == 0)
labeled_msk_array, __ = label(all_mask.astype(int), structure=struct)
label_of_interest = np.unique(labeled_msk_array[np.where(roi_mask == 1)])
if len(label_of_interest) != 1:
logging.error(
"More than one label found for ROI :\
{}".format(
label_of_interest
)
)
for label_o_i in label_of_interest:
roi_mask[labeled_msk_array == label_o_i] = 1
else:
roi_mask[labeled_msk_array == label_of_interest] = 1
[docs]def define_interpolation_band_width(binary_image, percentage):
"""
Define number of pixel for later erosion operation
:param binary_image: invalidity mask (values to fill)
:type binary_image: 2D np.array (row, col)
:param percentage: percentage of border compared to center region
:type percentage: dict
:return: pixel number to erode
:rtype: int
"""
# Recherche des pixels de contour de la région masquée
contours = find_boundaries(binary_image, mode="inner")
# Localisation du centre de la zone masquée
# TODO: cas d'une zone bien concave --> résultat ok?
centroid = measurements.center_of_mass(binary_image)
# Recherche de la dist moy entre contours et centre de la zone
coords_contours = list(zip(*np.where(contours))) # noqa: B905
centroid_list = (centroid,) * len(coords_contours)
all_dist = cdist(centroid_list, coords_contours, "euclidean")
mean_dist = np.mean(all_dist)
erosion_value = np.round((percentage * mean_dist))
return int(erosion_value)
[docs]def plot_function(
data,
fit,
):
"""
Displays shape of plane/quadratic function used in region to fill.
:param data: coords and value of valid disparity values
:type data: 3D np.array (row, col)
:param fit: Least-squares solution.
:type fit: ndarray
:return: plot of function used in region to fill
:rtype: matplotlib plot
"""
plt.figure()
plt_ax = plt.subplot(111, projection="3d")
plt_ax.scatter(data[:, 0], data[:, 1], data[:, 2], color="b", alpha=0.5)
plt_ax.set_xlabel("x")
plt_ax.set_ylabel("y")
plt_ax.set_zlabel("z")
xlim = plt_ax.get_xlim()
ylim = plt_ax.get_ylim()
v_x, v_y = np.meshgrid(
np.arange(xlim[0], xlim[1]), np.arange(ylim[0], ylim[1])
)
val_ref = fit[0] * v_x + fit[1] * v_y + fit[2]
plt_ax.plot_wireframe(v_x, v_y, val_ref, color="r", alpha=0.5)
plt.show()
[docs]def calculate_disp_plane(
values,
mask,
central_area,
display=False,
):
"""
Estimates disparity values in disparity map which contains invalid
area using valid data and a plane model.
:param values: valid disparity values
:type values: 3D np.array (row, col)
:param mask: validity mask
:type mask: 3D np.array (row, col)
:param central_area: mask of disparity values to fill
:type central_area: 3D np.array (row, col)
:param display: plot interpolation fct in region to fill
:type display: boolean
:return: central interpolated disparity values
:rtype: list
"""
data_to_fill = np.where(central_area)
data = np.vstack([np.where(mask), values]).T
b_mat = data[:, 2]
# Calcul coefficient fonction optimale plan/quadratique
# ORDRE 1
a_mat = np.vstack((data[:, :2].T, np.ones_like(b_mat))).T
fit, __, __, __ = lstsq(a_mat, b_mat)
# Détermination des valeurs optimales pour les coords centrales
x_to_fill = data_to_fill[0]
y_to_fill = data_to_fill[1]
val = list(
map(lambda x, y: fit[0] * x + fit[1] * y + fit[2], x_to_fill, y_to_fill)
)
# Option d'affichage de la fonction plan
if display:
plot_function(data, fit)
return val
[docs]def fill_area_borders_using_interpolation(disp_map, masks_to_fill, options):
"""
Raster interpolation command
:param disp_map: disparity values
:type disp_map: 2D np.array (row, col)
:param masks_to_fill: masks to locate disp values to fill
:type masks_to_fill: list(2D np.array (row, col))
:param options: parameters for interpolation methods
:type options: dict
"""
# Copy input data - disparity values + mask with values to fill
raster = np.copy(disp_map["disp"].values)
# Interpolation step
for mask_to_fill in masks_to_fill:
# Exclude pixels outside of epipolar footprint
mask_to_fill = np.logical_and(
mask_to_fill,
disp_map[cst.EPI_MSK].values
!= mask_cst.NO_DATA_IN_EPIPOLAR_RECTIFICATION,
)
interpol_raster = make_raster_interpolation(
raster, mask_to_fill, options
)
# Insertion of interpolated data into disparity map
disp_map["disp"].values[mask_to_fill] = interpol_raster[mask_to_fill]
disp_map["disp_msk"].values[mask_to_fill] = 255
disp_map[cst.EPI_MSK].values[mask_to_fill] = 0
update_filling(disp_map, mask_to_fill, "plane.hole_border")
# --------------------------------------------------------------------
# Global functions for interpolation process
# --------------------------------------------------------------------
[docs]def make_raster_interpolation(
raster: np.ndarray, mask: np.ndarray, options: dict
):
"""
Raster interpolation (scipy, rasterio or pandora)
:param raster: disparity values
:type raster: 2D np.array (row, col)
:param mask: invalidity mask (values to fill)
:type mask: 2D np.array (row, col)
:param options: parameters for interpolation methods
:type options: dict
:return: interpolated raster
:rtype: 2D np.array
"""
if options["type"] == "fillnodata":
interpol_raster_tmp = np.copy(raster)
interpol_raster_tmp = fillnodata(
interpol_raster_tmp,
mask=~mask,
max_search_distance=options["max_search_distance"],
smoothing_iterations=options["smoothing_iterations"],
)
interpol_raster = median_filter(interpol_raster_tmp, size=(3, 3))
elif options["type"] == "pandora" and options["method"] == "mc_cnn":
interpol_raster, __ = fill_disp_pandora(raster, mask, 16)
elif options["type"] == "pandora" and options["method"] == "sgm":
interpol_raster, __ = fill_disp_pandora(raster, mask, 8)
else:
raise RuntimeError("Invalid interpolation type.")
return interpol_raster
[docs]def fill_disp_pandora(
disp: np.ndarray, msk_fill_disp: np.ndarray, nb_directions: int
) -> Tuple[np.ndarray, np.ndarray]:
"""
Interpolation of the left disparity map to fill holes.
Interpolate invalid pixel by finding the nearest correct pixels in
8/16 different directions and use the median of their disparities.
?bontar, J., & LeCun, Y. (2016). Stereo matching by training
a convolutional neural network to compare image
patches. The journal of machine learning research, 17(1), 2287-2318.
HIRSCHMULLER, Heiko. Stereo processing by semiglobal matching
and mutual information.
IEEE Transactions on pattern analysis and machine intelligence,
2007, vol. 30, no 2, p. 328-341.
Copied/adapted fct from pandora/validation/interpolated_disparity.py
:param disp: disparity map
:type disp: 2D np.array (row, col)
:param msk_fill_disp: validity mask
:type msk_fill_disp: 2D np.array (row, col)
:param nb_directions: nb directions to explore
:type nb_directions: integer
:return: the interpolate left disparity map,
with the validity mask update :
:rtype: tuple(2D np.array (row, col), 2D np.array (row, col))
"""
return dense_match_filling_cpp.fill_disp_pandora(
disp, msk_fill_disp, nb_directions
)
[docs]def estimate_poly_with_disp(poly, dmin=0, dmax=0):
"""
Estimate new polygone using disparity range
:param poly: polygone to estimate
:type poly: Polygon
:param dmin: minimum disparity
:type dmin: int
:param dmax: maximum disparity
:type dmax: int
:return: polygon in disparity range
:rtype: Polygon
"""
dmin = int(math.floor(dmin))
dmax = int(math.ceil(dmax))
new_poly = copy.copy(poly)
for disp in range(dmin, dmax + 1):
translated_poly = affinity.translate(poly, xoff=0.0, yoff=disp)
new_poly = new_poly.union(translated_poly)
return poly
[docs]def get_corresponding_holes(tile_poly, holes_poly_list):
"""
Get list of holes situated in tile
:param tile_poly: envelop of tile
:type tile_poly: Polygon
:param holes_poly_list: envelop of holes
:type holes_poly_list: list(Polygon)
:return: list of holes envelops
:rtype: list(Polygon)
"""
corresponding_holes = []
for poly in holes_poly_list:
if tile_poly.intersects(poly):
corresponding_holes.append(poly)
return corresponding_holes
[docs]def get_corresponding_tiles(tiles_polygones, corresponding_holes, epi_disp_map):
"""
Get list of tiles intersecting with holes
:param tiles_polygones: envelop of tiles
:type tiles_polygones: list(Polygon)
:param corresponding_holes: envelop of holes
:type corresponding_holes: list(Polygon)
:param epi_disp_map: disparity map cars dataset
:type epi_disp_map: CarsDataset
:return: list of tiles to use (window, overlap, xr.Dataset)
:rtype: list(tuple)
"""
corresponding_tiles_row_col = []
corresponding_tiles = []
for key_tile, poly_tile in tiles_polygones.items():
for poly_hole in corresponding_holes:
if poly_tile.intersects(poly_hole):
if key_tile not in corresponding_tiles_row_col:
corresponding_tiles_row_col.append(key_tile)
for row, col in corresponding_tiles_row_col:
corresponding_tiles.append(
(
epi_disp_map.tiling_grid[row, col],
epi_disp_map.overlaps[row, col],
epi_disp_map[row, col],
)
)
return corresponding_tiles
[docs]def get_polygons_from_cars_ds(cars_ds):
"""
Get the holes envelops computed in holes detection application
cars_ds must contain dicts, and not delayed.
This function must be called after an orchestrator.breakpoint()
:param cars_ds: holes cars dataset
:type cars_ds: CarsDataset
:return: list of holes
:rtype: list(Polygon)
"""
list_poly = []
if cars_ds is not None:
for row in range(cars_ds.shape[0]):
for col in range(cars_ds.shape[1]):
if cars_ds[row, col] is not None:
list_poly += cars_ds[row, col].data["list_bbox"]
return list_poly
[docs]def merge_intersecting_polygones(list_poly):
"""
Merge polygons that intersects each other
:param list_poly: list of holes
:type list_poly: list(Polygon)
:return: list of holes
:rtype: list(Polygon)
"""
new_list_poly = list_poly
merged_list = []
while len(new_list_poly) > 0:
current_poly = new_list_poly[0]
new_poly = current_poly
to_delete = [0]
for element_id in range(1, len(new_list_poly)):
if new_poly.intersects(new_list_poly[element_id]):
# Delete from list
to_delete.append(element_id)
# Merge with current
new_poly = new_poly.union(new_list_poly[element_id])
# Add new poly to merged list
merged_list.append(new_poly)
# Delete element
for _ in range(len(to_delete)):
# Start with last ones
pos = to_delete.pop()
new_list_poly.pop(pos)
return merged_list
[docs]def fill_disp_using_plane(
disp_map: xr.Dataset,
corresponding_poly,
row_min,
col_min,
ignore_nodata: bool,
ignore_zero_fill: bool,
ignore_extrema: bool,
nb_pix: int,
percent_to_erode: float,
interp_options: dict,
classification,
) -> xr.Dataset:
"""
Fill disparity map holes
:param disp_map: disparity map
:type disp_map: xr.Dataset
:param corresponding_poly: list of holes polygons
:type corresponding_poly: list(Polygon)
:param row_min: row offset of combined tile
:type row_min: int
:param col_min: col offset of combined tile
:type col_min: int
:param ignore_nodata: ingore nodata
:type ignore_nodata: bool
:param ignore_zero_fill: ingnore zero fill
:type ignore_zero_fill: bool
:param ignore_extrema: ignore extrema
:type ignore_extrema: bool
:param nb_pix: margin to use
:type nb_pix: int
:param percent_to_erode: percent to erode
:type percent_to_erode: float
:param interp_options: interp_options
:type interp_options: dict
:param classification: list of tag to use
:type classification: list(str)
"""
border_region = fill_central_area_using_plane(
disp_map,
corresponding_poly,
row_min,
col_min,
ignore_nodata,
ignore_zero_fill,
ignore_extrema,
nb_pix,
percent_to_erode,
classification,
)
fill_area_borders_using_interpolation(
disp_map,
border_region,
interp_options,
)
[docs]def fill_disp_using_zero_padding(
disp_map: xr.Dataset,
class_index,
) -> xr.Dataset:
"""
Fill disparity map holes
:param disp_map: disparity map
:type disp_map: xr.Dataset
:param class_index: class index according to the classification tag
:type class_index: int
"""
# get index of the application class config
# according the coords classif band
if cst.BAND_CLASSIF in disp_map.coords:
# get index for each band classification
stack_index = hole_detection_tools.classif_to_stacked_array(
disp_map, class_index
)
# Exclude pixels outside of epipolar footprint
stack_index = np.logical_and(
stack_index,
disp_map[cst.EPI_MSK].values
!= mask_cst.NO_DATA_IN_EPIPOLAR_RECTIFICATION,
)
# set disparity value to zero where the class is
# non zero value and masked region
disp_map["disp"].values[stack_index] = 0
disp_map["disp_msk"].values[stack_index] = 255
disp_map[cst.EPI_MSK].values[stack_index] = 0
# Add a band to disparity dataset to memorize which pixels are filled
update_filling(disp_map, stack_index, "zeros_padding")
[docs]def add_empty_filling_band(
output_dataset: xr.Dataset,
filling_types: list,
):
"""
Add filling attribute to dataset or band to filling attribute
if it already exists
:param output_dataset: output dataset
:param filling: input mask of filled pixels
:param band_filling: type of filling (zero padding or plane)
"""
nb_band = len(filling_types)
nb_row = len(output_dataset.coords[cst.ROW])
nb_col = len(output_dataset.coords[cst.COL])
filling = np.zeros((nb_band, nb_row, nb_col), dtype=bool)
filling = xr.Dataset(
data_vars={
cst.EPI_FILLING: ([cst.BAND_FILLING, cst.ROW, cst.COL], filling)
},
coords={
cst.BAND_FILLING: filling_types,
cst.ROW: output_dataset.coords[cst.ROW],
cst.COL: output_dataset.coords[cst.COL],
},
)
# Add band to EPI_FILLING attribute or create the attribute
return xr.merge([output_dataset, filling])
[docs]def update_filling(
output_dataset: xr.Dataset,
filling: np.ndarray = None,
filling_type: str = None,
):
"""
Update filling attribute of dataset with an additional mask
:param output_dataset: output dataset
:param filling: input mask of filled pixels
:param band_filling: type of filling (zero padding or plane)
"""
# Select accurate band of output according to the type of filling
filling_type = {cst.BAND_FILLING: filling_type}
# Add True values from inputmask to output accurate band
filling = filling.astype(bool)
output_dataset[cst.EPI_FILLING].sel(**filling_type).values[filling] = True