Source code for cars.applications.dsm_filling.exogenous_filling_app

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains the exogenous dsm filling application class.
"""

import logging
import os
import shutil

import numpy as np
import rasterio as rio
import xarray as xr
from json_checker import Checker, Or
from pyproj import CRS
from rasterio.enums import Resampling
from rasterio.warp import reproject
from rasterio.windows import Window
from rasterio.windows import transform as row_col_to_coords
from shapely import Polygon

import cars.orchestrator.orchestrator as ocht
from cars.core import projection, tiling
from cars.data_structures import cars_dataset
from cars.orchestrator.cluster.log_wrapper import cars_profile

from .abstract_dsm_filling_app import DsmFilling


[docs] class ExogenousFilling(DsmFilling, short_name="exogenous_filling"): """ Exogenous filling """ def __init__(self, conf=None): """ Init function of ExogenousFilling :param conf: configuration for ExogenousFilling :return: an application_to_use object """ super().__init__(conf=conf) # check conf self.used_method = self.used_config["method"] self.fill_classification = self.used_config["fill_classification"] self.fill_nodata = self.used_config["fill_nodata"] self.fill_with_geoid = self.used_config["fill_with_geoid"] self.interpolation_method = self.used_config["interpolation_method"] self.save_intermediate_data = self.used_config["save_intermediate_data"]
[docs] def check_conf(self, conf): # init conf if conf is not None: overloaded_conf = conf.copy() else: conf = {} overloaded_conf = {} # Overload conf overloaded_conf["method"] = conf.get("method", "exogenous_filling") overloaded_conf["fill_classification"] = conf.get( "fill_classification", "nodata" ) if isinstance(overloaded_conf["fill_classification"], str): overloaded_conf["fill_classification"] = [ overloaded_conf["fill_classification"] ] overloaded_conf["fill_nodata"] = conf.get("fill_nodata", None) if isinstance(overloaded_conf["fill_nodata"], str): overloaded_conf["fill_nodata"] = [overloaded_conf["fill_nodata"]] overloaded_conf["fill_with_geoid"] = conf.get("fill_with_geoid", None) overloaded_conf["interpolation_method"] = conf.get( "interpolation_method", "bilinear" ) if overloaded_conf["interpolation_method"] not in ["bilinear", "cubic"]: # pylint: disable=inconsistent-quotes raise RuntimeError( f"Invalid interpolation method" f"{overloaded_conf['interpolation_method']}, " f"supported modes are bilinear and cubic." ) overloaded_conf["save_intermediate_data"] = conf.get( "save_intermediate_data", False ) rectification_schema = { "method": str, "fill_classification": Or(None, [str]), "fill_nodata": Or(None, [str]), "fill_with_geoid": Or(None, [str]), "interpolation_method": str, "save_intermediate_data": bool, } # Check conf checker = Checker(rectification_schema) checker.validate(overloaded_conf) return overloaded_conf
@cars_profile(name="Exogeneous filling") def run( # pylint: disable=too-many-positional-arguments # noqa C901 self, dsm_file, classif_file, filling_file, invalidity_mask_file, classif_values, dump_dir, roi_polys, roi_epsg, output_geoid, geom_plugin, dsm_dir=None, tile_size=10000, orchestrator=None, ): """ Run dsm filling using initial elevation and the current dsm Replaces dsm.tif by the filled dsm. Adds a new band to filling.tif if it exists. The old dsm is saved in dump_dir. roi_poly can any of these objects : - a list of Shapely Polygons - a Shapely Polygon """ if orchestrator is None: orchestrator = ocht.Orchestrator( orchestrator_conf={"mode": "sequential"} ) if dsm_dir is not None: dsm_path_out = os.path.join(dsm_dir, "dsm.tif") filling_path_out = os.path.join(dsm_dir, "filling.tif") else: dsm_path_out = dsm_file filling_path_out = filling_file if self.fill_classification is None: self.fill_classification = ["nodata"] if self.fill_with_geoid is None: self.fill_with_geoid = [] interpolation_methods_dict = { "bilinear": Resampling.bilinear, "cubic": Resampling.cubic, } interpolation_method = interpolation_methods_dict.get( self.interpolation_method, Resampling.bilinear ) if geom_plugin is None: logging.error( "No DEM was provided, exogenous_filling will not run." ) return None if not os.path.exists(dump_dir): os.makedirs(dump_dir) # get dsm to be filled and its metadata with rio.open(dsm_file) as in_dsm: profile = in_dsm.profile # Transform to wtk for serialization. profile["crs"] = profile["crs"].to_wkt() height = in_dsm.height width = in_dsm.width dsm_dtype = in_dsm.dtypes[0] nodata_value = in_dsm.nodata filled_dsm_cars_ds = cars_dataset.CarsDataset( "arrays", name="Monoband Filling" ) # Compute tiling grid filled_dsm_cars_ds.tiling_grid = tiling.generate_tiling_grid( 0, 0, height, width, tile_size, tile_size, ) # Saving infos [ saving_info, ] = orchestrator.get_saving_infos([filled_dsm_cars_ds]) # save list orchestrator.add_to_save_lists( dsm_path_out, "exogenous_filled_dsm", filled_dsm_cars_ds, dtype=dsm_dtype, nodata=nodata_value, optional_data=False, cars_ds_name="exogenous_filled_dsm", ) if filling_file is not None: with rio.open(filling_file, "r") as src: filling_dtype = src.dtypes[0] filling_nodata_value = src.nodata # count will be count += 1 band_description = [ (i + 1, src.descriptions[i]) for i in range(src.count) ] band_description.append( (len(band_description) + 1, "filling_exogenous") ) orchestrator.add_to_save_lists( filling_path_out, "exogenous_filled_filling", filled_dsm_cars_ds, dtype=filling_dtype, nodata=filling_nodata_value, optional_data=False, cars_ds_name="exogenous_filled_filling", rio_band_description=band_description, ) old_dsm_path = os.path.join(dump_dir, "dsm_not_filled.tif") new_dsm_path = os.path.join(dump_dir, "dsm_filled.tif") old_filling_path = None # Save old dsm shutil.copy(dsm_file, old_dsm_path) if filling_file is not None: old_filling_path = os.path.join(dump_dir, "filling_not_filled.tif") shutil.copy(filling_file, old_filling_path) if self.save_intermediate_data: # save new orchestrator.add_to_save_lists( new_dsm_path, "exogenous_filled_dsm", filled_dsm_cars_ds, dtype=dsm_dtype, nodata=nodata_value, optional_data=False, cars_ds_name="exogenous_filled_dsm", ) # saved reprojected dem / intial elevation reprojected_dem_path = os.path.join(dump_dir, "reprojected_dem.tif") orchestrator.add_to_save_lists( reprojected_dem_path, "reprojected_dem", filled_dsm_cars_ds, dtype=dsm_dtype, nodata=nodata_value, optional_data=False, cars_ds_name="reprojected_dem", ) # saved reprojected input geoid reprojected_input_geoid_path = os.path.join( dump_dir, "reprojected_input_geoid.tif" ) orchestrator.add_to_save_lists( reprojected_input_geoid_path, "reprojected_input_geoid", filled_dsm_cars_ds, dtype=dsm_dtype, nodata=nodata_value, optional_data=False, cars_ds_name="reprojected_input_geoid", ) if output_geoid not in (False, None): # saved reprojected input geoid reprojected_output_geoid_path = os.path.join( dump_dir, "reprojected_output_geoid.tif" ) orchestrator.add_to_save_lists( reprojected_output_geoid_path, "reprojected_output_geoid", filled_dsm_cars_ds, dtype=dsm_dtype, nodata=nodata_value, optional_data=False, cars_ds_name="reprojected_output_geoid", ) for row in range(filled_dsm_cars_ds.shape[0]): for col in range(filled_dsm_cars_ds.shape[1]): # update saving infos for potential replacement full_saving_info = ocht.update_saving_infos( saving_info, row=row, col=col ) window = filled_dsm_cars_ds.get_window_as_dict(row, col) # Compute images ( filled_dsm_cars_ds[row, col] ) = orchestrator.cluster.create_task( exogenous_filling_wrapper, nout=1 )( old_dsm_path, old_filling_path, classif_file, invalidity_mask_file, classif_values, geom_plugin.dem, geom_plugin.geoid, output_geoid, roi_polys, roi_epsg, self.fill_with_geoid, interpolation_method, self.fill_classification, self.fill_nodata, window=window, saving_info=full_saving_info, profile=profile, ) return filled_dsm_cars_ds
[docs] def exogenous_filling_wrapper( # noqa C901 # pylint: disable=R0917 dsm_file, filling_file, classif_file, invalidity_mask_file, classif_values, geo_plugin_dem, geo_plugin_geoid, output_geoid, roi_polys, roi_epsg, fill_with_geoid, interpolation_method, fill_classification, fill_nodata, window=None, saving_info=None, profile=None, ): """ Wrapper for exogenous filling, to be applied on each tile of the DSM. :param dsm_file: dsm file to fill :param filling_file: filling file :param classif_file: classification file :param geo_plugin_dem: initial elevation dem :param geo_plugin_geoid: initial elevation geoid :param output_geoid: output geoid :return: filled dsm xarray dataset """ # Get rasterio window col_min = window["col_min"] row_min = window["row_min"] col_max = window["col_max"] row_max = window["row_max"] rasterio_window = Window( col_off=col_min, row_off=row_min, width=(col_max - col_min), height=(row_max - row_min), ) with rio.open(dsm_file) as in_dsm: dsm = in_dsm.read(1, window=rasterio_window) dsm_crs = in_dsm.crs # Get local transform for window window_transform = row_col_to_coords(rasterio_window, in_dsm.transform) roi_raster = np.ones(dsm.shape) if isinstance(roi_polys, list): roi_polys_outepsg = [] for poly in roi_polys: if isinstance(poly, Polygon): roi_poly_outepsg = projection.polygon_projection_crs( poly, CRS(roi_epsg), dsm_crs ) roi_polys_outepsg.append(roi_poly_outepsg) roi_raster = rio.features.rasterize( roi_polys_outepsg, out_shape=roi_raster.shape, transform=window_transform, ) elif isinstance(roi_polys, Polygon): roi_poly_outepsg = projection.polygon_projection_crs( roi_polys, CRS(roi_epsg), dsm_crs ) roi_raster = rio.features.rasterize( [roi_poly_outepsg], out_shape=roi_raster.shape, transform=window_transform, ) # Get the initial elevation with rio.open(geo_plugin_dem) as in_elev: # Reproject the elevation data to match the DSM elev_data = np.empty(dsm.shape, dtype=in_elev.dtypes[0]) reproject( source=rio.band(in_elev, 1), destination=elev_data, src_transform=in_elev.transform, src_crs=in_elev.crs, dst_transform=window_transform, dst_crs=dsm_crs, resampling=interpolation_method, ) with rio.open(geo_plugin_geoid) as in_geoid: # Reproject the geoid data to match the DSM input_geoid_data = np.empty(dsm.shape, dtype=in_geoid.dtypes[0]) reproject( source=rio.band(in_geoid, 1), destination=input_geoid_data, src_transform=in_geoid.transform, src_crs=in_geoid.crs, dst_transform=window_transform, dst_crs=dsm_crs, resampling=interpolation_method, ) output_geoid_data = None if isinstance(output_geoid, str): with rio.open(output_geoid) as in_geoid: # Reproject the geoid data to match the DSM output_geoid_data = np.empty(dsm.shape, dtype=in_geoid.dtypes[0]) reproject( source=rio.band(in_geoid, 1), destination=output_geoid_data, src_transform=in_geoid.transform, src_crs=in_geoid.crs, dst_transform=window_transform, dst_crs=dsm_crs, resampling=interpolation_method, ) # Fill DSM for every label combined_mask = np.zeros_like(dsm).astype(np.uint8) classif = None classif_msk = None dsm_msk = None if classif_file is not None: with rio.open(classif_file) as in_classif: classif = in_classif.read(1, window=rasterio_window) classif_msk = in_classif.read_masks(1, window=rasterio_window) else: with rio.open(dsm_file) as in_dsm: dsm_msk = in_dsm.read_masks(1, window=rasterio_window) for label in fill_classification: if label in classif_values: filling_mask = np.logical_and(classif == int(label), roi_raster > 0) elif label == "nodata": if classif_msk is not None: filling_mask = ~classif_msk else: filling_mask = ~dsm_msk filling_mask = np.logical_and(filling_mask, roi_raster > 0) else: logging.error( "Label {} not found in classification " "descriptions {}".format(label, classif_values) ) continue if label in fill_with_geoid: logging.info("Filling of {} with geoid".format(label)) dsm[filling_mask] = 0 else: logging.info("Filling of {} with DEM and geoid".format(label)) dsm[filling_mask] = elev_data[filling_mask] # apply offset to project on geoid if needed if output_geoid is not True: if isinstance(output_geoid, bool) and output_geoid is False: # out geoid is ellipsoid: add geoid-ellipsoid distance dsm[filling_mask] += input_geoid_data[filling_mask] elif isinstance(output_geoid, str): # out geoid is a new geoid whose path is in output_geoid: # add carsgeoid-ellipsoid then add ellipsoid-outgeoid dsm[filling_mask] += input_geoid_data[filling_mask] dsm[filling_mask] -= output_geoid_data[filling_mask] combined_mask = np.logical_or(combined_mask, filling_mask) invalidity_mask = None if fill_nodata is not None: if invalidity_mask_file is not None: with rio.open(invalidity_mask_file) as src: invalidity_mask = src.read(1, window=rasterio_window) for label in fill_nodata: filling_mask = np.logical_and( invalidity_mask == int(label), roi_raster > 0 ) if label in fill_with_geoid: logging.info("Filling of {} with geoid".format(label)) dsm[filling_mask] = 0 else: logging.info("Filling of {} with DEM and geoid".format(label)) dsm[filling_mask] = elev_data[filling_mask] # apply offset to project on geoid if needed if output_geoid is not True: if isinstance(output_geoid, bool) and output_geoid is False: # out geoid is ellipsoid: add geoid-ellipsoid distance dsm[filling_mask] += input_geoid_data[filling_mask] elif isinstance(output_geoid, str): # out geoid is a new geoid whose path is in output_geoid: # add carsgeoid-ellipsoid then add ellipsoid-outgeoid dsm[filling_mask] += input_geoid_data[filling_mask] dsm[filling_mask] -= output_geoid_data[filling_mask] combined_mask = np.logical_or(combined_mask, filling_mask) data = { "exogenous_filled_dsm": (["row", "col"], dsm), "reprojected_dem": (["row", "col"], elev_data), "reprojected_input_geoid": (["row", "col"], input_geoid_data), } coords = { "row": np.arange(dsm.shape[0]), "col": np.arange(dsm.shape[1]), } if filling_file is not None: # create combined filling with rio.open(filling_file) as in_filling: nb_bands_filling = in_filling.count + 1 filling = in_filling.read(window=rasterio_window) # add layer combined_mask to filling combined_mask = combined_mask[np.newaxis, :, :] filling = np.concatenate((filling, combined_mask), axis=0) data["exogenous_filled_filling"] = ( ["band_filling", "row", "col"], filling, ) coords["band_filling"] = np.arange(1, nb_bands_filling + 1) if output_geoid_data is not None: data["reprojected_output_geoid"] = (["row", "col"], output_geoid_data) output_dataset = xr.Dataset( data_vars=data, coords=coords, ) cars_dataset.fill_dataset( output_dataset, saving_info=saving_info, window=window, profile=profile, attributes=None, overlaps=None, ) return output_dataset