Source code for cars.applications.dem_generation.bulldozer_dem_app

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
this module contains the dichotomic dem generation application class.
"""
# Standard library
import logging
import math
import os
import shutil

# Third-party imports
import numpy as np
import rasterio as rio
import skimage
from json_checker import And, Checker, Or
from rasterio.enums import Resampling
from rasterio.warp import reproject

# CARS imports - Applications
from cars.applications import application_constants
from cars.applications.dem_generation.abstract_dem_generation_app import (
    DemGeneration,
)
from cars.applications.dem_generation.dem_generation_algo import (
    launch_bulldozer,
)
from cars.applications.dem_generation.dem_generation_wrappers import (
    compute_stats,
    downsample_dem,
    edit_transform,
    fit_initial_elevation_on_dem_median,
    reproject_dem,
    reverse_dem,
)

# CARS imports - Core
from cars.core import inputs
from cars.orchestrator.cluster.log_wrapper import cars_profile


[docs] class BulldozerDem(DemGeneration, short_name="bulldozer_on_raster"): """ Rasterization """ # pylint: disable=too-many-instance-attributes def __init__(self, scaling_coeff, conf=None): """ Init function of Rasterization :param scaling_coeff: scaling factor for resolution :type scaling_coeff: float :param conf: configuration for Rasterization :return: an application_to_use object """ super().__init__(scaling_coeff, conf=conf) # check conf self.used_method = self.used_config["method"] self.resolution = self.used_config["resolution"] self.margin = self.used_config["margin"] height_margin = self.used_config["height_margin"] if isinstance(height_margin, list): self.min_height_margin = height_margin[0] self.max_height_margin = height_margin[1] else: self.min_height_margin = height_margin self.max_height_margin = height_margin self.morphological_filters_size = self.used_config[ "morphological_filters_size" ] self.preprocessing_median_filter_size = self.used_config[ "preprocessing_median_filter_size" ] self.postprocessing_median_filter_size = self.used_config[ "postprocessing_median_filter_size" ] self.dem_median_downscale = self.used_config["dem_median_downscale"] self.dem_min_max_downscale = self.used_config["dem_min_max_downscale"] self.fillnodata_max_search_distance = self.used_config[ "fillnodata_max_search_distance" ] self.min_dem = self.used_config["min_dem"] self.max_dem = self.used_config["max_dem"] self.bulldozer_max_object_size = self.used_config[ "bulldozer_max_object_size" ] self.disable_bulldozer = self.used_config["disable_bulldozer"] self.compute_stats = self.used_config["compute_stats"] self.coregistration = self.used_config["coregistration"] self.coregistration_max_shift = self.used_config[ "coregistration_max_shift" ] self.save_intermediate_data = self.used_config["save_intermediate_data"] # Init orchestrator self.orchestrator = None
[docs] def check_conf(self, conf): """ Check configuration :param conf: configuration to check :type conf: dict :return: overloaded configuration :rtype: dict """ # init conf if conf is not None: overloaded_conf = conf.copy() else: conf = {} overloaded_conf = {} # Overload conf overloaded_conf["method"] = conf.get("method", "bulldozer_on_raster") overloaded_conf["resolution"] = conf.get( "resolution", float(self.scaling_coeff * 0.5) ) overloaded_conf["margin"] = conf.get( "margin", [0.1, float(math.sqrt(self.scaling_coeff) * 500)] ) overloaded_conf["morphological_filters_size"] = conf.get( "morphological_filters_size", 30 ) overloaded_conf["preprocessing_median_filter_size"] = conf.get( "preprocessing_median_filter_size", 5 ) overloaded_conf["postprocessing_median_filter_size"] = conf.get( "postprocessing_median_filter_size", 7 ) overloaded_conf["dem_median_downscale"] = conf.get( "dem_median_downscale", 10 ) overloaded_conf["dem_min_max_downscale"] = conf.get( "dem_min_max_downscale", 2 ) overloaded_conf["fillnodata_max_search_distance"] = conf.get( "fillnodata_max_search_distance", 50 ) overloaded_conf["min_dem"] = conf.get("min_dem", -500) overloaded_conf["max_dem"] = conf.get("max_dem", 1000) overloaded_conf["height_margin"] = conf.get( "height_margin", float(math.sqrt(self.scaling_coeff) * 20) ) overloaded_conf["bulldozer_max_object_size"] = conf.get( "bulldozer_max_object_size", 8 ) overloaded_conf["disable_bulldozer"] = conf.get( "disable_bulldozer", False ) overloaded_conf["compute_stats"] = conf.get("compute_stats", True) overloaded_conf["coregistration"] = conf.get("coregistration", True) overloaded_conf["coregistration_max_shift"] = conf.get( "coregistration_max_shift", 180 ) overloaded_conf[application_constants.SAVE_INTERMEDIATE_DATA] = ( conf.get(application_constants.SAVE_INTERMEDIATE_DATA, False) ) rectification_schema = { "method": str, "resolution": And(Or(float, int), lambda x: x > 0), application_constants.SAVE_INTERMEDIATE_DATA: bool, "margin": [Or(float, int)], "morphological_filters_size": And(int, lambda x: x > 0), "preprocessing_median_filter_size": And(int, lambda x: x > 0), "postprocessing_median_filter_size": And(int, lambda x: x > 0), "dem_median_downscale": And(int, lambda x: x > 0), "dem_min_max_downscale": And(int, lambda x: x > 0), "fillnodata_max_search_distance": And(int, lambda x: x > 0), "min_dem": And(Or(int, float), lambda x: x < 0), "max_dem": And(Or(int, float), lambda x: x > 0), "height_margin": Or(list, float, int), "bulldozer_max_object_size": And(int, lambda x: x > 0), "disable_bulldozer": bool, "compute_stats": bool, "coregistration": bool, "coregistration_max_shift": And(Or(int, float), lambda x: x > 0), "save_intermediate_data": bool, } # Check conf checker = Checker(rectification_schema) checker.validate(overloaded_conf) return overloaded_conf
@cars_profile(name="DEM Generation") def run( # pylint: disable=too-many-positional-arguments # noqa: C901 self, dsm_file_name, output_dir, input_geoid, output_geoid, initial_elevation=None, default_alt=0, cars_orchestrator=None, ): """ Run dichotomic dem generation using matches :param dsm_file_name: The dsm path :type dsm_file_name: CarsDataset :param output_dir: directory to save dem :type output_dir: str :param input_geoid: input geoid path :param output_geoid: output geoid path :param dem_roi_to_use: dem roi polygon to use as roi :return: dem data computed with mean, min and max. dem is also saved in disk, and paths are available in attributes. (DEM_MEDIAN_PATH, DEM_MIN_PATH, DEM_MAX_PATH) :rtype: CarsDataset """ # Generate point cloud epsg = 4326 # Optimize the case when input and output geoid are the same if output_geoid is True: input_geoid = False output_geoid = False resolution_in_meters = self.resolution # rasterize point cloud # File is not part of the official product, write it in dump_dir dem_min_path = os.path.join(output_dir, "dem_min.tif") # File is not part of the official product, write it in dump_dir dem_max_path = os.path.join(output_dir, "dem_max.tif") # File is not part of the official product, write it in dump_dir dem_median_path_out = os.path.join(output_dir, "dem_median.tif") dem_median_path_in = dsm_file_name dem_epsg = inputs.rasterio_get_epsg(dsm_file_name) if dem_epsg != epsg: dem_median_path_in = os.path.join(output_dir, "dem_median.tif") reproject_dem(dsm_file_name, "EPSG:4326", dem_median_path_in) with rio.open(dem_median_path_in) as src: dem = src.read(1) profile = src.profile nodata = src.nodata if self.save_intermediate_data: raw_dsm_path = os.path.join(output_dir, "raw_dsm.tif") shutil.copy2(dem_median_path_in, raw_dsm_path) dem_data = dem mask = dem_data == nodata # fill nodata max_search_distance = ( self.fillnodata_max_search_distance + self.morphological_filters_size ) # a margin is added for following morphological operations # pixels further than self.fillnodata_max_search_distance # will later be turned into nodata by eroded_mask dem_data = rio.fill.fillnodata( dem_data, mask=~mask, max_search_distance=max_search_distance, ) not_filled_pixels = dem_data == nodata # Add geoid if input_geoid: with rio.open(input_geoid) as in_geoid: # Reproject the geoid data to match the DSM input_geoid_data = np.empty( dem_data.shape, dtype=in_geoid.dtypes[0] ) logging.info("Reprojection of geoid data") reproject( source=rio.band(in_geoid, 1), destination=input_geoid_data, src_transform=in_geoid.transform, src_crs=in_geoid.crs, dst_transform=profile["transform"], dst_crs=profile["crs"], resampling=Resampling.bilinear, ) dem_data -= input_geoid_data if output_geoid: with rio.open(input_geoid) as in_geoid: # Reproject the geoid data to match the DSM input_geoid_data = np.empty( dem_data.shape, dtype=in_geoid.dtypes[0] ) logging.info("Reprojection of geoid data") reproject( source=rio.band(in_geoid, 1), destination=input_geoid_data, src_transform=in_geoid.transform, src_crs=in_geoid.crs, dst_transform=profile["transform"], dst_crs=profile["crs"], resampling=Resampling.bilinear, ) dem_data += input_geoid_data # apply morphological filters and height margin footprint = skimage.morphology.disk( self.morphological_filters_size // 2, decomposition="sequence" ) logging.info("Generation of DEM min") dem_data[not_filled_pixels] = -nodata dem_min = ( skimage.morphology.erosion(dem_data, footprint=footprint) - self.min_height_margin ) dem_data[not_filled_pixels] = nodata logging.info("Generation of DEM max") dem_max = ( skimage.morphology.dilation(dem_data, footprint=footprint) + self.max_height_margin ) logging.info("Generation of DEM median") dem_median = skimage.filters.median( dem_data, footprint=np.ones( ( self.preprocessing_median_filter_size, self.preprocessing_median_filter_size, ) ), ) footprint = skimage.morphology.disk( self.fillnodata_max_search_distance // 2, decomposition="sequence" ) eroded_mask = skimage.morphology.erosion(mask, footprint=footprint) dem_min[eroded_mask] = nodata dem_median[eroded_mask] = nodata dem_max[eroded_mask] = nodata # Rectify pixels where DEM min < DEM median + min_depth dem_min = np.where( dem_min - dem_median < self.min_dem, dem_median + self.min_dem, dem_min, ) # Rectify pixels where DEM max > DEM median + max_height dem_max = np.where( dem_max - dem_median > self.max_dem, dem_median + self.max_dem, dem_max, ) # save with rio.open(dem_min_path, "w", **profile) as out_dem: out_dem.write(dem_min, 1) with rio.open(dem_median_path_out, "w", **profile) as out_dem: out_dem.write(dem_median, 1) with rio.open(dem_max_path, "w", **profile) as out_dem: out_dem.write(dem_max, 1) dem_filled_path = os.path.join(output_dir, "dem_filled.tif") with rio.open(dem_filled_path, "w", **profile) as out_dem: out_dem.write(dem_data, 1) if self.save_intermediate_data: intermediate_dem_min_path = os.path.join( output_dir, "dem_min_before_downsampling.tif" ) shutil.copy2(dem_min_path, intermediate_dem_min_path) intermediate_dem_max_path = os.path.join( output_dir, "dem_max_before_downsampling.tif" ) shutil.copy2(dem_max_path, intermediate_dem_max_path) intermediate_dem_median_path = os.path.join( output_dir, "dem_median_before_downsampling.tif" ) shutil.copy2(dem_median_path_out, intermediate_dem_median_path) # Downsample dems downsample_dem( dem_median_path_out, scale=self.dem_median_downscale, interpolator="median", median_filter_size=self.postprocessing_median_filter_size, default_alt=default_alt, ) downsample_dem( dem_min_path, scale=self.dem_min_max_downscale, interpolator="min", default_alt=default_alt, ) downsample_dem( dem_max_path, scale=self.dem_min_max_downscale, interpolator="max", default_alt=default_alt, ) downsample_dem( dem_filled_path, scale=self.dem_min_max_downscale, interpolator="nearest", default_alt=default_alt, ) if self.save_intermediate_data: intermediate_dem_min_path = os.path.join( output_dir, "dem_min_before_bulldozer.tif" ) shutil.copy2(dem_min_path, intermediate_dem_min_path) intermediate_dem_max_path = os.path.join( output_dir, "dem_max_before_bulldozer.tif" ) shutil.copy2(dem_max_path, intermediate_dem_max_path) if not self.disable_bulldozer: dem_min_max_res = resolution_in_meters * self.dem_min_max_downscale # Launch Bulldozer on dem min saved_transform = edit_transform( dem_min_path, resolution=dem_min_max_res ) logging.info("Launch Bulldozer on DEM min") temp_output_path = launch_bulldozer( dem_min_path, os.path.join(output_dir, "dem_min_bulldozer"), cars_orchestrator, self.bulldozer_max_object_size, ) if temp_output_path is not None: shutil.copy2(temp_output_path, dem_min_path) edit_transform(dem_min_path, transform=saved_transform) # Inverse dem max and launch bulldozer saved_transform = edit_transform( dem_max_path, resolution=dem_min_max_res ) reverse_dem(dem_max_path) logging.info("Launch Bulldozer on DEM max") temp_output_path = launch_bulldozer( dem_max_path, os.path.join(output_dir, "dem_max_bulldozer"), cars_orchestrator, self.bulldozer_max_object_size, ) if temp_output_path is not None: shutil.copy2(temp_output_path, dem_max_path) reverse_dem(dem_max_path) edit_transform(dem_max_path, transform=saved_transform) else: # Fill nodata def fill_dem_nodata(data): """ Fill nodata """ # Apply max_iter times rasterio fill nodata max_iter = 5 current_iter = 0 while current_iter < max_iter: data = rio.fill.fillnodata( data, mask=data != nodata, max_search_distance=max_search_distance, ) current_iter += 1 if np.sum(data == nodata) == 0: # no nodata left, exit loop continue # fill the rest with median median_value = np.nanmedian(data) data[data == nodata] = median_value return data with rio.open(dem_min_path, "r+", **profile) as out_dem: dem_min = out_dem.read() dem_min = fill_dem_nodata(dem_min) out_dem.write(dem_min) with rio.open(dem_median_path_out, "r+", **profile) as out_dem: dem_median = out_dem.read() dem_median = fill_dem_nodata(dem_median) out_dem.write(dem_median) with rio.open(dem_max_path, "r+", **profile) as out_dem: dem_max = out_dem.read() dem_max = fill_dem_nodata(dem_max) out_dem.write(dem_max) # Check DEM min and max with rio.open(dem_min_path, "r") as in_dem: dem_min = in_dem.read() dem_min_metadata = in_dem.meta nodata = in_dem.nodata with rio.open(dem_max_path, "r") as in_dem: dem_max = in_dem.read() dem_max_metadata = in_dem.meta with rio.open(dem_filled_path, "r") as in_dem: dem_data = in_dem.read() dem_data[dem_data == nodata] = np.nan dem_min[dem_min == nodata] = np.nan dem_max[dem_max == nodata] = np.nan if self.compute_stats: diff = dem_data - dem_min diff = diff[dem_data != 0] logging.info( "Statistics of difference between subsampled " "DSM and DEM min (in meters)" ) compute_stats(diff) diff = dem_max - dem_data diff = diff[dem_data != 0] logging.info( "Statistics of difference between DEM max " "and subsampled DSM (in meters)" ) compute_stats(diff) diff = dem_max - dem_min diff = diff[dem_data != 0] logging.info( "Statistics of difference between DEM max " "and DEM min (in meters)" ) compute_stats(diff) # Rectify pixels where DEM min > DEM - margin dem_min = np.where( dem_min > dem_data - self.min_height_margin, dem_data - self.min_height_margin, dem_min, ) # Rectify pixels where DEM max < DEM + margin dem_max = np.where( dem_max < dem_data + self.max_height_margin, dem_data + self.max_height_margin, dem_max, ) # Rectify pixels where DEM min > DEM max - margin, to ensure that # DEM min < DEM max even on filled pixels dem_min = np.where( dem_min > dem_max - self.min_height_margin, dem_max - self.min_height_margin, dem_min, ) with rio.open(dem_min_path, "w", **dem_min_metadata) as out_dem: out_dem.write(dem_min) with rio.open(dem_max_path, "w", **dem_max_metadata) as out_dem: out_dem.write(dem_max) paths = { "dem_median": dem_median_path_out, "dem_min": dem_min_path, "dem_max": dem_max_path, } # after saving, fit initial elevation if required if initial_elevation is not None and self.coregistration: initial_elevation_out_path = os.path.join( output_dir, "initial_elevation_fit.tif" ) coreg_offsets = fit_initial_elevation_on_dem_median( initial_elevation, dem_median_path_out, initial_elevation_out_path, ) coreg_info = { application_constants.APPLICATION_TAG: { "dem_generation": {"coregistration": coreg_offsets} } } # save the coreg shift info in cars's main orchestrator cars_orchestrator.update_out_info(coreg_info) if ( coreg_offsets is None or abs(coreg_offsets["shift_x"]) > self.coregistration_max_shift or abs(coreg_offsets["shift_y"]) > self.coregistration_max_shift ): logging.warning( "The initial elevation will be used as-is because " "coregistration failed or gave inconsistent results" ) return dem, paths, None return dem, paths, initial_elevation_out_path return dem, paths, None