Source code for cars.applications.resampling.resampling_algo

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Resampling module:
contains functions used for epipolar resampling
"""

# Standard imports
import math

# Third party imports
import numpy as np
import rasterio as rio
import resample as cresample
from rasterio.windows import Window, bounds

from cars.conf import mask_cst as msk_cst

# CARS imports
from cars.core import constants as cst
from cars.core import datasets, inputs, tiling
from cars.core.geometry import abstract_geometry


[docs] def epipolar_rectify_images( # pylint: disable=too-many-positional-arguments left_imgs, right_imgs, grid1, grid2, region, margins, epipolar_size_x, epipolar_size_y, interpolator_image="bicubic", interpolator_classif="nearest", interpolator_mask="nearest", interpolators_edges=None, step=None, mask1=None, mask2=None, edges1=None, edges2=None, left_classifs=None, right_classifs=None, nodata1=0, nodata2=0, add_classif=True, add_edges=True, ): """ Resample left and right images """ if interpolators_edges is None: interpolators_edges = { "edges_mask": "nearest", "depth_map": "nearest", "normals": "nearest", "tile_id": "nearest", } # Force region to be float region = [int(x) for x in region] # Apply margins to left image # TODO: tiled region should be given in parameter # TODO: keep only rectification here (keep functional unitary approach) left_region = region.copy() left_margins = margins["left_margin"].data left_roi = tiling.crop( left_region, [0, 0, epipolar_size_x, epipolar_size_y] ) left_region = tiling.crop( tiling.pad(left_region, left_margins), [0, 0, epipolar_size_x, epipolar_size_y], ) left_margins = margins["left_margin"].data # Get actual margin taking cropping into account left_margins[0] = left_region[0] - left_roi[0] left_margins[1] = left_region[1] - left_roi[1] left_margins[2] = left_region[2] - left_roi[2] left_margins[3] = left_region[3] - left_roi[3] # Apply margins to right image right_region = region.copy() right_margins = margins["right_margin"].data right_roi = tiling.crop( right_region, [0, 0, epipolar_size_x, epipolar_size_y] ) right_region = tiling.crop( tiling.pad(right_region, right_margins), [0, 0, epipolar_size_x, epipolar_size_y], ) # Get actual margin taking cropping into account right_margins[0] = right_region[0] - right_roi[0] right_margins[1] = right_region[1] - right_roi[1] right_margins[2] = right_region[2] - right_roi[2] right_margins[3] = right_region[3] - right_roi[3] # Resample left images left_img_transform = inputs.rasterio_get_transform( next(iter(left_imgs)), convention="north" ) left_dataset = resample_image( left_imgs, grid1, [epipolar_size_x, epipolar_size_y], step=step, region=left_region, nodata=nodata1, mask=mask1, band_coords=cst.BAND_IM, interpolator_img=interpolator_image, interpolator_mask=interpolator_mask, img_transform=left_img_transform, ) # Update attributes left_dataset.attrs[cst.ROI] = np.array(left_roi) left_dataset.attrs[cst.ROI_WITH_MARGINS] = np.array(left_region) # Remove region key as it duplicates roi_with_margins key # left_dataset.attrs.pop("region", None) left_dataset.attrs[cst.EPI_MARGINS] = np.array(left_margins) if "disp_min" in margins.attrs: left_dataset.attrs[cst.EPI_DISP_MIN] = margins.attrs["disp_min"] if "disp_max" in margins.attrs: left_dataset.attrs[cst.EPI_DISP_MAX] = margins.attrs["disp_max"] # Resample right image right_img_transform = inputs.rasterio_get_transform(next(iter(right_imgs))) right_dataset = resample_image( right_imgs, grid2, [epipolar_size_x, epipolar_size_y], step=step, region=right_region, nodata=nodata2, mask=mask2, band_coords=cst.BAND_IM, interpolator_img=interpolator_image, interpolator_mask=interpolator_mask, img_transform=right_img_transform, ) # Update attributes right_dataset.attrs[cst.ROI] = np.array(right_roi) right_dataset.attrs[cst.ROI_WITH_MARGINS] = np.array(right_region) # Remove region key as it duplicates roi_with_margins key # right_dataset.attrs.pop("region", None) right_dataset.attrs[cst.EPI_MARGINS] = np.array(right_margins) if "disp_min" in margins.attrs: right_dataset.attrs[cst.EPI_DISP_MIN] = margins.attrs["disp_min"] if "disp_max" in margins.attrs: right_dataset.attrs[cst.EPI_DISP_MAX] = margins.attrs["disp_max"] # Resample classifications left_classif_dataset = None right_classif_dataset = None if add_classif: if left_classifs: left_classif_dataset = resample_image( left_classifs, grid1, [epipolar_size_x, epipolar_size_y], region=left_region, band_coords=cst.BAND_CLASSIF, interpolator_img=interpolator_classif, interpolator_mask=interpolator_mask, img_transform=left_img_transform, ) if right_classifs: right_classif_dataset = resample_image( right_classifs, grid2, [epipolar_size_x, epipolar_size_y], region=right_region, band_coords=cst.BAND_CLASSIF, interpolator_img=interpolator_classif, interpolator_mask=interpolator_mask, img_transform=right_img_transform, ) # Resample edges left_edges_datasets = { "edges_mask": None, "depth_map": None, "normals": None, "tile_id": None, } right_edges_datasets = { "edges_mask": None, "depth_map": None, "normals": None, "tile_id": None, } if add_edges: band_format_edges1 = ( to_band_format(edges1) if edges1 is not None else None ) band_format_edges2 = ( to_band_format(edges2) if edges2 is not None else None ) edge_sources = [ (band_format_edges1, left_edges_datasets), (band_format_edges2, right_edges_datasets), ] for key, band_name in [ ("edges_mask", cst.BAND_EDGES_MASK), ("depth_map", cst.BAND_EDGES_DEPTH_MAP), ("normals", cst.BAND_EDGES_NORMALS), ("tile_id", cst.BAND_EDGES_TILE_ID), ]: for band_format, target_dataset in edge_sources: if band_format and band_format.get(key): target_dataset[key] = resample_image( band_format[key], grid1, [epipolar_size_x, epipolar_size_y], region=left_region, band_coords=band_name, interpolator_img=interpolators_edges[key], interpolator_mask=interpolator_mask, img_transform=left_img_transform, ) return ( left_dataset, right_dataset, left_classif_dataset, right_classif_dataset, left_edges_datasets, right_edges_datasets, )
# pylint: disable=too-many-positional-arguments
[docs] def resample_image( # noqa: C901 imgs, grid, largest_size, step=None, region=None, nodata=None, mask=None, band_coords=False, interpolator_img="bicubic", interpolator_mask="nearest", img_transform=None, ): """ Resample image according to grid and largest size. :param img: Path to the image to resample :type img: string :param grid: rectification grid dict :type grid: dict :param largest_size: Size of full output image :type largest_size: list of two int :param step: horizontal step of resampling (useful for strip resampling) :type step: int :param region: A subset of the output image to produce :type region: None (full output is produced) or array of four floats [xmin,ymin,xmax,ymax] :param nodata: Nodata value to use (both for input and output) :type nodata: None or float :param mask: Mask to resample as well :type mask: None or path to mask image :param band_coords: Force bands coordinate in output dataset :type band_coords: boolean :param interpolator: interpolator type (bicubic (default) or nearest) :type interpolator: str ("nearest" "linear" "bco") :rtype: xarray.Dataset with resampled image and mask """ img_sample = next(iter(imgs)) # Handle region is None if region is None: region = [0, 0, largest_size[0], largest_size[1]] else: region = [ int(math.floor(region[0])), int(math.floor(region[1])), int(math.ceil(region[2])), int(math.ceil(region[3])), ] if img_transform is None: img_transform = inputs.rasterio_get_transform( img_sample, convention="north" ) # Convert largest_size to int if needed largest_size = [int(x) for x in largest_size] # Localize blocks of the tile to resample if step is None: step = region[2] - region[0] xmin_of_blocks = np.arange(region[0], region[2], step) xmax_of_blocks = np.append( np.arange(region[0] + step, region[2], step), region[2] ) ymin_of_blocks = np.arange(region[1], region[3], step) ymax_of_blocks = np.append( np.arange(region[1] + step, region[3], step), region[3] ) resampled_images_list = [] resampled_masks_list = [] band_names = [] data_types = [] nodata_msk = msk_cst.NO_DATA_IN_EPIPOLAR_RECTIFICATION for img in imgs: bands = imgs[img] if "band_id" in bands: nb_bands = len(bands["band_id"]) elif "values" in bands: nb_bands = len(bands["values"]) else: raise ValueError( "File {} not recognised as an image or classif file".format( bands ) ) # Initialize outputs of the entire tile resamp = np.empty( (nb_bands, region[3] - region[1], region[2] - region[0]), dtype=np.float32, ) msk = np.empty( (nb_bands, region[3] - region[1], region[2] - region[0]), dtype=np.float32, ) ystart = 0 with rio.open(grid["path"]) as grid_reader, rio.open(img) as img_reader: for ymin, ymax in zip(ymin_of_blocks, ymax_of_blocks): # noqa: B905 ysize = ymax - ymin xstart = 0 for xmin, xmax in zip( # noqa: B905 xmin_of_blocks, xmax_of_blocks ): block_region = [xmin, ymin, xmax, ymax] xsize = xmax - xmin resamp, msk = oversampling_func( grid_reader, img_reader, img_transform, block_region, interpolator_img, band_coords, nb_bands, bands, resamp, nodata, msk, mask, nodata_msk, interpolator_mask, ysize, xsize, ystart, xstart, ) xstart += xsize ystart += ysize if "band_id" in bands: band_names += bands["band_name"] elif "values" in bands: band_names += list(map(str, bands["values"])) else: raise ValueError( "File {} not recognised as an image or classif file".format( bands ) ) data_types += [inputs.rasterio_get_image_type(img)] * nb_bands resampled_images_list.append(resamp) resampled_masks_list.append(msk) resamp_final = np.concatenate(resampled_images_list, axis=0) msk_final = np.concatenate(resampled_masks_list, axis=0) dataset = datasets.create_im_dataset( resamp_final, region, largest_size, img_sample, band_coords, band_names, data_types, msk_final, ) return dataset
[docs] def oversampling_func( # pylint: disable=too-many-positional-arguments grid_reader, img_reader, img_transform, block_region, interpolator_img, band_coords, nb_bands, bands, resamp, nodata, msk, mask, nodata_msk, interpolator_mask, ysize, xsize, ystart, xstart, ): """ Do the resampling calculus """ xmin = block_region[0] ymin = block_region[1] xmax = block_region[2] ymax = block_region[3] # Build rectification pipelines for images res_x, res_y = grid_reader.res assert res_x == res_y oversampling = int(res_x) assert res_x == oversampling grid_origin_x = grid_reader.transform[2] grid_origin_y = grid_reader.transform[5] assert grid_origin_x == grid_origin_y grid_margin = int(-grid_origin_x / oversampling - 0.5) grid_margin = int(grid_margin) # Convert resampled region to grid region with oversampling grid_region = np.array( [ math.floor(xmin / oversampling), math.floor(ymin / oversampling), math.ceil(xmax / oversampling), math.ceil(ymax / oversampling), ] ) # Out region of epipolar image out_region = oversampling * grid_region # Grid region grid_region += grid_margin grid_window = Window.from_slices( (grid_region[1], grid_region[3] + 1), (grid_region[0], grid_region[2] + 1), ) grid_as_array = grid_reader.read(window=grid_window) grid_as_array = grid_as_array.astype(np.float32) grid_as_array = grid_as_array.astype(np.float64) # get needed source bounding box left = math.floor(np.amin(grid_as_array[0, ...])) right = math.ceil(np.amax(grid_as_array[0, ...])) top = math.floor(np.amin(grid_as_array[1, ...])) bottom = math.ceil(np.amax(grid_as_array[1, ...])) transform = rio.Affine(*np.abs(img_transform)) # transform xmin and xmax positions to index (top, bottom, left, right) = abstract_geometry.min_max_to_index_min_max( left, right, top, bottom, transform ) # filter margin for bicubic = 2 filter_margin = 2 top -= filter_margin bottom += filter_margin left -= filter_margin right += filter_margin left, right = list(np.clip([left, right], 0, img_reader.shape[0])) top, bottom = list(np.clip([top, bottom], 0, img_reader.shape[1])) img_window = Window.from_slices([left, right], [top, bottom]) in_sensor = True if right - left == 0 or bottom - top == 0: in_sensor = False # round window img_window = img_window.round_offsets() img_window = img_window.round_lengths() # compute offset res_x = float(abs(transform[0])) res_y = float(abs(transform[4])) tile_bounds = list(bounds(img_window, transform)) x_offset = min(tile_bounds[0], tile_bounds[2]) y_offset = min(tile_bounds[1], tile_bounds[3]) if in_sensor: # get sensor data if "band_id" in bands: # image img_as_array = img_reader.read(bands["band_id"], window=img_window) elif "values" in bands: # classification img_as_array = img_reader.read([1], window=img_window) else: raise ValueError( "File {} not recognised as an image or classif file".format( bands ) ) # get the nodata mask before blurring img_nan_mask = img_as_array == nodata # set the nodata values back if nodata is not None: img_as_array[img_nan_mask] = nodata # shift grid regarding the img extraction grid_as_array[0, ...] -= x_offset grid_as_array[1, ...] -= y_offset # apply input resolution grid_as_array[0, ...] /= res_x grid_as_array[1, ...] /= res_y block_resamp = cresample.grid( img_as_array, grid_as_array, oversampling, interpolator=interpolator_img, nodata=0, ).astype(np.float32) if interpolator_img == "bicubic" and band_coords == cst.BAND_CLASSIF: block_resamp = np.where( block_resamp >= 0.5, 1, np.where(block_resamp < 0.5, 0, block_resamp), ).astype(int) # extract exact region ext_region = block_region - out_region block_resamp = block_resamp[ ..., ext_region[1] : ext_region[3] - 1, ext_region[0] : ext_region[2] - 1, ] if "values" in bands: # Extract binary bands from mono-band classification multiband_block_resamp = np.zeros( ( nb_bands, block_region[3] - block_region[1], block_region[2] - block_region[0], ) ) for band_id, value in enumerate(bands["values"]): multiband_block_resamp[band_id] = block_resamp == value block_resamp = multiband_block_resamp else: block_resamp = np.zeros( ( nb_bands, block_region[3] - block_region[1], block_region[2] - block_region[0], ) ) resamp[:, ystart : ystart + ysize, xstart : xstart + xsize] = block_resamp # create msk if in_sensor: # get mask in source geometry if mask is not None: with rio.open(mask) as msk_reader: msk_as_array = msk_reader.read(1, window=img_window) msk_as_array = np.array([msk_as_array] * img_as_array.shape[0]) else: msk_as_array = np.zeros(img_as_array.shape) if nodata is not None: nodata_index = img_as_array == nodata msk_as_array[nodata_index] = nodata_msk # resample mask block_msk = cresample.grid( msk_as_array, grid_as_array, oversampling, interpolator=interpolator_mask, nodata=nodata_msk, ) if interpolator_mask == "bicubic": block_msk = np.where( block_msk >= 0.5, 1, np.where(block_msk < 0.5, 0, block_msk), ).astype(int) block_msk = block_msk[ ..., ext_region[1] : ext_region[3] - 1, ext_region[0] : ext_region[2] - 1, ] else: block_msk = np.full( ( nb_bands, block_region[3] - block_region[1], block_region[2] - block_region[0], ), fill_value=nodata_msk, ) msk[:, ystart : ystart + ysize, xstart : xstart + xsize] = block_msk return resamp, msk
[docs] def to_band_format(edges_dict): """ Helper function, that takes the edges input dict and transforms it into a band format image, so resampling can understand it """ band_format = edges_dict.copy() for key in edges_dict.keys(): if edges_dict[key] is not None: if key == "normals": band_format[key] = { edges_dict[key]: { "band_id": [1, 2, 3], "band_name": ["b0", "b1", "b2"], } } else: band_format[key] = { edges_dict[key]: { "band_id": [1], "band_name": ["b0"], } } return band_format