Source code for cars.applications.dsm_filling.exogenous_filling

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains the exogenous dsm filling application class.
"""

import logging
import os
import shutil

import numpy as np
import rasterio as rio
from json_checker import Checker, Or
from rasterio.enums import Resampling
from rasterio.warp import reproject
from shapely import Polygon

from cars.core import inputs, projection
from cars.orchestrator.cluster.log_wrapper import cars_profile

from .dsm_filling import DsmFilling


[docs]class ExogenousFilling(DsmFilling, short_name="exogenous_filling"): """ Exogenous filling """ def __init__(self, conf=None): """ Init function of ExogenousFilling :param conf: configuration for ExogenousFilling :return: an application_to_use object """ super().__init__(conf=conf) # check conf self.used_method = self.used_config["method"] self.activated = self.used_config["activated"] self.classification = self.used_config["classification"] self.fill_with_geoid = self.used_config["fill_with_geoid"] self.interpolation_method = self.used_config["interpolation_method"] self.save_intermediate_data = self.used_config["save_intermediate_data"]
[docs] def check_conf(self, conf): # init conf if conf is not None: overloaded_conf = conf.copy() else: conf = {} overloaded_conf = {} # Overload conf overloaded_conf["method"] = conf.get("method", "bulldozer") overloaded_conf["activated"] = conf.get("activated", False) overloaded_conf["classification"] = conf.get("classification", None) overloaded_conf["fill_with_geoid"] = conf.get("fill_with_geoid", None) overloaded_conf["interpolation_method"] = conf.get( "interpolation_method", "bilinear" ) if overloaded_conf["interpolation_method"] not in ["bilinear", "cubic"]: raise RuntimeError( "Invalid interpolation method" f"{overloaded_conf['interpolation_method']}, " "supported modes are bilinear and cubic." ) overloaded_conf["save_intermediate_data"] = conf.get( "save_intermediate_data", False ) rectification_schema = { "method": str, "activated": bool, "classification": Or(None, [str]), "fill_with_geoid": Or(None, [str]), "interpolation_method": str, "save_intermediate_data": bool, } # Check conf checker = Checker(rectification_schema) checker.validate(overloaded_conf) return overloaded_conf
@cars_profile(name="Exogeneous filling") def run( # noqa C901 self, dsm_file, classif_file, filling_file, dump_dir, roi_polys, roi_epsg, output_geoid, geom_plugin, ): """ Run dsm filling using initial elevation and the current dsm Replaces dsm.tif by the filled dsm. Adds a new band to filling.tif if it exists. The old dsm is saved in dump_dir. roi_poly can any of these objects : - a list of Shapely Polygons - a Shapely Polygon """ if not self.activated: return if self.classification is None: self.classification = ["nodata"] if self.fill_with_geoid is None: self.fill_with_geoid = [] interpolation_methods_dict = { "bilinear": Resampling.bilinear, "cubic": Resampling.cubic, } interpolation_method = interpolation_methods_dict.get( self.interpolation_method, Resampling.bilinear ) if geom_plugin is None: logging.error( "No DEM was provided, exogenous_filling will not run." ) return if not os.path.exists(dump_dir): os.makedirs(dump_dir) old_dsm_path = os.path.join(dump_dir, "dsm_not_filled.tif") new_dsm_path = os.path.join(dump_dir, "dsm_filled.tif") # get dsm to be filled and its metadata with rio.open(dsm_file) as in_dsm: dsm = in_dsm.read(1) dsm_tr = in_dsm.transform dsm_crs = in_dsm.crs dsm_meta = in_dsm.meta roi_raster = np.ones(dsm.shape) if isinstance(roi_polys, list): roi_polys_outepsg = [] for poly in roi_polys: if isinstance(poly, Polygon): roi_poly_outepsg = projection.polygon_projection( poly, roi_epsg, dsm_crs.to_epsg() ) roi_polys_outepsg.append(roi_poly_outepsg) roi_raster = rio.features.rasterize( roi_polys_outepsg, out_shape=roi_raster.shape, transform=dsm_tr ) elif isinstance(roi_polys, Polygon): roi_poly_outepsg = projection.polygon_projection( roi_polys, roi_epsg, dsm_crs.to_epsg() ) roi_raster = rio.features.rasterize( [roi_poly_outepsg], out_shape=roi_raster.shape, transform=dsm_tr ) # Get the initial elevation with rio.open(geom_plugin.dem) as in_elev: # Reproject the elevation data to match the DSM elev_data = np.empty(dsm.shape, dtype=in_elev.dtypes[0]) reproject( source=rio.band(in_elev, 1), destination=elev_data, src_transform=in_elev.transform, src_crs=in_elev.crs, dst_transform=dsm_tr, dst_crs=dsm_crs, resampling=interpolation_method, ) if self.save_intermediate_data: reprojected_dem_path = os.path.join(dump_dir, "reprojected_dem.tif") with rio.open(reprojected_dem_path, "w", **dsm_meta) as out_elev: out_elev.write(elev_data, 1) with rio.open(geom_plugin.geoid) as in_geoid: # Reproject the geoid data to match the DSM input_geoid_data = np.empty(dsm.shape, dtype=in_geoid.dtypes[0]) reproject( source=rio.band(in_geoid, 1), destination=input_geoid_data, src_transform=in_geoid.transform, src_crs=in_geoid.crs, dst_transform=dsm_tr, dst_crs=dsm_crs, resampling=interpolation_method, ) if self.save_intermediate_data: reprojected_geoid_path = os.path.join( dump_dir, "reprojected_input_geoid.tif" ) with rio.open(reprojected_geoid_path, "w", **dsm_meta) as out_geoid: out_geoid.write(input_geoid_data, 1) if isinstance(output_geoid, str): with rio.open(output_geoid) as in_geoid: # Reproject the geoid data to match the DSM output_geoid_data = np.empty( dsm.shape, dtype=in_geoid.dtypes[0] ) reproject( source=rio.band(in_geoid, 1), destination=output_geoid_data, src_transform=in_geoid.transform, src_crs=in_geoid.crs, dst_transform=dsm_tr, dst_crs=dsm_crs, resampling=interpolation_method, ) if self.save_intermediate_data: reprojected_geoid_path = os.path.join( dump_dir, "reprojected_output_geoid.tif" ) with rio.open( reprojected_geoid_path, "w", **dsm_meta ) as out_geoid: out_geoid.write(input_geoid_data, 1) # Save old dsm if self.save_intermediate_data: with rio.open(old_dsm_path, "w", **dsm_meta) as out_dsm: out_dsm.write(dsm, 1) # Fill DSM for every label combined_mask = np.zeros_like(dsm).astype(np.uint8) if classif_file is not None: classif_descriptions = inputs.get_descriptions_bands(classif_file) else: classif_descriptions = [] for label in self.classification: if label in classif_descriptions: index_classif = classif_descriptions.index(label) + 1 with rio.open(classif_file) as in_classif: classif = in_classif.read(index_classif) classif_msk = in_classif.read_masks(1) classif[classif_msk == 0] = 0 filling_mask = np.logical_and(classif, roi_raster > 0) elif label == "nodata": if classif_file is not None: with rio.open(classif_file) as in_classif: classif_msk = in_classif.read_masks(1) classif = ~classif_msk else: with rio.open(dsm_file) as in_dsm: dsm_msk = in_dsm.read_masks(1) classif = ~dsm_msk filling_mask = np.logical_and(classif, roi_raster > 0) else: logging.error( "Label {} not found in classification " "descriptions {}".format(label, classif_descriptions) ) continue if label in self.fill_with_geoid: logging.info("Filling of {} with geoid".format(label)) dsm[filling_mask] = 0 else: logging.info("Filling of {} with DEM and geoid".format(label)) dsm[filling_mask] = elev_data[filling_mask] # apply offset to project on geoid if needed if output_geoid is not True: if isinstance(output_geoid, bool) and output_geoid is False: # out geoid is ellipsoid: add geoid-ellipsoid distance dsm[filling_mask] += input_geoid_data[filling_mask] elif isinstance(output_geoid, str): # out geoid is a new geoid whose path is in output_geoid: # add carsgeoid-ellipsoid then add ellipsoid-outgeoid dsm[filling_mask] += input_geoid_data[filling_mask] dsm[filling_mask] -= output_geoid_data[filling_mask] combined_mask = np.logical_or(combined_mask, filling_mask) with rio.open(dsm_file, "w", **dsm_meta) as out_dsm: out_dsm.write(dsm, 1) if self.save_intermediate_data: shutil.copy2(dsm_file, new_dsm_path) if filling_file is not None: with rio.open(filling_file, "r") as src: fill_meta = src.meta bands = [src.read(i + 1) for i in range(src.count)] bands_desc = [src.descriptions[i] for i in range(src.count)] fill_meta["count"] += 1 bands.append(combined_mask) bands_desc.append("filling_exogenous") with rio.open(filling_file, "w", **fill_meta) as out: for i, band in enumerate(bands): out.write(band, i + 1) out.set_band_description(i + 1, bands_desc[i])