Source code for cars.applications.auxiliary_filling.auxiliary_filling_algo

# !/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
this module contains the AuxiliaryFillingFromSensors application class.
"""


import numpy as np
import rasterio as rio
from scipy import interpolate


[docs] def fill_auxiliary( # pylint: disable=too-many-positional-arguments sensor_inputs, pairing, longitudes, latitudes, altitudes, geom_plugin, number_of_color_bands, number_of_classification_bands, texture_bands, texture_interpolator, use_mask=False, ): """ Compute color and classification for a list of points (lon, lat, alt) using information from sensor images :param sensor_inputs: dictionary containing paths to input images and models :type sensor_inputs: dict :param pairing: pairing between input images :type pairing: list :param longitudes: list containing longitudes coordinates :type longitudes: list :param latitudes: list containing latitudes coordinates :type latitudes: list :param altitudes: list containing altitudes coordinates :type altitudes: list :param geom_plugin: geometry plugin used for inverse locations :type geom_plugin: AbstractGeometry :param number_of_color_bands: number of bands in the color image :type number_of_color_bands: int :param number_of_classification_bands: number of bands in the color image :type number_of_classification_bands: int :param texture_bands: list of band names used for output texture :type texture_bands: list :param texture_interpolator: scipy interpolator use to interpolate color values :type texture_interpolator: str :param use_mask: use mask information from sensors in color computation :type use_mask: bool """ filled_color = np.zeros((number_of_color_bands, len(altitudes))) filled_classif = None if number_of_classification_bands: filled_classif = np.zeros( (number_of_classification_bands, len(altitudes)), dtype=bool ) weights = np.zeros(len(altitudes)) full_weights = np.zeros(len(altitudes)) all_values = np.zeros((number_of_color_bands, len(altitudes))) for pair in pairing: first_sensor = sensor_inputs.get(pair[0]) second_sensor = sensor_inputs.get(pair[1]) # if first sensor has been filtered, use the second sensor instead if first_sensor is None: first_sensor = second_sensor second_sensor = None # process first sensor if first_sensor is not None: not_interpolated_mask, all_values_sensor = fill_from_one_sensor( first_sensor, filled_color, filled_classif, weights, longitudes, latitudes, altitudes, geom_plugin, number_of_color_bands, number_of_classification_bands, texture_bands, texture_interpolator, not_interpolated_mask=None, use_mask=use_mask, return_all_points=True, ) if all_values_sensor is not None: all_values_sensor_mask = ~np.isnan(all_values_sensor) all_values[all_values_sensor_mask] += all_values_sensor[ all_values_sensor_mask ] full_weights[np.any(all_values_sensor_mask[0, :], axis=0)] += 1 # process second sensor if second_sensor is not None: fill_from_one_sensor( second_sensor, filled_color, filled_classif, weights, longitudes, latitudes, altitudes, geom_plugin, number_of_color_bands, number_of_classification_bands, texture_bands, texture_interpolator, not_interpolated_mask, use_mask=use_mask, return_all_points=False, ) interpolated_pixels = np.any(filled_color != 0, axis=0) filled_color[:, interpolated_pixels] /= weights[interpolated_pixels] if use_mask is True: full_interpolated_pixels = ~np.logical_or( np.any(np.isnan(all_values), axis=0), interpolated_pixels ) filled_color[:, full_interpolated_pixels] = ( all_values[:, full_interpolated_pixels] / full_weights[full_interpolated_pixels] ) return filled_color, filled_classif
[docs] def fill_from_one_sensor( # pylint: disable=too-many-positional-arguments # noqa C901 sensor, filled_color, filled_classif, weights, longitudes, latitudes, altitudes, geom_plugin, number_of_color_bands, number_of_classification_bands, texture_bands, texture_interpolator, not_interpolated_mask=None, use_mask=False, return_all_points=False, ): """ Compute color and classification contribution for a list of points (lon, lat, alt) using information from a sensor image :param sensor: dictionary containing paths to input images and model :type sensor: dict :param filled_color: array containing (non normalized) color information :type filled_color: numpy.ndarray :param filled_classif: array containing classification information :type filled_classif: numpy.array :param weights: array containing weight for normalization :type weights: numpy.array :param longitudes: list containing longitudes coordinates :type longitudes: list :param latitudes: list containing latitudes coordinates :type latitudes: list :param altitudes: list containing altitudes coordinates :type altitudes: list :param geom_plugin: geometry plugin used for inverse locations :type geom_plugin: AbstractGeometry :param number_of_color_bands: number of bands in the color image :type number_of_color_bands: int :param number_of_classification_bands: number of bands in the color image :type number_of_classification_bands: int :param texture_bands: list of band names used for output texture :type texture_bands: list :param texture_interpolator: scipy interpolator use to interpolate color values :type texture_interpolator: str :param not_interpolated_mask: use mask information in color computation :type not_interpolated_mask: numpy.array :param use_mask: use mask information in color computation :type use_mask: bool :param return_all_points: compute interpolated values for all points :type return_all_points: bool """ # Check if the sensor has color or classification reference_sensor_image = sensor["image"]["bands"]["b0"]["path"] output_not_interpolated_mask = np.ones(len(altitudes), dtype=bool) all_values = np.zeros((number_of_color_bands, len(altitudes))) # No filling information for this sensor, return if reference_sensor_image is None: return output_not_interpolated_mask, all_values # read metadata with rio.open(reference_sensor_image) as reference_image: sensor_height = reference_image.height sensor_width = reference_image.width # sensors physical positions ( ind_cols_sensor, ind_rows_sensor, _, ) = geom_plugin.inverse_loc( reference_sensor_image, sensor["geomodel"], latitudes, longitudes, altitudes, ) # Compute col and row bounds min_rows = np.min(ind_rows_sensor) max_rows = np.max(ind_rows_sensor) min_cols = np.min(ind_cols_sensor) max_cols = np.max(ind_cols_sensor) # Check for out of bound coordinates if ( min_rows > sensor_height or max_rows < 0 or min_cols > sensor_width or max_cols < 0 ): return output_not_interpolated_mask, all_values if texture_interpolator in ("linear", "nearest"): texture_interpolator_margin = 1 elif texture_interpolator == "cubic": texture_interpolator_margin = 3 else: raise RuntimeError(f"Invalid interpolator {texture_interpolator}") # Classification interpolator is always nearest classif_interpolator = "nearest" classif_interpolator_margin = 1 validity_mask = not_interpolated_mask if use_mask and sensor.get("mask"): with rio.open(sensor["mask"]) as sensor_mask_image: first_row = np.floor(max(min_rows - classif_interpolator_margin, 0)) last_row = np.ceil( min( max_rows + classif_interpolator_margin, sensor_mask_image.height, ) ) first_col = np.floor(max(min_cols - classif_interpolator_margin, 0)) last_col = np.ceil( min( max_cols + classif_interpolator_margin, sensor_mask_image.width, ) ) rio_window = rio.windows.Window.from_slices( (first_row, last_row), (first_col, last_col), ) sensor_points = ( np.arange(first_row, last_row), np.arange(first_col, last_col), ) sensor_data = sensor_mask_image.read(1, window=rio_window) validity_mask = np.logical_not( interpolate.interpn( sensor_points, sensor_data, (ind_rows_sensor, ind_cols_sensor), bounds_error=False, fill_value=1, method=classif_interpolator, ) ) if not_interpolated_mask is not None: validity_mask = np.logical_and( validity_mask, not_interpolated_mask ) if sensor.get("image"): # Only fill color if all texture bands are present if all( band_name in sensor["image"]["bands"] for band_name in texture_bands ): with rio.open( sensor["image"]["bands"]["b0"]["path"] ) as sensor_color_image: first_row = np.floor( max( np.min(ind_rows_sensor) - texture_interpolator_margin, 0 ) ) last_row = np.ceil( min( np.max(ind_rows_sensor) + texture_interpolator_margin, sensor_color_image.height, ) ) first_col = np.floor( max( np.min(ind_cols_sensor) - texture_interpolator_margin, 0 ) ) last_col = np.ceil( min( np.max(ind_cols_sensor) + texture_interpolator_margin, sensor_color_image.width, ) ) rio_window = rio.windows.Window.from_slices( (first_row, last_row), (first_col, last_col), ) sensor_points = ( np.arange(first_row, last_row), np.arange(first_col, last_col), ) if validity_mask is not None: interpolated_mask = validity_mask else: interpolated_mask = np.ones(len(altitudes), dtype=bool) for output_band, band_name in enumerate(texture_bands): # rio band convention sensor_file = rio.open( sensor["image"]["bands"][band_name]["path"] ) input_band = sensor["image"]["bands"][band_name]["band"] sensor_data = sensor_file.read( input_band + 1, window=rio_window ) if validity_mask is not None: if return_all_points is True: all_values[output_band, :] = interpolate.interpn( sensor_points, sensor_data, (ind_rows_sensor, ind_cols_sensor), bounds_error=False, method=texture_interpolator, ) band_values = all_values[output_band, validity_mask] # No need to interpolate on every points else: band_values = interpolate.interpn( sensor_points, sensor_data, ( ind_rows_sensor[validity_mask], ind_cols_sensor[validity_mask], ), bounds_error=False, method=texture_interpolator, ) nan_values = np.isnan(band_values) interpolated_mask[validity_mask] = np.logical_or( interpolated_mask[validity_mask], ~nan_values ) filled_color[output_band, interpolated_mask] += band_values else: band_values = interpolate.interpn( sensor_points, sensor_data, (ind_rows_sensor, ind_cols_sensor), bounds_error=False, method=texture_interpolator, ) interpolated_mask = np.logical_or( interpolated_mask, ~np.isnan(band_values) ) filled_color[output_band, interpolated_mask] += band_values output_not_interpolated_mask = ~interpolated_mask weights[interpolated_mask] += 1 if filled_classif is not None and sensor.get("classification"): if number_of_classification_bands == len( sensor["classification"]["values"] ): with rio.open( sensor["classification"]["path"] ) as sensor_classif_image: first_row = np.floor( max( np.min(ind_rows_sensor) - classif_interpolator_margin, 0 ) ) last_row = np.ceil( min( np.max(ind_rows_sensor) + classif_interpolator_margin, sensor_classif_image.height, ) ) first_col = np.floor( max( np.min(ind_cols_sensor) - classif_interpolator_margin, 0 ) ) last_col = np.ceil( min( np.max(ind_cols_sensor) + classif_interpolator_margin, sensor_classif_image.width, ) ) rio_window = rio.windows.Window.from_slices( (first_row, last_row), (first_col, last_col), ) sensor_points = ( np.arange(first_row, last_row), np.arange(first_col, last_col), ) classif_data = rio.open(sensor["classification"]["path"]).read( 1, window=rio_window ) for output_band, value in enumerate( sensor["classification"]["values"] ): binary_band_data = classif_data == value filled_classif[output_band, :] = np.logical_or( filled_classif[output_band, :], interpolate.interpn( sensor_points, binary_band_data, (ind_rows_sensor, ind_cols_sensor), bounds_error=False, method=classif_interpolator, ), ) return output_not_interpolated_mask, all_values