#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
this module contains the wrapper used in disparity grid computation.
"""
# Standard imports
import itertools
import logging
# Third party imports
import numpy as np
import rasterio
import xarray as xr
from scipy.ndimage import maximum_filter, minimum_filter
from shareloc.proj_utils import transform_physical_point_to_index
# CARS imports
import cars.applications.dense_matching.dense_matching_constants as dm_cst
import cars.orchestrator.orchestrator as ocht
from cars.applications.dense_matching.dense_matching_algo import (
LinearInterpNearestExtrap,
)
from cars.core import inputs, projection
from cars.core.projection import point_cloud_conversion
from cars.data_structures import cars_dataset, cars_dict
# pylint: disable=too-many-positional-arguments
[docs]
def generate_disp_grids_dataset(
grid_min,
grid_max,
saving_info,
raster_profile,
window=None,
row_coords=None,
col_coords=None,
):
"""
Generate disparity grids xarray dataset
:param grid_min: disp grid min
:type grid_min: np.ndarray
:param grid_max: disp grid max
:type grid_max: np.ndarray
:param saving_info: saving infos
:type saving_info: dict
:param raster_profile: raster_profile
:type raster_profile: dict
:param row_coords: row cooordinates
:type row_coords: np.ndarray, optional
:param col_coords: col coordinates
:type col_coords: np.ndarray, optional
:return: disp range dataset
:rtype: xarray.Dataset
"""
if row_coords is None:
row_coords = np.arange(0, grid_min.shape[0])
if col_coords is None:
col_coords = np.arange(0, grid_min.shape[1])
disp_range_tile = xr.Dataset(
data_vars={
dm_cst.DISP_MIN_GRID: (["row", "col"], grid_min),
dm_cst.DISP_MAX_GRID: (["row", "col"], grid_max),
},
coords={
"row": row_coords,
"col": col_coords,
},
)
cars_dataset.fill_dataset(
disp_range_tile,
saving_info=saving_info,
window=window,
profile=raster_profile,
attributes=None,
overlaps=None,
)
return disp_range_tile
# pylint: disable=too-many-positional-arguments
[docs]
def generate_disp_range_const_tile_wrapper(
row_range,
col_range,
dmin,
dmax,
raster_profile,
saving_info,
saving_info_global_infos,
):
"""
Generate disparity range dataset from constant dmin and dmax
:param row_range: Row range
:type row_range: list
:param col_range: Column range.
:type col_range: list
:param dmin: disparity minimum.
:type dmin: float
:param dmax: disparity maximum.
:type dmax: float
:param raster_profile: The raster profile.
:type raster_profile: dict
:param saving_info: The disp range grid saving information.
:type saving_info: dict
:param saving_info_global_infos: Global info saving infos.
:type saving_info_global_infos: dict
:return: Disparity range grid
:rtype: dict
"""
grid_min = np.empty((len(row_range), len(col_range)))
grid_max = np.empty((len(row_range), len(col_range)))
grid_min[:, :] = dmin
grid_max[:, :] = dmax
mono_tile_saving_info = ocht.update_saving_infos(saving_info, row=0, col=0)
disp_range = generate_disp_grids_dataset(
grid_min, grid_max, mono_tile_saving_info, raster_profile, window=None
)
# Generate infos on global min and max
global_infos = cars_dict.CarsDict(
{"global_min": np.nanmin(dmin), "global_max": np.nanmin(dmax)}
)
cars_dataset.fill_dict(global_infos, saving_info=saving_info_global_infos)
return disp_range, global_infos
# pylint: disable=too-many-positional-arguments
[docs]
def generate_disp_range_from_dem_wrapper(
epipolar_grid_array_window,
full_epi_row_range,
full_epi_col_range,
sensor_image_right,
grid_right,
geom_plugin_with_dem_and_geoid,
dem_min,
dem_max,
raster_profile,
saving_info,
saving_info_global_infos,
filter_overlap,
disp_to_alt_ratio,
disp_min_threshold=None,
disp_max_threshold=None,
):
"""
Generate disparity range dataset from dems
:param epipolar_grid_array_window: The window of the epipolar grid array.
:type epipolar_grid_array_window: dict
:param full_epi_row_range: The full range of rows in the epipolar grid.
:type full_epi_row_range: list
:param full_epi_col_range: The full range of columns in the epipolar grid.
:type full_epi_col_range: list
:param sensor_image_right: The right sensor image.
:type sensor_image_right: dict
:param grid_right: The right epipolar grid.
:type grid_right: dict
:param geom_plugin_with_dem_and_geoid: The geometry plugin with DEM.
:type geom_plugin_with_dem_and_geoid: object
:param dem_min: Path of dem min.
:type dem_min: str
:param dem_max: Path of dem max.
:type dem_max: srt
:param raster_profile: The raster profile.
:type raster_profile: dict
:param saving_info: The disp range grid saving information.
:type saving_info: dict
:param saving_info_global_infos: Global info saving infos.
:type saving_info_global_infos: dict
:param filter_overlap: The overlap to use for filtering.
:type filter_overlap: int
:param disp_to_alt_ratio: disparity to altitude ratio
:type disp_to_alt_ratio: float
:param disp_min_threshold: The minimum disparity threshold.
:type disp_min_threshold: float, optional
:param disp_max_threshold: The maximum disparity threshold.
:type disp_max_threshold: float, optional
:return: Disparity range grid
:rtype: dict
"""
# compute reverse matrix
transform_sensor = inputs.rasterio_get_transform(
sensor_image_right["image"]["bands"]["b0"]["path"], convention="north"
)
trans_inv_sensor = ~transform_sensor
# Geometry plugin
geo_plugin = geom_plugin_with_dem_and_geoid
dem_median = geo_plugin.dem
# get epsg
terrain_epsg = inputs.rasterio_get_epsg(dem_median)
# Get epipolar position of all dem mean
transform_dem_median = inputs.rasterio_get_transform(dem_median)
# use local disparity
# Get associated alti mean / min / max values
dem_median_shape = inputs.rasterio_get_size(dem_median)
dem_median_width, dem_median_height = dem_median_shape
# get corresponding window from epipolar_array_window
epi_grid_margin = filter_overlap + 1
epi_grid_row_min = epipolar_grid_array_window["row_min"]
epi_grid_row_max = epipolar_grid_array_window["row_max"]
epi_grid_col_min = epipolar_grid_array_window["col_min"]
epi_grid_col_max = epipolar_grid_array_window["col_max"]
def clip(value, min_value, max_value):
"""
Clip a value inside bounds
"""
return int(max(min_value, min(value, max_value)))
# Epi grid tile coordinate to use, with and without margins
epi_grid_row_min_with_margin = clip(
epi_grid_row_min - epi_grid_margin, 0, len(full_epi_row_range)
)
epi_grid_row_max_with_margin = clip(
epi_grid_row_max + epi_grid_margin, 0, len(full_epi_row_range)
)
epi_grid_col_min_with_margin = clip(
epi_grid_col_min - epi_grid_margin, 0, len(full_epi_col_range)
)
epi_grid_col_max_with_margin = clip(
epi_grid_col_max + epi_grid_margin, 0, len(full_epi_col_range)
)
# range to use for epipolar interpolation
row_range_with_margin = full_epi_row_range[
epi_grid_row_min_with_margin:epi_grid_row_max_with_margin
]
row_range_no_margin = full_epi_row_range[epi_grid_row_min:epi_grid_row_max]
col_range_with_margin = full_epi_col_range[
epi_grid_col_min_with_margin:epi_grid_col_max_with_margin
]
col_range_no_margin = full_epi_col_range[epi_grid_col_min:epi_grid_col_max]
# Loc on dem median
epi_bbox = [
(np.min(col_range_with_margin), np.min(row_range_with_margin)),
(np.min(col_range_with_margin), np.max(row_range_with_margin)),
(np.max(col_range_with_margin), np.min(row_range_with_margin)),
(np.max(col_range_with_margin), np.max(row_range_with_margin)),
]
sensor_bbox = geo_plugin.sensor_position_from_grid(
grid_right, epi_bbox, interpolation_method="linear"
)
row_sensor_bbox, col_sensor_bbox = transform_physical_point_to_index(
trans_inv_sensor, sensor_bbox[:, 1], sensor_bbox[:, 0]
)
terrain_bbox = geo_plugin.safe_direct_loc(
sensor_image_right["image"]["bands"]["b0"]["path"],
sensor_image_right["geomodel"],
col_sensor_bbox,
row_sensor_bbox,
)
# reshape terrain bbox
terrain_bbox = terrain_bbox[0:2].T
terrain_bbox[:, [1, 0]] = terrain_bbox[:, [0, 1]]
# get pixel location on dem median
pixel_roi_dem_mean = inputs.rasterio_get_pixel_points(
dem_median, terrain_bbox
)
# Add margins (for interpolation) and clip
dem_margin = 10 # arbitrary
roi_lower_row = np.floor(np.min(pixel_roi_dem_mean[:, 0])) - dem_margin
roi_upper_row = np.ceil(np.max(pixel_roi_dem_mean[:, 0])) + dem_margin
roi_lower_col = np.floor(np.min(pixel_roi_dem_mean[:, 1])) - dem_margin
roi_upper_col = np.ceil(np.max(pixel_roi_dem_mean[:, 1])) + dem_margin
min_row = clip(roi_lower_row, 0, dem_median_height)
max_row = clip(roi_upper_row, 0, dem_median_height)
min_col = clip(roi_lower_col, 0, dem_median_width)
max_col = clip(roi_upper_col, 0, dem_median_width)
# compute terrain positions to use (all dem min and max)
row_indexes = range(min_row, max_row)
col_indexes = range(min_col, max_col)
transformer = rasterio.transform.AffineTransformer(transform_dem_median)
if len(row_indexes) == 0 or len(col_indexes) == 0:
disp_range, global_infos = empty_disparity_grids(
row_range_no_margin,
col_range_no_margin,
epipolar_grid_array_window,
raster_profile,
saving_info,
saving_info_global_infos,
)
return disp_range, global_infos
indexes = np.array(list(itertools.product(row_indexes, col_indexes)))
row = indexes[:, 0]
col = indexes[:, 1]
x_mean, y_mean = transformer.xy(row, col)
terrain_positions = np.transpose(np.array([x_mean, y_mean]))
# dem mean in terrain_epsg
x_mean = terrain_positions[:, 0]
y_mean = terrain_positions[:, 1]
dem_median_list = inputs.rasterio_get_values(
dem_median, x_mean, y_mean, point_cloud_conversion
)
nan_mask = ~np.isnan(dem_median_list)
# transform to lon lat
terrain_position_lon_lat = projection.point_cloud_conversion(
terrain_positions, terrain_epsg, 4326
)
lon_mean = terrain_position_lon_lat[:, 0]
lat_mean = terrain_position_lon_lat[:, 1]
# dem min and max are in 4326
dem_min_list = inputs.rasterio_get_values(
dem_min, lon_mean, lat_mean, point_cloud_conversion
)
dem_max_list = inputs.rasterio_get_values(
dem_max, lon_mean, lat_mean, point_cloud_conversion
)
if dem_min_list is None or dem_max_list is None:
logging.warning("DEM min and DEM max does not cover this tile")
disp_range, global_infos = empty_disparity_grids(
row_range_no_margin,
col_range_no_margin,
epipolar_grid_array_window,
raster_profile,
saving_info,
saving_info_global_infos,
)
return disp_range, global_infos
nan_mask = nan_mask & ~np.isnan(dem_min_list) & ~np.isnan(dem_max_list)
# filter nan value from input points
lon_mean = lon_mean[nan_mask]
lat_mean = lat_mean[nan_mask]
dem_median_list = dem_median_list[nan_mask]
dem_min_list = dem_min_list[nan_mask]
dem_max_list = dem_max_list[nan_mask]
# sensors physical positions
(
ind_cols_sensor,
ind_rows_sensor,
_,
) = geom_plugin_with_dem_and_geoid.safe_inverse_loc(
sensor_image_right["image"]["bands"]["b0"]["path"],
sensor_image_right["geomodel"],
lat_mean,
lon_mean,
z_coord=dem_median_list,
)
# Generate epipolar disp grids
# Get epipolar positions
epipolar_positions_row, epipolar_positions_col = np.meshgrid(
col_range_with_margin,
row_range_with_margin,
)
epipolar_positions = np.stack(
[epipolar_positions_row, epipolar_positions_col], axis=2
)
# Get sensor position
sensors_positions = (
geom_plugin_with_dem_and_geoid.sensor_position_from_grid(
grid_right,
np.reshape(
epipolar_positions,
(
epipolar_positions.shape[0] * epipolar_positions.shape[1],
2,
),
),
interpolation_method="linear",
)
)
# Transform physical position to index
ind_rows_sensor_grid, ind_cols_sensor_grid = (
transform_physical_point_to_index(
trans_inv_sensor, sensors_positions[:, 1], sensors_positions[:, 0]
)
)
if len(ind_rows_sensor) < 5:
# QH6214 needs at least 4 points for interpolation
disp_range, global_infos = empty_disparity_grids(
row_range_no_margin,
col_range_no_margin,
epipolar_grid_array_window,
raster_profile,
saving_info,
saving_info_global_infos,
)
return disp_range, global_infos
# Interpolate disparity
disp_min_points = -(dem_max_list - dem_median_list) / disp_to_alt_ratio
disp_max_points = -(dem_min_list - dem_median_list) / disp_to_alt_ratio
interp_min_linear = LinearInterpNearestExtrap(
list(zip(ind_rows_sensor, ind_cols_sensor)), # noqa: B905
disp_min_points,
)
interp_max_linear = LinearInterpNearestExtrap(
list(zip(ind_rows_sensor, ind_cols_sensor)), # noqa: B905
disp_max_points,
)
grid_min = np.reshape(
interp_min_linear(ind_rows_sensor_grid, ind_cols_sensor_grid),
(
epipolar_positions.shape[0],
epipolar_positions.shape[1],
),
)
grid_max = np.reshape(
interp_max_linear(ind_rows_sensor_grid, ind_cols_sensor_grid),
(
epipolar_positions.shape[0],
epipolar_positions.shape[1],
),
)
# Add margin
diff = grid_max - grid_min
logging.info("Max grid max - grid min : {} disp ".format(np.max(diff)))
if disp_min_threshold is not None:
if np.any(grid_min < disp_min_threshold):
logging.warning(
"Override disp_min with disp_min_threshold {}".format(
disp_min_threshold
)
)
grid_min[grid_min < disp_min_threshold] = disp_min_threshold
if disp_max_threshold is not None:
if np.any(grid_max > disp_max_threshold):
logging.warning(
"Override disp_max with disp_max_threshold {}".format(
disp_max_threshold
)
)
grid_max[grid_max > disp_max_threshold] = disp_max_threshold
# generate footprint
footprint_mask = create_circular_mask(filter_overlap, filter_overlap)
grid_min = minimum_filter(
grid_min, footprint=footprint_mask, mode="nearest"
)
grid_max = maximum_filter(
grid_max, footprint=footprint_mask, mode="nearest"
)
# Create xarray dataset
disp_range = generate_disp_grids_dataset(
grid_min,
grid_max,
saving_info,
raster_profile,
window=epipolar_grid_array_window,
row_coords=row_range_with_margin,
col_coords=col_range_with_margin,
)
# crop epipolar grid from margin added for propagation filter
disp_range = disp_range.sel(
row=list(row_range_no_margin), col=list(col_range_no_margin)
)
# Generate infos on global min and max
global_infos = cars_dict.CarsDict(
{
"global_min": np.floor(np.nanmin(grid_min)),
"global_max": np.ceil(np.nanmax(grid_max)),
}
)
cars_dataset.fill_dict(global_infos, saving_info=saving_info_global_infos)
return disp_range, global_infos
[docs]
def empty_disparity_grids( # pylint: disable=too-many-positional-arguments
row_range_no_margin,
col_range_no_margin,
epipolar_grid_array_window,
raster_profile,
saving_info,
saving_info_global_infos,
):
"""
Return empty disparity grids
:param row_range_no_margin: Rows id in grid
:type row_range_no_margin: int
:param col_range_no_margin: Cols id in grid
:type col_range_no_margin: int
:param epipolar_grid_array_window: ROI of grid
:type epipolar_grid_array_window: dict
:param raster_profile: The raster profile.
:type raster_profile: dict
:param saving_info: The disp range grid saving information.
:type saving_info: dict
:param saving_info_global_infos: Global info saving infos.
:type saving_info_global_infos: dict
"""
grid_min = np.empty((len(row_range_no_margin), len(col_range_no_margin)))
grid_max = np.empty((len(row_range_no_margin), len(col_range_no_margin)))
grid_min[:, :] = 0
grid_max[:, :] = 0
disp_range = generate_disp_grids_dataset(
grid_min,
grid_max,
saving_info,
raster_profile,
window=epipolar_grid_array_window,
row_coords=row_range_no_margin,
col_coords=col_range_no_margin,
)
# Generate infos on global min and max
global_infos = cars_dict.CarsDict(
{
"global_min": 0,
"global_max": 0,
}
)
cars_dataset.fill_dict(global_infos, saving_info=saving_info_global_infos)
return disp_range, global_infos
[docs]
def create_circular_mask(height, width):
"""
Create a circular mask for footprint around pixel
:param height: height of footprint
:type height: int
:param width: width of footprint
:type width: int
:return: mask representing circular footprint
:rtype: np.ndarray
"""
center = (int(width / 2), int(height / 2))
radius = min(center[0], center[1], width - center[0], height - center[1])
y_grid, x_grid = np.ogrid[:height, :width]
dist_from_center = np.sqrt(
(x_grid - center[0]) ** 2 + (y_grid - center[1]) ** 2
)
mask = dist_from_center <= radius
return mask.astype(bool)