#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Resampling module:
contains functions used for epipolar resampling
"""
# Standard imports
import math
# Third party imports
import numpy as np
import rasterio as rio
import resample as cresample
from rasterio.windows import Window, bounds
from cars.conf import mask_cst as msk_cst
# CARS imports
from cars.core import constants as cst
from cars.core import datasets, inputs, tiling
from cars.core.geometry import abstract_geometry
from cars.data_structures import cars_dataset
[docs]def epipolar_rectify_images(
img1,
img2,
grid1,
grid2,
region,
margins,
epipolar_size_x,
epipolar_size_y,
interpolator_image="bicubic",
interpolator_color="bicubic",
interpolator_classif="nearest",
interpolator_mask="nearest",
step=None,
color1=None,
mask1=None,
mask2=None,
classif1=None,
classif2=None,
nodata1=0,
nodata2=0,
add_color=True,
add_classif=True,
):
"""
Resample left and right images, with color on left
"""
# Force region to be float
region = [int(x) for x in region]
# Apply margins to left image
# TODO: tiled region should be given in parameter
# TODO: keep only rectification here (keep functional unitary approach)
left_region = region.copy()
left_margins = margins["left_margin"].data
left_roi = tiling.crop(
left_region, [0, 0, epipolar_size_x, epipolar_size_y]
)
left_region = tiling.crop(
tiling.pad(left_region, left_margins),
[0, 0, epipolar_size_x, epipolar_size_y],
)
left_margins = margins["left_margin"].data
# Get actual margin taking cropping into account
left_margins[0] = left_region[0] - left_roi[0]
left_margins[1] = left_region[1] - left_roi[1]
left_margins[2] = left_region[2] - left_roi[2]
left_margins[3] = left_region[3] - left_roi[3]
# Apply margins to right image
right_region = region.copy()
right_margins = margins["right_margin"].data
right_roi = tiling.crop(
right_region, [0, 0, epipolar_size_x, epipolar_size_y]
)
right_region = tiling.crop(
tiling.pad(right_region, right_margins),
[0, 0, epipolar_size_x, epipolar_size_y],
)
# Get actual margin taking cropping into account
right_margins[0] = right_region[0] - right_roi[0]
right_margins[1] = right_region[1] - right_roi[1]
right_margins[2] = right_region[2] - right_roi[2]
right_margins[3] = right_region[3] - right_roi[3]
# Resample left image
left_img_transform = inputs.rasterio_get_transform(img1)
left_dataset = resample_image(
img1,
grid1,
[epipolar_size_x, epipolar_size_y],
step=step,
region=left_region,
nodata=nodata1,
mask=mask1,
interpolator_img=interpolator_image,
interpolator_mask=interpolator_mask,
img_transform=left_img_transform,
)
# Update attributes
left_dataset.attrs[cst.ROI] = np.array(left_roi)
left_dataset.attrs[cst.ROI_WITH_MARGINS] = np.array(left_region)
# Remove region key as it duplicates roi_with_margins key
# left_dataset.attrs.pop("region", None)
left_dataset.attrs[cst.EPI_MARGINS] = np.array(left_margins)
if "disp_min" in margins.attrs:
left_dataset.attrs[cst.EPI_DISP_MIN] = margins.attrs["disp_min"]
if "disp_max" in margins.attrs:
left_dataset.attrs[cst.EPI_DISP_MAX] = margins.attrs["disp_max"]
# Resample right image
right_img_transform = inputs.rasterio_get_transform(img2)
right_dataset = resample_image(
img2,
grid2,
[epipolar_size_x, epipolar_size_y],
step=step,
region=right_region,
nodata=nodata2,
mask=mask2,
interpolator_img=interpolator_image,
interpolator_mask=interpolator_mask,
img_transform=right_img_transform,
)
# Update attributes
right_dataset.attrs[cst.ROI] = np.array(right_roi)
right_dataset.attrs[cst.ROI_WITH_MARGINS] = np.array(right_region)
# Remove region key as it duplicates roi_with_margins key
# right_dataset.attrs.pop("region", None)
right_dataset.attrs[cst.EPI_MARGINS] = np.array(right_margins)
if "disp_min" in margins.attrs:
right_dataset.attrs[cst.EPI_DISP_MIN] = margins.attrs["disp_min"]
if "disp_max" in margins.attrs:
right_dataset.attrs[cst.EPI_DISP_MAX] = margins.attrs["disp_max"]
left_color_dataset = None
if add_color:
# Build rectification pipeline for color image, and build datasets
if color1 is None:
color1 = img1
if inputs.rasterio_get_size(color1) == inputs.rasterio_get_size(img1):
left_color_dataset = resample_image(
color1,
grid1,
[epipolar_size_x, epipolar_size_y],
region=left_region,
band_coords=cst.BAND_IM,
interpolator_img=interpolator_color,
interpolator_mask=interpolator_mask,
img_transform=left_img_transform,
)
else:
raise RuntimeError(
"The image and the color "
"haven't the same sizes "
"{} != {}".format(
inputs.rasterio_get_size(color1),
inputs.rasterio_get_size(img1),
)
)
# resample the mask images
left_classif_dataset = None
right_classif_dataset = None
if add_classif:
if classif1:
left_classif_dataset = resample_image(
classif1,
grid1,
[epipolar_size_x, epipolar_size_y],
region=left_region,
band_coords=cst.BAND_CLASSIF,
interpolator_img=interpolator_classif,
interpolator_mask=interpolator_mask,
img_transform=left_img_transform,
)
if classif2:
right_classif_dataset = resample_image(
classif2,
grid2,
[epipolar_size_x, epipolar_size_y],
region=right_region,
band_coords=cst.BAND_CLASSIF,
interpolator_img=interpolator_classif,
interpolator_mask=interpolator_mask,
img_transform=right_img_transform,
)
return (
left_dataset,
right_dataset,
left_color_dataset,
left_classif_dataset,
right_classif_dataset,
)
[docs]def resample_image(
img,
grid,
largest_size,
step=None,
region=None,
nodata=None,
mask=None,
band_coords=False,
interpolator_img="bicubic",
interpolator_mask="nearest",
img_transform=None,
):
"""
Resample image according to grid and largest size.
:param img: Path to the image to resample
:type img: string
:param grid: Path to the rectification grid
:type grid: string
:param largest_size: Size of full output image
:type largest_size: list of two int
:param step: horizontal step of resampling (useful for strip resampling)
:type step: int
:param region: A subset of the output image to produce
:type region: None (full output is produced) or array of four floats
[xmin,ymin,xmax,ymax]
:param nodata: Nodata value to use (both for input and output)
:type nodata: None or float
:param mask: Mask to resample as well
:type mask: None or path to mask image
:param band_coords: Force bands coordinate in output dataset
:type band_coords: boolean
:param interpolator: interpolator type (bicubic (default) or nearest)
:type interpolator: str ("nearest" "linear" "bco")
:rtype: xarray.Dataset with resampled image and mask
"""
# Handle region is None
if region is None:
region = [0, 0, largest_size[0], largest_size[1]]
else:
region = [
int(math.floor(region[0])),
int(math.floor(region[1])),
int(math.ceil(region[2])),
int(math.ceil(region[3])),
]
if img_transform is None:
img_transform = inputs.rasterio_get_transform(img)
# Convert largest_size to int if needed
largest_size = [int(x) for x in largest_size]
# Get path if grid is of type CarsDataset TODO remove
if isinstance(grid, cars_dataset.CarsDataset):
grid = grid.attributes["path"]
# Localize blocks of the tile to resample
if step is None:
step = region[2] - region[0]
xmin_of_blocks = np.arange(region[0], region[2], step)
xmax_of_blocks = np.append(
np.arange(region[0] + step, region[2], step), region[2]
)
ymin = region[1]
ymax = region[3]
# Initialize outputs of the entire tile
nb_bands = inputs.rasterio_get_nb_bands(img)
resamp = np.empty((nb_bands, region[3] - region[1], 0), dtype=np.float32)
if nodata is not None or mask is not None:
msk = np.empty((1, region[3] - region[1], 0), dtype=np.float32)
else:
msk = None
with rio.open(grid) as grid_reader, rio.open(img) as img_reader:
for xmin, xmax in zip(xmin_of_blocks, xmax_of_blocks): # noqa: B905
block_region = [xmin, ymin, xmax, ymax]
# Build rectification pipelines for images
res_x, res_y = grid_reader.res
assert res_x == res_y
oversampling = int(res_x)
assert res_x == oversampling
grid_origin_x = grid_reader.transform[2]
grid_origin_y = grid_reader.transform[5]
assert grid_origin_x == grid_origin_y
grid_margin = -grid_origin_x / oversampling - 0.5
assert grid_margin == int(grid_margin)
grid_margin = int(grid_margin)
# Convert resampled region to grid region with oversampling
grid_region = np.array(
[
math.floor(xmin / oversampling),
math.floor(ymin / oversampling),
math.ceil(xmax / oversampling),
math.ceil(ymax / oversampling),
]
)
# Out region of epipolar image
out_region = oversampling * grid_region
# Grid region
grid_region += grid_margin
grid_window = Window.from_slices(
(grid_region[1], grid_region[3] + 1),
(grid_region[0], grid_region[2] + 1),
)
grid_as_array = grid_reader.read(window=grid_window)
grid_as_array = grid_as_array.astype(np.float32)
grid_as_array = grid_as_array.astype(np.float64)
# get needed source bounding box
left = math.floor(np.amin(grid_as_array[0, ...]))
right = math.ceil(np.amax(grid_as_array[0, ...]))
top = math.floor(np.amin(grid_as_array[1, ...]))
bottom = math.ceil(np.amax(grid_as_array[1, ...]))
transform = rio.Affine(*np.abs(img_transform))
# transform xmin and xmax positions to index
(top, bottom, left, right) = (
abstract_geometry.min_max_to_index_min_max(
left, right, top, bottom, transform
)
)
# filter margin for bicubic = 2
filter_margin = 2
top -= filter_margin
bottom += filter_margin
left -= filter_margin
right += filter_margin
left, right = list(np.clip([left, right], 0, img_reader.shape[0]))
top, bottom = list(np.clip([top, bottom], 0, img_reader.shape[1]))
img_window = Window.from_slices([left, right], [top, bottom])
in_sensor = True
if right - left == 0 or bottom - top == 0:
in_sensor = False
# round window
img_window = img_window.round_offsets()
img_window = img_window.round_lengths()
# Compute offset
res_x = float(abs(transform[0]))
res_y = float(abs(transform[4]))
tile_bounds = list(bounds(img_window, transform))
x_offset = min(tile_bounds[0], tile_bounds[2])
y_offset = min(tile_bounds[1], tile_bounds[3])
if in_sensor:
# Get sensor data
img_as_array = img_reader.read(window=img_window)
# shift grid regarding the img extraction
grid_as_array[0, ...] -= x_offset
grid_as_array[1, ...] -= y_offset
# apply input resolution
grid_as_array[0, ...] /= res_x
grid_as_array[1, ...] /= res_y
block_resamp = cresample.grid(
img_as_array,
grid_as_array,
oversampling,
interpolator=interpolator_img,
nodata=0,
).astype(np.float32)
if (
interpolator_img == "bicubic"
and band_coords == cst.BAND_CLASSIF
):
block_resamp = np.where(
block_resamp >= 0.5,
1,
np.where(block_resamp < 0.5, 0, block_resamp),
).astype(int)
# extract exact region
ext_region = block_region - out_region
block_resamp = block_resamp[
...,
ext_region[1] : ext_region[3] - 1,
ext_region[0] : ext_region[2] - 1,
]
else:
block_resamp = np.zeros(
(
img_reader.count,
block_region[3] - block_region[1],
block_region[2] - block_region[0],
)
)
resamp = np.concatenate((resamp, block_resamp), axis=2)
# create msk
if nodata is not None or mask is not None:
if in_sensor:
# get mask in source geometry
nodata_index = img_as_array == nodata
if mask is not None:
with rio.open(mask) as msk_reader:
msk_as_array = msk_reader.read(window=img_window)
else:
msk_as_array = np.zeros(img_as_array.shape)
nodata_msk = msk_cst.NO_DATA_IN_EPIPOLAR_RECTIFICATION
msk_as_array[nodata_index] = nodata_msk
# resample mask
block_msk = cresample.grid(
msk_as_array,
grid_as_array,
oversampling,
interpolator=interpolator_mask,
nodata=nodata_msk,
)
if interpolator_mask == "bicubic":
block_msk = np.where(
block_msk >= 0.5,
1,
np.where(block_msk < 0.5, 0, block_msk),
).astype(int)
block_msk = block_msk[
...,
ext_region[1] : ext_region[3] - 1,
ext_region[0] : ext_region[2] - 1,
]
else:
nodata_msk = msk_cst.NO_DATA_IN_EPIPOLAR_RECTIFICATION
block_msk = np.full(
(
1,
block_region[3] - block_region[1],
block_region[2] - block_region[0],
),
fill_value=nodata_msk,
)
msk = np.concatenate((msk, block_msk), axis=2)
dataset = datasets.create_im_dataset(
resamp, region, largest_size, img, band_coords, msk
)
return dataset