Source code for cars.applications.dtm_generation.bulldozer_app

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains the bulldozer dsm filling application class.
"""

import contextlib
import logging
import os
from pathlib import Path

import rasterio as rio
import yaml
from bulldozer.pipeline.bulldozer_pipeline import dsm_to_dtm
from json_checker import Checker

import cars.orchestrator.orchestrator as ocht
from cars.applications.dem_generation.dem_generation_wrappers import (
    edit_transform,
)
from cars.core import preprocessing
from cars.orchestrator.cluster.log_wrapper import cars_profile

from .abstract_dtm_generation_app import DtmGeneration


[docs] class Bulldozer(DtmGeneration, short_name="bulldozer"): """ Bulldozer app """ def __init__(self, conf=None): """ Init function of Bulldozer :param conf: configuration for Bulldozer :return: an application_to_use object """ super().__init__(conf=conf) # check conf self.used_method = self.used_config["method"] self.save_intermediate_data = self.used_config["save_intermediate_data"]
[docs] def check_conf(self, conf): # init conf if conf is not None: overloaded_conf = conf.copy() else: conf = {} overloaded_conf = {} # Overload conf overloaded_conf["method"] = conf.get("method", "bulldozer") overloaded_conf["save_intermediate_data"] = conf.get( "save_intermediate_data", False ) rectification_schema = { "method": str, "save_intermediate_data": bool, } # Check conf checker = Checker(rectification_schema) checker.validate(overloaded_conf) return overloaded_conf
@cars_profile(name="Bulldozer filling") def run( # pylint: disable=too-many-positional-arguments # noqa C901 self, dsm_file, dump_dir, orchestrator=None, dsm_dir=None, ): """ Run bulldozer to get the DTM roi_poly can any of these objects : - a list of Shapely Polygons - a Shapely Polygon """ if orchestrator is None: orchestrator = ocht.Orchestrator( orchestrator_conf={"mode": "sequential"} ) if not os.path.exists(dump_dir): os.makedirs(dump_dir) # create the config for the bulldozer execution bull_conf_path = os.path.join( os.path.dirname(__file__), "bulldozer_config/base_config.yaml" ) with open(bull_conf_path, "r", encoding="utf8") as bull_conf_file: bull_conf = yaml.safe_load(bull_conf_file) bull_conf["dsm_path"] = dsm_file bull_conf["output_dir"] = os.path.join(dump_dir, "bulldozer") if orchestrator is not None: if ( orchestrator.get_conf()["mode"] == "multiprocessing" or orchestrator.get_conf()["mode"] == "local_dask" ): bull_conf["nb_max_workers"] = orchestrator.get_conf()[ "nb_workers" ] bull_conf_path = os.path.join(dump_dir, "bulldozer_config.yaml") with open(bull_conf_path, "w", encoding="utf8") as bull_conf_file: yaml.dump(bull_conf, bull_conf_file) # Modify DSM for dtm to be used with rio.open(dsm_file) as in_dsm: dsm_tr = in_dsm.transform dsm_crs = in_dsm.crs dsm_bounds = in_dsm.bounds saved_transform = None if dsm_crs.is_geographic: xmin = dsm_bounds.left ymin = dsm_bounds.bottom utm_epsg = preprocessing.get_utm_zone_as_epsg_code(xmin, ymin) conversion_factor = preprocessing.get_conversion_factor( dsm_bounds, utm_epsg, dsm_crs.to_epsg() ) resolution = dsm_tr.a * conversion_factor saved_transform = edit_transform(dsm_file, resolution=resolution) dtm_path = os.path.join(bull_conf["output_dir"], "dtm.tif") # Launch Bulldozer try: try: # suppress prints in bulldozer by redirecting stdout&stderr with open(os.devnull, "w", encoding="utf8") as devnull: with ( contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull), ): dsm_to_dtm(bull_conf_path) except Exception: logging.info( "Bulldozer failed on its first execution. Retrying" ) # suppress prints in bulldozer by redirecting stdout&stderr with open(os.devnull, "w", encoding="utf8") as devnull: with ( contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull), ): dsm_to_dtm(bull_conf_path) except Exception: logging.warning( "Bulldozer failed on its second execution." + " The DSM could not be filled." ) # Change back dsm and dtm to previous ref if saved_transform is not None: edit_transform(dtm_path, transform=saved_transform) edit_transform(dsm_file, transform=saved_transform) for element in Path(bull_conf["output_dir"]).iterdir(): if element.name == "dtm.tif": os.replace(element, Path(dsm_dir) / element.name)