Source code for cars.applications.sensors_subsampling.rasterio_subsampling_app

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains the rasterio sensors subsampling application class.
"""
import copy
import os
from pathlib import Path

# Standard imports
from typing import Dict, Tuple

import numpy as np
import rasterio
import xarray as xr
from affine import Affine
from json_checker import Checker

import cars.orchestrator.orchestrator as ocht
from cars.applications.resampling import (
    resampling_wrappers,
)
from cars.applications.sensors_subsampling import (
    abstract_subsampling_app as ssa,
)
from cars.applications.sensors_subsampling import (
    subsampling_algo,
)
from cars.core import constants as cst
from cars.core import inputs, tiling
from cars.core.utils import safe_makedirs
from cars.data_structures import cars_dataset
from cars.pipelines.parameters import sensor_inputs_constants as sens_cst

# pylint: disable= C0302


class RasterioSubsampling(ssa.SensorsSubsampling, short_name=["rasterio"]):
    """
    SensorsSubsampling application relying on rasterio.

    The application subsamples a sensor image (and its optional mask,
    classification and additional image bands) tile by tile, and registers
    the resulting tiles to the orchestrator so that they are written to disk.
    """

    def __init__(self, conf=None):
        """
        Init function of SensorsSubsampling

        :param conf: configuration for SensorsSubsampling
        :return: an application_to_use object
        """

        super().__init__(conf=conf)

        # Cache the validated configuration values.
        # NOTE(review): self.used_config is presumably filled by the parent
        # constructor from check_conf() -- confirm in the abstract class.
        self.used_method = self.used_config["method"]
        self.tile_size = self.used_config["tile_size"]
        self.interpolator_image = self.used_config["interpolator_image"]
        self.interpolator_classif = self.used_config["interpolator_classif"]
        self.interpolator_mask = self.used_config["interpolator_mask"]

        # Init orchestrator (set for real in run())
        self.orchestrator = None

    def check_conf(self, conf):
        """
        Check configuration

        Missing keys are filled with their default value, then the result
        is validated against the application schema.

        :param conf: configuration to check
        :type conf: dict

        :return: overloaded configuration
        :rtype: dict
        """

        # init conf
        if conf is None:
            conf = {}
        overloaded_conf = conf.copy()

        # Overload conf: user value when provided, default otherwise
        defaults = {
            "method": "rasterio",
            "tile_size": 10000,
            "interpolator_image": "bilinear",
            "interpolator_classif": "nearest",
            "interpolator_mask": "nearest",
        }
        for key, default_value in defaults.items():
            overloaded_conf[key] = conf.get(key, default_value)

        subsampling_schema = {
            "method": str,
            "tile_size": int,
            "interpolator_image": str,
            "interpolator_mask": str,
            "interpolator_classif": str,
        }

        # Check conf
        checker = Checker(subsampling_schema)
        checker.validate(overloaded_conf)

        return overloaded_conf

    def update_profile(self, img_path, scale_factor):
        """
        Update rasterio profile

        :param img_path: the image path
        :type img_path: str
        :param scale_factor: the scaling factor
        :type scale_factor: float

        :return: the profile of the subsampled image, followed by the
            height and the width of the input (full resolution) image
        :rtype: tuple(dict, int, int)
        """

        with rasterio.open(img_path) as src:
            height = src.height
            width = src.width

            new_height = int(height * scale_factor)
            new_width = int(width * scale_factor)

            # Scale the pixel size by the effective ratio between the input
            # and output sizes (they differ from 1 / scale_factor because of
            # the integer truncation above).
            new_transform = src.transform * Affine.scale(
                (width / new_width), (height / new_height)
            )

            profile = src.profile.copy()
            profile.update(
                {
                    "height": new_height,
                    "width": new_width,
                    "transform": new_transform,
                    "driver": "GTiff",
                    "compress": "lzw",
                }
            )

        return profile, height, width

    def get_paths_dictionary(self, sensor_dict):
        """
        Get the paths dictionary

        :param sensor_dict: the sensor dictionary (classification, image...)
        :type sensor_dict: dict

        :return: a dictionary whose keys are "im", "mask", "classif" and
            "texture_<n>" (one per additional image path), each value being
            a dictionary with the keys "path", "nodata" and "dtype".
            "nodata" and "dtype" are None when the path is None.
        :rtype: dict
        """

        # get images, no data and classifs
        image_dict = resampling_wrappers.get_paths_and_bands_from_image(
            sensor_dict[sens_cst.INPUT_IMG],
            None,
        )
        # The first path is the main image, the following ones are textures
        image = next(iter(image_dict))

        mask = sensor_dict.get(sens_cst.INPUT_MSK, None)

        classif = sensor_dict.get(sens_cst.INPUT_CLASSIFICATION, None)
        classif_path = None
        if classif is not None:
            classif = resampling_wrappers.get_path_and_values_from_classif(
                classif
            )
            classif_path = next(iter(classif))

        raw_paths = {"im": image, "mask": mask, "classif": classif_path}
        for step, key in enumerate(list(image_dict.keys())[1:]):
            raw_paths[f"texture_{step}"] = key

        # Build a new dictionary rather than rewriting the one being
        # iterated over.
        paths_dictionary = {}
        for key, path in raw_paths.items():
            entry = {"path": path, "nodata": None, "dtype": None}
            if path is not None:
                entry["nodata"] = inputs.rasterio_get_nodata(path)
                entry["dtype"] = inputs.rasterio_get_dtype(path)
            paths_dictionary[key] = entry

        return paths_dictionary

    def run(  # pylint: disable=too-many-positional-arguments
        self,
        id_image,
        sensor_dict,
        resolution,
        out_directory,
        orchestrator,
    ):
        """
        Run subsampling using rasterio

        :param id_image: the id of the image
        :type id_image: str
        :param sensor_dict: the sensor dictionary (image, classification...)
        :type sensor_dict: dict
        :param resolution: the subsampling resolution
        :type resolution: int
        :param out_directory: the output directory
        :type out_directory: str
        :param orchestrator: orchestrator used. If None, a default
            sequential orchestrator is created.

        :return: the subsampled image, one tile per cell of the tiling grid
        :rtype: CarsDataset
        """

        # Default orchestrator
        if orchestrator is None:
            # Create default sequential orchestrator for current application
            # be aware, no out_json will be shared between orchestrators
            # No files saved
            self.orchestrator = ocht.Orchestrator(
                orchestrator_conf={"mode": "sequential"}
            )
        else:
            self.orchestrator = orchestrator

        # Create CarsDataset
        # images
        image_subsampled = cars_dataset.CarsDataset(
            "arrays", name="subsampling_left_" + str(resolution)
        )

        # Define the scale factor
        scale_factor = 1 / resolution

        # put a multiple of the resolution for tiling to avoid artefacts.
        # Kept local: writing it back to self.tile_size would make
        # successive runs at different resolutions compound the rounding.
        tile_size = int(self.tile_size * scale_factor) * resolution

        # Define the path of each image
        img_path = sensor_dict[sens_cst.INPUT_IMG]["bands"]["b0"]["path"]

        # update profile of each image
        new_profile, height, width = self.update_profile(
            img_path, scale_factor
        )

        # Get saving infos in order to save tiles when they are computed
        [
            saving_info,
        ] = self.orchestrator.get_saving_infos([image_subsampled])

        # Build paths dictionary
        paths_dictionary = self.get_paths_dictionary(sensor_dict)

        # Define tiling grid (in the full resolution image geometry)
        tiling_grid = tiling.generate_tiling_grid(
            0,
            0,
            height,
            width,
            tile_size,
            tile_size,
        )

        # Compute tiling grid
        image_subsampled.tiling_grid = tiling_grid

        # Save files
        safe_makedirs(os.path.join(out_directory, id_image))

        for key, val in paths_dictionary.items():
            path = val["path"]
            if path is None:
                # Nothing to save for an absent input
                continue

            dtype = val["dtype"]
            nbits = np.dtype(dtype).itemsize * 8
            if key == "mask":
                # Masks are written as 1 bit per pixel uint8 rasters
                nbits = 1
                dtype = "uint8"

            name_file = Path(path).stem
            # Use self.orchestrator: the "orchestrator" argument is None
            # when the default sequential orchestrator has been created.
            self.orchestrator.add_to_save_lists(
                os.path.join(out_directory, id_image, name_file + ".tif"),
                key,
                image_subsampled,
                dtype=dtype,
                nodata=val["nodata"],
                optional_data=(key == "classif"),
                cars_ds_name="subsampling_" + key + "_res_" + str(resolution),
                nbits=nbits,
            )

        # Generate Image pair
        for col in range(image_subsampled.shape[1]):
            for row in range(image_subsampled.shape[0]):

                # update saving infos for potential replacement
                full_saving_info = ocht.update_saving_infos(
                    saving_info, row=row, col=col
                )

                window = image_subsampled.get_window_as_dict(row, col)

                # Compute images
                (
                    image_subsampled[row, col]
                ) = self.orchestrator.cluster.create_task(
                    generate_subsampled_images_wrapper, nout=1
                )(
                    paths_dictionary,
                    new_profile,
                    scale_factor,
                    tile_size,
                    window=window,
                    saving_info=full_saving_info,
                )

        return image_subsampled
# pylint: disable=too-many-positional-arguments
def generate_subsampled_images_wrapper(
    paths_dictionary,
    profile,
    scale_factor=1,
    tile_size=10000,
    window=None,
    saving_info=None,
) -> xr.Dataset:
    """
    Subsampling wrapper

    Resample every available input of the paths dictionary on the given
    window and gather the results in a single dataset.

    :param paths_dictionary: the paths dictionary. Each value is a
        dictionary holding at least a "path" key. The "im" entry must come
        first: its resampled dataset is the one the other layers are added to
    :type paths_dictionary: dict
    :param profile: the new profile
    :param scale_factor: the scaling_factor
    :type scale_factor: float
    :param tile_size: the tile size
    :type tile_size: int
    :param window: the current window
    :type window: Window
    :param saving_info: the saving information

    :return: the subsampled image dataset, with the resampled mask,
        classification and textures added as extra variables when their
        path is available
    :rtype: xr.Dataset
    """

    global_dataset = None

    for key, val in paths_dictionary.items():
        path = val["path"]
        if path is None:
            continue

        # Labelled rasters must not be blended, hence nearest neighbour.
        # NOTE(review): the interpolators are hard-coded here; the
        # interpolator_* values of the application configuration are not
        # forwarded to this function -- confirm this is intended.
        if key in ("classif", "mask"):
            interpolator = "nearest"
        else:
            interpolator = "bilinear"

        # Resample the current input on the window
        dataset = subsampling_algo.resample_image(
            path,
            window,
            tile_size,
            key,
            scale_factor=scale_factor,
            interpolator=interpolator,
        )

        if key == "im":
            # The image dataset is the base on which other layers are added
            global_dataset = copy.deepcopy(dataset)
        elif key in ("classif", "mask"):
            global_dataset[key] = xr.DataArray(
                dataset[cst.EPI_IMAGE].values,
                dims=[cst.ROW, cst.COL],
            )
        elif "texture" in key:
            global_dataset.coords[cst.BAND_IM] = dataset.attrs[
                cst.BAND_NAMES
            ]
            global_dataset[key] = xr.DataArray(
                dataset[cst.EPI_IMAGE].values,
                dims=[cst.BAND_NAMES, cst.ROW, cst.COL],
            )

    # region is read as [col_min, row_min, col_max, row_max]
    window_out_left = {
        "row_min": global_dataset.region[1],
        "row_max": global_dataset.region[3],
        "col_min": global_dataset.region[0],
        "col_max": global_dataset.region[2],
    }

    # Add attributes info
    attributes = {}
    # fill datasets with saving info, window, profile, overlaps for correct
    # saving
    cars_dataset.fill_dataset(
        global_dataset,
        saving_info=saving_info,
        window=window_out_left,
        profile=profile,
        attributes=attributes,
    )

    return global_dataset