#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
this module contains the bicubic_resampling application class.
"""
# pylint: disable=too-many-lines
# TODO refacto: factorize disributed code, and remove too-many-lines
# Standard imports
import logging
import os
from typing import Dict, Tuple
# Third party imports
import numpy as np
import xarray as xr
from json_checker import And, Checker, Or
from shapely.geometry import Polygon
# CARS imports
import cars.orchestrator.orchestrator as ocht
from cars.applications import application_constants
from cars.applications.resampling import (
resampling_algo,
resampling_constants,
resampling_wrappers,
)
from cars.applications.resampling.abstract_resampling_app import Resampling
from cars.core import constants as cst
from cars.core import inputs, tiling
from cars.core.utils import safe_makedirs
from cars.data_structures import cars_dataset, format_transformation
from cars.data_structures.cars_dict import CarsDict
from cars.pipelines.parameters import sensor_inputs_constants as sens_cst
[docs]
class BicubicResampling(Resampling, short_name="bicubic"):
"""
BicubicResampling
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, conf=None):
"""
Init function of BicubicResampling
:param conf: configuration for resampling
:return: an application_to_use object
"""
super().__init__(conf=conf)
# check conf
self.used_method = self.used_config["method"]
self.strip_height = self.used_config["strip_height"]
self.step = self.used_config["step"]
# Saving bools
self.save_intermediate_data = self.used_config["save_intermediate_data"]
self.interpolator_image = self.used_config["interpolator_image"]
self.interpolator_classif = self.used_config["interpolator_classif"]
self.interpolator_mask = self.used_config["interpolator_mask"]
self.interpolators_edges = self.used_config["interpolators_edges"]
# Init orchestrator
self.orchestrator = None
[docs]
def check_conf(self, conf):
"""
Check configuration
:param conf: configuration to check
:type conf: dict
:return: overloaded configuration
:rtype: dict
"""
# init conf
if conf is not None:
overloaded_conf = conf.copy()
else:
conf = {}
overloaded_conf = {}
# Overload conf
# get rasterization parameter
overloaded_conf["method"] = conf.get("method", "bicubic")
overloaded_conf["strip_height"] = conf.get("strip_height", 60)
overloaded_conf["interpolator_image"] = conf.get(
"interpolator_image", "bicubic"
)
overloaded_conf["interpolator_classif"] = conf.get(
"interpolator_classif", "nearest"
)
overloaded_conf["interpolator_mask"] = conf.get(
"interpolator_mask", "nearest"
)
overloaded_conf["interpolators_edges"] = conf.get(
"interpolators_edges",
{
"edges_mask": "nearest",
"depth_map": "nearest",
"normals": "nearest",
"tile_id": "nearest",
},
)
overloaded_conf["step"] = conf.get("step", 500)
# Saving bools
overloaded_conf["save_intermediate_data"] = conf.get(
"save_intermediate_data", False
)
rectification_schema = {
"method": str,
"strip_height": And(int, lambda x: x > 0),
"interpolator_image": str,
"interpolator_classif": str,
"interpolator_mask": str,
"interpolators_edges": dict,
"step": Or(None, int),
"save_intermediate_data": bool,
}
# Check conf
checker = Checker(rectification_schema)
checker.validate(overloaded_conf)
# update edges interp dict and check its format
temp_interps_edges = {
"edges_mask": "nearest",
"depth_map": "nearest",
"normals": "nearest",
"tile_id": "nearest",
}
temp_interps_edges.update(overloaded_conf["interpolators_edges"])
overloaded_conf["interpolators_edges"] = temp_interps_edges
interpolators_schema = {
"edges_mask": str,
"depth_map": str,
"normals": str,
"tile_id": str,
}
checker = Checker(interpolators_schema)
checker.validate(overloaded_conf["interpolators_edges"])
return overloaded_conf
[docs]
def pre_run(
self,
grid_left,
tile_width,
tile_height,
):
"""
Pre run some computations : tiling grid
:param grid_left: left grid
:type grid_left: dict
:param optimum_tile_size: optimum tile size
:type optimum_tile_size: int
:return: epipolar_regions_grid, epipolar_regions,
opt_epipolar_tile_size, largest_epipolar_region,
"""
# Get largest epipolar regions from configuration file
largest_epipolar_region = [
0,
0,
grid_left["epipolar_size_x"],
grid_left["epipolar_size_y"],
]
origin = grid_left["grid_origin"]
spacing = grid_left["grid_spacing"]
logging.info(
"Size of epipolar image: {}".format(largest_epipolar_region)
)
logging.debug("Origin of epipolar grid: {}".format(origin))
logging.debug("Spacing of epipolar grid: {}".format(spacing))
if tile_width is None:
tile_width = grid_left["epipolar_size_x"]
if tile_height is None:
tile_height = self.strip_height
logging.info(
"Tile size for epipolar regions: "
"{width}x{height} pixels".format(
width=tile_width, height=tile_height
)
)
epipolar_regions_grid = tiling.generate_tiling_grid(
0,
0,
grid_left["epipolar_size_y"],
grid_left["epipolar_size_x"],
tile_height,
tile_width,
)
logging.info(
"Epipolar image will be processed in {} splits".format(
epipolar_regions_grid.shape[0] * epipolar_regions_grid.shape[1]
)
)
return (
epipolar_regions_grid,
tile_width,
tile_height,
largest_epipolar_region,
)
[docs]
def run( # pylint: disable=too-many-positional-arguments # noqa: C901
self,
sensor_image_left,
sensor_image_right,
grid_left,
grid_right,
geom_plugin,
orchestrator=None,
pair_folder=None,
pair_key="PAIR_0",
margins_fun=None,
tile_width=None,
tile_height=None,
add_classif=True,
add_edges=True,
epipolar_roi=None,
required_bands=None,
texture_bands=None,
):
"""
Run resampling application.
Creates left and right CarsDataset filled with xarray.Dataset,
corresponding to sensor images resampled in epipolar geometry.
:param sensor_images_left: tiled sensor left image
Dict Must contain keys : "image", "geomodel",
"no_data", "mask", "classification". Paths must be absolutes
:type sensor_images_left: CarsDataset
:param sensor_images_right: tiled sensor right image
Dict Must contain keys : "image", "geomodel",
"no_data", "mask", "classification". Paths must be absolutes
:type sensor_images_right: CarsDataset
:param grid_left: left epipolar grid
Grid dict contains :
- "grid_spacing", "grid_origin", \
"epipolar_size_x", "epipolar_size_y", "epipolar_origin_x",\
"epipolar_origin_y", epipolar_spacing_x",\
"epipolar_spacing", "disp_to_alt_ratio", "path"
:type grid_left: dict
:param grid_right: right epipolar grid. Grid dict contains :
- "grid_spacing", "grid_origin",\
"epipolar_size_x", "epipolar_size_y", "epipolar_origin_x",\
"epipolar_origin_y", epipolar_spacing_x",\
"epipolar_spacing", "disp_to_alt_ratio", "path"
:type grid_right: dict
:param orchestrator: orchestrator used
:param pair_folder: folder used for current pair
:type pair_folder: directory to save files to
:param pair_key: pair id
:type pair_key: str
:param margins_fun: margins function to use
:type margins_fun: fun
:param optimum_tile_size: optimum tile size to use
:type optimum_tile_size: int
:param tile_width: width of tile
:type tile_width: int
:param tile_height: height of tile
:type tile_height: int
:param add_classif: add classif to dataset
:type add_classif: bool
:param epipolar_roi: Epipolar roi to use if set.
Set None tiles outsize roi
:type epipolar_roi: list(int), [row_min, row_max, col_min, col_max]
:param required_bands: bands to resample on left and right image
:type required_bands: dict
:param texture_bands: name of bands used for output texture
:type texture_bands: list
:return: left epipolar image, right epipolar image. \
Each CarsDataset contains:
- N x M Delayed tiles. \
Each tile will be a future xarray Dataset containing:
- data with keys : "im", "msk", "classif"
- attrs with keys: "margins" with "disp_min" and "disp_max"\
"transform", "crs", "valid_pixels", "no_data_mask",
"no_data_img"
- attributes containing: \
"largest_epipolar_region","opt_epipolar_tile_size",
"disp_min_tiling", "disp_max_tiling"
:rtype: Tuple(CarsDataset, CarsDataset)
"""
# Default orchestrator
if orchestrator is None:
# Create default sequential orchestrator for current application
# be aware, no out_json will be shared between orchestrators
# No files saved
self.orchestrator = ocht.Orchestrator(
orchestrator_conf={"mode": "sequential"}
)
else:
self.orchestrator = orchestrator
if pair_folder is None:
pair_folder = os.path.join(self.orchestrator.out_dir, "tmp")
# Create zeros margins if not provided
if margins_fun is None:
def margins_fun( # pylint: disable=unused-argument
row_min, row_max, col_min, col_max
):
"""
Default margin function, returning zeros
"""
corner = ["left", "up", "right", "down"]
data = np.zeros(len(corner))
col = np.arange(len(corner))
margins = xr.Dataset(
{"left_margin": (["col"], data)}, coords={"col": col}
)
margins["right_margin"] = xr.DataArray(data, dims=["col"])
return margins
# Get grids and regions for current pair
(
epipolar_regions_grid,
tile_width,
tile_height,
largest_epipolar_region,
) = self.pre_run(
grid_left,
tile_width,
tile_height,
)
# Create CarsDataset
# Epipolar_images
epipolar_images_left = cars_dataset.CarsDataset(
"arrays", name="resampling_left_" + pair_key
)
epipolar_images_right = cars_dataset.CarsDataset(
"arrays", name="resampling_" + pair_key
)
# Compute tiling grid
epipolar_images_left.tiling_grid = epipolar_regions_grid
# Generate tiling grid
epipolar_images_right.tiling_grid = epipolar_regions_grid
# Compute overlaps
(
epipolar_images_left.overlaps,
epipolar_images_right.overlaps,
used_disp_min,
used_disp_max,
) = format_transformation.grid_margins_2_overlaps(
epipolar_images_left.tiling_grid, margins_fun
)
# add image type in attributes for future checking
if texture_bands is not None:
im_type = inputs.rasterio_get_image_type(
sensor_image_left[sens_cst.INPUT_IMG]["bands"][
texture_bands[0]
]["path"]
)
else:
im_type = inputs.rasterio_get_image_type(
sensor_image_left[sens_cst.INPUT_IMG]["bands"]["b0"]["path"]
)
# update attributes
epipolar_images_attributes = {
"largest_epipolar_region": largest_epipolar_region,
"tile_width": tile_width,
"tile_height": tile_height,
"disp_min_tiling": used_disp_min,
"disp_max_tiling": used_disp_max,
"image_type": im_type,
}
epipolar_images_left.attributes.update(epipolar_images_attributes)
epipolar_images_right.attributes.update(epipolar_images_attributes)
# Save objects
if self.save_intermediate_data:
safe_makedirs(pair_folder)
self.orchestrator.add_to_save_lists(
os.path.join(pair_folder, "epi_img_left.tif"),
cst.EPI_IMAGE,
epipolar_images_left,
cars_ds_name="epi_img_left",
)
self.orchestrator.add_to_save_lists(
os.path.join(pair_folder, "epi_img_right.tif"),
cst.EPI_IMAGE,
epipolar_images_right,
cars_ds_name="epi_img_right",
)
self.orchestrator.add_to_save_lists(
os.path.join(pair_folder, "epi_img_left_mask.tif"),
cst.EPI_MSK,
epipolar_images_left,
cars_ds_name="epi_img_left_mask",
dtype=np.uint8,
)
self.orchestrator.add_to_save_lists(
os.path.join(pair_folder, "epi_img_right_mask.tif"),
cst.EPI_MSK,
epipolar_images_right,
cars_ds_name="epi_img_right_mask",
dtype=np.uint8,
)
if self.save_intermediate_data and add_classif:
self.orchestrator.add_to_save_lists(
os.path.join(pair_folder, "epi_img_left_classif.tif"),
cst.EPI_CLASSIFICATION,
epipolar_images_left,
cars_ds_name="epi_img_left_classif",
dtype=np.uint8,
optional_data=True,
)
self.orchestrator.add_to_save_lists(
os.path.join(pair_folder, "epi_img_right_classif.tif"),
cst.EPI_CLASSIFICATION,
epipolar_images_right,
cars_ds_name="epi_img_right_classif",
dtype=np.uint8,
optional_data=True,
)
if self.save_intermediate_data and add_edges:
# left edges data
self.orchestrator.add_to_save_lists(
os.path.join(pair_folder, "epi_img_left_edges_mask.tif"),
cst.EPI_EDGES_MASK,
epipolar_images_left,
cars_ds_name="epi_img_left_edges_mask",
dtype=np.uint8,
optional_data=True,
)
self.orchestrator.add_to_save_lists(
os.path.join(pair_folder, "epi_img_left_edges_depth_map.tif"),
cst.EPI_EDGES_DEPTH_MAP,
epipolar_images_left,
cars_ds_name="epi_img_left_edges_depth_map",
dtype=np.float32,
optional_data=True,
)
self.orchestrator.add_to_save_lists(
os.path.join(pair_folder, "epi_img_left_edges_normals.tif"),
cst.EPI_EDGES_NORMALS,
epipolar_images_left,
cars_ds_name="epi_img_left_edges_normals",
dtype=np.float32,
optional_data=True,
)
self.orchestrator.add_to_save_lists(
os.path.join(pair_folder, "epi_img_left_edges_tile_id.tif"),
cst.EPI_EDGES_TILE_ID,
epipolar_images_left,
cars_ds_name="epi_img_left_edges_tile_id",
dtype=np.uint8,
optional_data=True,
)
# right edges data
self.orchestrator.add_to_save_lists(
os.path.join(pair_folder, "epi_img_right_edges_mask.tif"),
cst.EPI_EDGES_MASK,
epipolar_images_right,
cars_ds_name="epi_img_right_edges_mask",
dtype=np.uint8,
optional_data=True,
)
self.orchestrator.add_to_save_lists(
os.path.join(pair_folder, "epi_img_right_edges_depth_map.tif"),
cst.EPI_EDGES_DEPTH_MAP,
epipolar_images_right,
cars_ds_name="epi_img_right_edges_depth_map",
dtype=np.float32,
optional_data=True,
)
self.orchestrator.add_to_save_lists(
os.path.join(pair_folder, "epi_img_right_edges_normals.tif"),
cst.EPI_EDGES_NORMALS,
epipolar_images_right,
cars_ds_name="epi_img_right_edges_normals",
dtype=np.float32,
optional_data=True,
)
self.orchestrator.add_to_save_lists(
os.path.join(pair_folder, "epi_img_right_edges_tile_id.tif"),
cst.EPI_EDGES_TILE_ID,
epipolar_images_right,
cars_ds_name="epi_img_right_edges_tile_id",
dtype=np.uint8,
optional_data=True,
)
# Get saving infos in order to save tiles when they are computed
[
saving_info_left,
saving_info_right,
] = self.orchestrator.get_saving_infos(
[epipolar_images_left, epipolar_images_right]
)
logging.info(
"Number of tiles in epipolar resampling: "
"row: {} "
"col: {}".format(
epipolar_images_left.tiling_grid.shape[0],
epipolar_images_left.tiling_grid.shape[1],
)
)
# Add infos to orchestrator.out_json
updating_dict = {
application_constants.APPLICATION_TAG: {
resampling_constants.RESAMPLING_RUN_TAG: {
pair_key: {resampling_constants.METHOD: self.used_method},
}
}
}
self.orchestrator.update_out_info(updating_dict)
# retrieve data
epipolar_size_x = grid_left["epipolar_size_x"]
epipolar_size_y = grid_left["epipolar_size_y"]
left_images = resampling_wrappers.get_paths_and_bands_from_image(
sensor_image_left[sens_cst.INPUT_IMG],
required_bands["left"],
)
right_images = resampling_wrappers.get_paths_and_bands_from_image(
sensor_image_right[sens_cst.INPUT_IMG],
required_bands["right"],
)
grid1 = grid_left
grid2 = grid_right
nodata1 = sensor_image_left[sens_cst.INPUT_IMG].get(
sens_cst.INPUT_NODATA, None
)
nodata2 = sensor_image_right[sens_cst.INPUT_IMG].get(
sens_cst.INPUT_NODATA, None
)
mask1 = sensor_image_left.get(sens_cst.INPUT_MSK, None)
mask2 = sensor_image_right.get(sens_cst.INPUT_MSK, None)
edges1 = sensor_image_left.get(sens_cst.INPUT_EDGES, None)
edges2 = sensor_image_right.get(sens_cst.INPUT_EDGES, None)
left_classifs = sensor_image_left.get(
sens_cst.INPUT_CLASSIFICATION, None
)
if left_classifs is not None:
left_classifs = (
resampling_wrappers.get_path_and_values_from_classif(
left_classifs
)
)
right_classifs = sensor_image_right.get(
sens_cst.INPUT_CLASSIFICATION, None
)
if right_classifs is not None:
right_classifs = (
resampling_wrappers.get_path_and_values_from_classif(
right_classifs
)
)
# Set Epipolar roi
epi_tilling_grid = epipolar_images_left.tiling_grid
if epipolar_roi is None:
epipolar_roi = [
np.min(epi_tilling_grid[:, :, 0]),
np.max(epi_tilling_grid[:, :, 1]),
np.min(epi_tilling_grid[:, :, 2]),
np.max(epi_tilling_grid[:, :, 3]),
]
# Convert roi to polygon
epipolar_roi_poly = Polygon(
[
[epipolar_roi[0], epipolar_roi[2]],
[epipolar_roi[0], epipolar_roi[3]],
[epipolar_roi[1], epipolar_roi[3]],
[epipolar_roi[1], epipolar_roi[2]],
[epipolar_roi[0], epipolar_roi[2]],
]
)
# Check if tiles are in sensors
in_sensor_left_array, in_sensor_right_array = (
resampling_wrappers.check_tiles_in_sensor(
sensor_image_left,
sensor_image_right,
epi_tilling_grid,
grid_left,
grid_right,
geom_plugin,
)
)
# broadcast grids
# Transform grids to CarsDict for broadcasting
# due to Dask issue https://github.com/dask/dask/issues/9969
broadcasted_grid1 = self.orchestrator.cluster.scatter(CarsDict(grid1))
broadcasted_grid2 = self.orchestrator.cluster.scatter(CarsDict(grid2))
# Generate Image pair
for col in range(epipolar_images_left.shape[1]):
for row in range(epipolar_images_left.shape[0]):
# Create polygon corresponding to tile
tile = epi_tilling_grid[row, col]
tile_roi_poly = Polygon(
[
[tile[0], tile[2]],
[tile[0], tile[3]],
[tile[1], tile[3]],
[tile[1], tile[2]],
[tile[0], tile[2]],
]
)
if epipolar_roi_poly.intersects(tile_roi_poly) and (
in_sensor_left_array[row, col]
or in_sensor_right_array[row, col]
):
# get overlaps
left_overlap = cars_dataset.overlap_array_to_dict(
epipolar_images_left.overlaps[row, col]
)
right_overlap = cars_dataset.overlap_array_to_dict(
epipolar_images_right.overlaps[row, col]
)
# get window
left_window = epipolar_images_left.get_window_as_dict(
row, col
)
# update saving infos for potential replacement
full_saving_info_left = ocht.update_saving_infos(
saving_info_left, row=row, col=col
)
full_saving_info_right = ocht.update_saving_infos(
saving_info_right, row=row, col=col
)
# Compute images
(
epipolar_images_left[row, col],
epipolar_images_right[row, col],
) = self.orchestrator.cluster.create_task(
generate_epipolar_images_wrapper, nout=2
)(
left_overlap,
right_overlap,
left_window,
epipolar_size_x,
epipolar_size_y,
left_images,
right_images,
broadcasted_grid1,
broadcasted_grid2,
self.interpolator_image,
self.interpolator_classif,
self.interpolator_mask,
self.interpolators_edges,
self.step,
used_disp_min=used_disp_min[row, col],
used_disp_max=used_disp_max[row, col],
add_classif=add_classif,
mask1=mask1,
mask2=mask2,
edges1=edges1,
edges2=edges2,
left_classifs=left_classifs,
right_classifs=right_classifs,
nodata1=nodata1,
nodata2=nodata2,
saving_info_left=full_saving_info_left,
saving_info_right=full_saving_info_right,
)
# Remove tile with all nan
if not in_sensor_left_array[row, col]:
epipolar_images_left[row, col] = None
if not in_sensor_right_array[row, col]:
epipolar_images_right[row, col] = None
return epipolar_images_left, epipolar_images_right
# pylint: disable=too-many-positional-arguments
[docs]
def generate_epipolar_images_wrapper(
left_overlaps,
right_overlaps,
window,
epipolar_size_x,
epipolar_size_y,
left_imgs,
right_imgs,
grid1,
grid2,
interpolator_image,
interpolator_classif,
interpolator_mask,
interpolators_edges,
step=None,
used_disp_min=None,
used_disp_max=None,
add_classif=True,
add_edges=True,
mask1=None,
mask2=None,
edges1=None,
edges2=None,
left_classifs=None,
right_classifs=None,
nodata1=0,
nodata2=0,
saving_info_left=None,
saving_info_right=None,
) -> Dict[str, Tuple[xr.Dataset, xr.Dataset]]:
"""
Compute disparity maps from image objects. This function will be run
as a delayed task. If user want to correctly save dataset, the user must
provide saving_info_left and right. See cars_dataset.fill_dataset.
:param left_overlaps: Overlaps of left image, with row_min, row_max,
col_min and col_max keys.
:type left_overlaps: dict
:param right_overlaps: Overlaps of right image, with row_min, row_max,
col_min and col_max keys.
:type right_overlaps: dict
:param window: Window considered in generation, with row_min, row_max,
col_min and col_max keys.
:type window: dict
:return: Left image object, Right image object (if exists)
Returned objects are composed of dataset with :
- cst.EPI_IMAGE
- cst.EPI_MSK (if given)
- cst.EPI_TEXTURE (for left, if given)
"""
# Transform CarsDict back to dict
grid1 = grid1.data
grid2 = grid2.data
region, margins = format_transformation.region_margins_from_window(
window,
left_overlaps,
right_overlaps,
used_disp_min=used_disp_min,
used_disp_max=used_disp_max,
)
# Rectify images
(
left_dataset,
right_dataset,
left_classif_dataset,
right_classif_dataset,
left_edges_datasets,
right_edges_datasets,
) = resampling_algo.epipolar_rectify_images(
left_imgs,
right_imgs,
grid1,
grid2,
region,
margins,
epipolar_size_x,
epipolar_size_y,
interpolator_image,
interpolator_classif,
interpolator_mask,
interpolators_edges,
step=step,
mask1=mask1,
mask2=mask2,
edges1=edges1,
edges2=edges2,
left_classifs=left_classifs,
right_classifs=right_classifs,
nodata1=nodata1,
nodata2=nodata2,
add_classif=add_classif,
add_edges=add_edges,
)
# Add classification layers to dataset
if add_classif:
if left_classif_dataset:
left_dataset.coords[cst.BAND_CLASSIF] = left_classif_dataset.attrs[
cst.BAND_NAMES
]
left_dataset[cst.EPI_CLASSIFICATION] = xr.DataArray(
left_classif_dataset[cst.EPI_IMAGE].values,
dims=[cst.BAND_CLASSIF, cst.ROW, cst.COL],
).astype(bool)
if right_classif_dataset:
right_dataset.coords[cst.BAND_CLASSIF] = (
right_classif_dataset.attrs[cst.BAND_NAMES]
)
right_dataset[cst.EPI_CLASSIFICATION] = xr.DataArray(
right_classif_dataset[cst.EPI_IMAGE].values,
dims=[cst.BAND_CLASSIF, cst.ROW, cst.COL],
).astype(bool)
# Add edges layer to dataset
if add_edges:
for key, band_name, epi_name in [
("edges_mask", cst.BAND_EDGES_MASK, cst.EPI_EDGES_MASK),
("normals", cst.BAND_EDGES_NORMALS, cst.EPI_EDGES_NORMALS),
("depth_map", cst.BAND_EDGES_DEPTH_MAP, cst.EPI_EDGES_DEPTH_MAP),
("tile_id", cst.BAND_EDGES_TILE_ID, cst.EPI_EDGES_TILE_ID),
]:
if left_edges_datasets[key]:
left_dataset.coords[band_name] = left_edges_datasets[key].attrs[
cst.BAND_NAMES
]
left_dataset[epi_name] = xr.DataArray(
# pylint: disable=unsubscriptable-object
left_edges_datasets[key][cst.EPI_IMAGE].values,
dims=[band_name, cst.ROW, cst.COL],
)
if right_edges_datasets[key]:
right_dataset.coords[band_name] = right_edges_datasets[
key
].attrs[cst.BAND_NAMES]
right_dataset[epi_name] = xr.DataArray(
# pylint: disable=unsubscriptable-object
right_edges_datasets[key][cst.EPI_IMAGE].values,
dims=[band_name, cst.ROW, cst.COL],
)
# Add attributes info
attributes = {}
# fill datasets with saving info, window, profile, overlaps for correct
# saving
cars_dataset.fill_dataset(
left_dataset,
saving_info=saving_info_left,
window=window,
profile=None,
attributes=attributes,
overlaps=left_overlaps,
)
cars_dataset.fill_dataset(
right_dataset,
saving_info=saving_info_right,
window=window,
profile=None,
attributes=attributes,
overlaps=right_overlaps,
)
return left_dataset, right_dataset