#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
this module contains the dichotomic dem generation application class.
"""
import contextlib
import logging
# Standard imports
import os
import shutil
# Third party imports
import numpy as np
import pandas
import rasterio as rio
import skimage
import xarray as xr
import yaml
from affine import Affine
from bulldozer.pipeline.bulldozer_pipeline import dsm_to_dtm
from json_checker import And, Checker, Or
from rasterio.enums import Resampling
from rasterio.warp import reproject
import cars.orchestrator.orchestrator as ocht
from cars.applications import application_constants
from cars.applications.application import Application
from cars.applications.dem_generation import (
dem_generation_constants as dem_gen_cst,
)
from cars.applications.dem_generation.dem_generation import DemGeneration
from cars.applications.dem_generation.dem_generation_tools import (
fit_initial_elevation_on_dem_median,
)
# CARS imports
from cars.core import preprocessing, projection, tiling
from cars.data_structures import cars_dataset
from cars.orchestrator.cluster.log_wrapper import cars_profile
[docs]class Rasterization(DemGeneration, short_name="bulldozer_on_raster"):
"""
Rasterization
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, conf=None):
"""
Init function of Rasterization
:param conf: configuration for Rasterization
:return: an application_to_use object
"""
super().__init__(conf=conf)
# check conf
self.used_method = self.used_config["method"]
self.resolution = self.used_config["resolution"]
self.margin = self.used_config["margin"]
height_margin = self.used_config["height_margin"]
if isinstance(height_margin, list):
self.min_height_margin = height_margin[0]
self.max_height_margin = height_margin[1]
else:
self.min_height_margin = height_margin
self.max_height_margin = height_margin
self.morphological_filters_size = self.used_config[
"morphological_filters_size"
]
self.median_filter_size = self.used_config["median_filter_size"]
self.dem_median_output_resolution = self.used_config[
"dem_median_output_resolution"
]
self.fillnodata_max_search_distance = self.used_config[
"fillnodata_max_search_distance"
]
self.min_dem = self.used_config["min_dem"]
self.max_dem = self.used_config["max_dem"]
self.bulldozer_max_object_size = self.used_config[
"bulldozer_max_object_size"
]
self.compute_stats = self.used_config["compute_stats"]
self.coregistration = self.used_config["coregistration"]
self.coregistration_max_shift = self.used_config[
"coregistration_max_shift"
]
self.save_intermediate_data = self.used_config["save_intermediate_data"]
# Init orchestrator
self.orchestrator = None
[docs] def check_conf(self, conf):
"""
Check configuration
:param conf: configuration to check
:type conf: dict
:return: overloaded configuration
:rtype: dict
"""
# init conf
if conf is not None:
overloaded_conf = conf.copy()
else:
conf = {}
overloaded_conf = {}
# Overload conf
overloaded_conf["method"] = conf.get("method", "bulldozer_on_raster")
overloaded_conf["resolution"] = conf.get("resolution", 2)
overloaded_conf["margin"] = conf.get("margin", 500)
overloaded_conf["morphological_filters_size"] = conf.get(
"morphological_filters_size", 30
)
overloaded_conf["median_filter_size"] = conf.get(
"median_filter_size", 5
)
overloaded_conf["dem_median_output_resolution"] = conf.get(
"dem_median_output_resolution", 30
)
overloaded_conf["fillnodata_max_search_distance"] = conf.get(
"fillnodata_max_search_distance", 50
)
overloaded_conf["min_dem"] = conf.get("min_dem", -500)
overloaded_conf["max_dem"] = conf.get("max_dem", 1000)
overloaded_conf["height_margin"] = conf.get("height_margin", 20)
overloaded_conf["bulldozer_max_object_size"] = conf.get(
"bulldozer_max_object_size", 8
)
overloaded_conf["compute_stats"] = conf.get("compute_stats", True)
overloaded_conf["coregistration"] = conf.get("coregistration", True)
overloaded_conf["coregistration_max_shift"] = conf.get(
"coregistration_max_shift", 180
)
overloaded_conf[application_constants.SAVE_INTERMEDIATE_DATA] = (
conf.get(application_constants.SAVE_INTERMEDIATE_DATA, False)
)
rectification_schema = {
"method": str,
"resolution": And(Or(float, int), lambda x: x > 0),
application_constants.SAVE_INTERMEDIATE_DATA: bool,
"margin": And(Or(float, int), lambda x: x > 0),
"morphological_filters_size": And(int, lambda x: x > 0),
"median_filter_size": And(int, lambda x: x > 0),
"dem_median_output_resolution": And(int, lambda x: x > 0),
"fillnodata_max_search_distance": And(int, lambda x: x > 0),
"min_dem": And(Or(int, float), lambda x: x < 0),
"max_dem": And(Or(int, float), lambda x: x > 0),
"height_margin": And(Or(float, int), lambda x: x > 0),
"bulldozer_max_object_size": And(int, lambda x: x > 0),
"compute_stats": bool,
"coregistration": bool,
"coregistration_max_shift": And(Or(int, float), lambda x: x > 0),
"save_intermediate_data": bool,
}
# Check conf
checker = Checker(rectification_schema)
checker.validate(overloaded_conf)
return overloaded_conf
@cars_profile(name="DEM Generation")
def run(
self,
triangulated_matches_list,
output_dir,
geoid_path,
dem_roi_to_use=None,
initial_elevation=None,
cars_orchestrator=None,
):
"""
Run dichotomic dem generation using matches
:param triangulated_matches_list: list of triangulated matches
positions must be in a metric system
:type triangulated_matches_list: list(pandas.Dataframe)
:param output_dir: directory to save dem
:type output_dir: str
:param geoid_path: geoid path
:param dem_roi_to_use: dem roi polygon to use as roi
:return: dem data computed with mean, min and max.
dem is also saved in disk, and paths are available in attributes.
(DEM_MEDIAN_PATH, DEM_MIN_PATH, DEM_MAX_PATH)
:rtype: CarsDataset
"""
# Create sequential orchestrator for savings
self.orchestrator = ocht.Orchestrator(
orchestrator_conf={"mode": "sequential"}
)
# Generate point cloud
epsg = 4326
for pair_pc in triangulated_matches_list:
# convert to degrees for geoid offset
if pair_pc.attrs["epsg"] != epsg:
projection.point_cloud_conversion_dataset(pair_pc, epsg)
merged_point_cloud = pandas.concat(
triangulated_matches_list,
ignore_index=True,
sort=False,
)
merged_point_cloud.attrs["epsg"] = epsg
# Get bounds
if dem_roi_to_use is not None:
bounds_poly = dem_roi_to_use.bounds
xmin = min(bounds_poly[0], bounds_poly[2])
xmax = max(bounds_poly[0], bounds_poly[2])
ymin = min(bounds_poly[1], bounds_poly[3])
ymax = max(bounds_poly[1], bounds_poly[3])
else:
# Get min max with margin
mins = merged_point_cloud.min(skipna=True)
maxs = merged_point_cloud.max(skipna=True)
xmin = mins["x"]
ymin = mins["y"]
xmax = maxs["x"]
ymax = maxs["y"]
bounds_cloud = [xmin, ymin, xmax, ymax]
# Convert resolution and margin to degrees
utm_epsg = preprocessing.get_utm_zone_as_epsg_code(xmin, ymin)
conversion_factor = preprocessing.get_conversion_factor(
bounds_cloud, epsg, utm_epsg
)
margin_in_degrees = self.margin * conversion_factor
resolution_in_meters = self.resolution
resolution_in_degrees = self.resolution * conversion_factor
# Get borders, adding margin
xmin = xmin - margin_in_degrees
ymin = ymin - margin_in_degrees
xmax = xmax + margin_in_degrees
ymax = ymax + margin_in_degrees
# rasterize point cloud
dem_min_path = os.path.join(output_dir, "dem_min.tif")
dem_median_path = os.path.join(output_dir, "dem_median.tif")
dem_max_path = os.path.join(output_dir, "dem_max.tif")
point_clouds = cars_dataset.CarsDataset("points")
terrain_tiling_grid = tiling.generate_tiling_grid(
xmin,
ymin,
xmax,
ymax,
xmax - xmin,
ymax - ymin,
)
point_clouds.tiling_grid = terrain_tiling_grid
point_clouds.tiles[0][0] = merged_point_cloud
point_clouds.attributes["bounds"] = [xmin, ymin, xmax, ymax]
# Init rasterization app
rasterization_application = Application("point_cloud_rasterization")
dem = rasterization_application.run(
point_clouds,
epsg,
resolution=resolution_in_degrees,
orchestrator=self.orchestrator,
dsm_file_name=dem_median_path,
)
self.orchestrator.breakpoint()
if self.save_intermediate_data:
raw_dsm_path = os.path.join(output_dir, "raw_dsm.tif")
shutil.copy2(dem_median_path, raw_dsm_path)
dem_data = dem[0, 0]["hgt"].data
nodata = rasterization_application.dsm_no_data
mask = dem_data == nodata
# fill nodata
max_search_distance = (
self.fillnodata_max_search_distance
+ self.morphological_filters_size
)
# a margin is added for following morphological operations
# pixels further than self.fillnodata_max_search_distance
# will later be turned into nodata by eroded_mask
dem_data = rio.fill.fillnodata(
dem_data,
mask=~mask,
max_search_distance=max_search_distance,
)
not_filled_pixels = dem_data == nodata
# Add geoid
with rio.open(geoid_path) as in_geoid:
# Reproject the geoid data to match the DSM
input_geoid_data = np.empty(
dem_data.shape, dtype=in_geoid.dtypes[0]
)
logging.info("Reprojection of geoid data")
reproject(
source=rio.band(in_geoid, 1),
destination=input_geoid_data,
src_transform=in_geoid.transform,
src_crs=in_geoid.crs,
dst_transform=dem[0, 0].profile["transform"],
dst_crs=dem[0, 0].profile["crs"],
resampling=Resampling.bilinear,
)
dem_data -= input_geoid_data
# apply morphological filters and height margin
footprint = skimage.morphology.disk(
self.morphological_filters_size // 2, decomposition="sequence"
)
logging.info("Generation of DEM min")
dem_data[not_filled_pixels] = -nodata
dem_min = (
skimage.morphology.erosion(dem_data, footprint=footprint)
- self.min_height_margin
)
dem_data[not_filled_pixels] = nodata
logging.info("Generation of DEM max")
dem_max = (
skimage.morphology.dilation(dem_data, footprint=footprint)
+ self.max_height_margin
)
logging.info("Generation of DEM median")
dem_median = skimage.filters.median(
dem_data,
footprint=np.ones(
(self.median_filter_size, self.median_filter_size)
),
)
footprint = skimage.morphology.disk(
self.fillnodata_max_search_distance // 2, decomposition="sequence"
)
eroded_mask = skimage.morphology.binary_erosion(
mask, footprint=footprint
)
dem_min[eroded_mask] = nodata
dem_median[eroded_mask] = nodata
dem_max[eroded_mask] = nodata
# Rectify pixels where DEM min < DEM - min_depth
dem_min = np.where(
dem_median - dem_min < self.min_dem,
dem_median + self.min_dem,
dem_min,
)
# Rectify pixels where DEM max > DEM + max_height
dem_max = np.where(
dem_max - dem_median > self.max_dem,
dem_median + self.max_dem,
dem_max,
)
# saving infos
# dem mean
dem_median_path = os.path.join(output_dir, "dem_median.tif")
self.orchestrator.add_to_save_lists(
dem_median_path,
dem_gen_cst.DEM_MEDIAN,
dem,
dtype=np.float32,
nodata=-32768,
cars_ds_name="dem_median",
)
dem.attributes[dem_gen_cst.DEM_MEDIAN_PATH] = dem_median_path
# dem min
dem_min_path = os.path.join(output_dir, "dem_min.tif")
self.orchestrator.add_to_save_lists(
dem_min_path,
dem_gen_cst.DEM_MIN,
dem,
dtype=np.float32,
nodata=-32768,
cars_ds_name="dem_min",
)
dem.attributes[dem_gen_cst.DEM_MIN_PATH] = dem_min_path
# dem max
dem_max_path = os.path.join(output_dir, "dem_max.tif")
self.orchestrator.add_to_save_lists(
dem_max_path,
dem_gen_cst.DEM_MAX,
dem,
dtype=np.float32,
nodata=-32768,
cars_ds_name="dem_max",
)
dem.attributes[dem_gen_cst.DEM_MAX_PATH] = dem_max_path
# Generate dataset
dem_min_xr = dem[0, 0]["hgt"].copy()
dem_min_xr.data = dem_min
dem_median_xr = dem[0, 0]["hgt"].copy()
dem_median_xr.data = dem_median
dem_max_xr = dem[0, 0]["hgt"].copy()
dem_max_xr.data = dem_max
dem_tile = xr.Dataset(
data_vars={
"dem_min": dem_min_xr,
"dem_median": dem_median_xr,
"dem_max": dem_max_xr,
}
)
cars_dataset.fill_dataset(
dem_tile,
saving_info=dem[0, 0].saving_info,
window=dem[0, 0].window,
profile=dem[0, 0].profile,
attributes=None,
overlaps=None,
)
dem[0, 0] = dem_tile
# Save
self.orchestrator.breakpoint()
# cleanup
self.orchestrator.cleanup()
if self.save_intermediate_data:
intermediate_dem_min_path = os.path.join(
output_dir, "dem_min_before_bulldozer.tif"
)
shutil.copy2(dem_min_path, intermediate_dem_min_path)
intermediate_dem_max_path = os.path.join(
output_dir, "dem_max_before_bulldozer.tif"
)
shutil.copy2(dem_max_path, intermediate_dem_max_path)
intermediate_dem_median_path = os.path.join(
output_dir, "dem_median_before_downsampling.tif"
)
shutil.copy2(dem_median_path, intermediate_dem_median_path)
# Downsample median dem
downsample_dem(
dem_median_path,
scale=self.dem_median_output_resolution / resolution_in_meters,
)
# Launch Bulldozer on dem min
saved_transform = edit_transform(
dem_min_path, resolution=resolution_in_meters
)
temp_output_path = launch_bulldozer(
dem_min_path,
os.path.join(output_dir, "dem_min_bulldozer"),
cars_orchestrator,
self.bulldozer_max_object_size,
)
if temp_output_path is not None:
shutil.copy2(temp_output_path, dem_min_path)
edit_transform(dem_min_path, transform=saved_transform)
# Inverse dem max and launch bulldozer
saved_transform = edit_transform(
dem_max_path, resolution=resolution_in_meters
)
reverse_dem(dem_max_path)
temp_output_path = launch_bulldozer(
dem_max_path,
os.path.join(output_dir, "dem_max_bulldozer"),
cars_orchestrator,
self.bulldozer_max_object_size,
)
if temp_output_path is not None:
shutil.copy2(temp_output_path, dem_max_path)
reverse_dem(dem_max_path)
edit_transform(dem_max_path, transform=saved_transform)
# Check DEM min and max
with rio.open(dem_min_path, "r") as in_dem:
dem_min = in_dem.read()
dem_min_metadata = in_dem.meta
nodata = in_dem.nodata
with rio.open(dem_max_path, "r") as in_dem:
dem_max = in_dem.read()
dem_max_metadata = in_dem.meta
dem_data[dem_data == nodata] = np.nan
dem_min[dem_min == nodata] = np.nan
dem_max[dem_max == nodata] = np.nan
if self.compute_stats:
diff = dem_data - dem_min
logging.info(
"Statistics of difference between subsampled "
"DSM and DEM min (in meters)"
)
compute_stats(diff)
diff = dem_max - dem_data
logging.info(
"Statistics of difference between DEM max "
"and subsampled DSM (in meters)"
)
compute_stats(diff)
diff = dem_max - dem_min
logging.info(
"Statistics of difference between DEM max "
"and DEM min (in meters)"
)
compute_stats(diff)
# Rectify pixels where DEM min > DEM - margin
dem_min = np.where(
dem_min > dem_data - self.min_height_margin,
dem_data - self.min_height_margin,
dem_min,
)
# Rectify pixels where DEM max < DEM + margin
dem_max = np.where(
dem_max < dem_data + self.max_height_margin,
dem_data + self.max_height_margin,
dem_max,
)
with rio.open(dem_min_path, "w", **dem_min_metadata) as out_dem:
out_dem.write(dem_min)
with rio.open(dem_max_path, "w", **dem_max_metadata) as out_dem:
out_dem.write(dem_max)
# after saving, fit initial elevation if required
if initial_elevation is not None and self.coregistration:
initial_elevation_out_path = os.path.join(
output_dir, "initial_elevation_fit.tif"
)
coreg_offsets = fit_initial_elevation_on_dem_median(
initial_elevation, dem_median_path, initial_elevation_out_path
)
coreg_info = {
application_constants.APPLICATION_TAG: {
"dem_generation": {"coregistration": coreg_offsets}
}
}
# save the coreg shift info in cars's main orchestrator
cars_orchestrator.update_out_info(coreg_info)
if (
coreg_offsets is None
or abs(coreg_offsets["shift_x"]) > self.coregistration_max_shift
or abs(coreg_offsets["shift_y"]) > self.coregistration_max_shift
):
logging.warning(
"The initial elevation will be used as-is because "
"coregistration failed or gave inconsistent results"
)
return dem, None
return dem, initial_elevation_out_path
return dem, None
[docs]def reverse_dem(input_dem):
"""
Compute the opposite of a DEM :
Altitudes sign is changed
:param input_dem: path of DEM to reverse
:type input_dem: str
"""
with rio.open(input_dem, "r") as in_dem:
data = in_dem.read()
metadata = in_dem.meta
nodata = in_dem.nodata
with rio.open(input_dem, "w", **metadata) as out_dem:
out_dem.write(-data)
out_dem.nodata = -nodata
[docs]def downsample_dem(input_dem, scale):
"""
Downsample median DEM with median resampling
:param input_dem: path of DEM to downsample (only one band)
:type input_dem: str
"""
with rio.open(input_dem) as in_dem:
data = in_dem.read(1)
metadata = in_dem.meta
src_transform = in_dem.transform
width = in_dem.width
height = in_dem.height
crs = in_dem.crs
nodata = in_dem.nodata
dst_transform = src_transform * Affine.scale(scale)
dst_height = int(height // scale) + 1
dst_width = int(width // scale) + 1
metadata["transform"] = dst_transform
metadata["height"] = dst_height
metadata["width"] = dst_width
output = np.zeros((dst_height, dst_width))
reproject(
data,
output,
src_transform=src_transform,
src_crs=crs,
dst_transform=dst_transform,
dst_crs=crs,
dst_nodata=nodata,
resampling=Resampling.med,
)
with rio.open(input_dem, "w", **metadata) as dst:
dst.write(output, 1)
[docs]def launch_bulldozer(
input_dem,
output_dir,
cars_orchestrator,
max_object_size,
):
"""
Launch bulldozer on a DEM to smooth it
:param input_dem: path of DEM to reverse
:type input_dem: str
:param output_dir: directory where bulldozer output is dumped
:type output_dir: str
:param cars_orchestrator: orchestrator of the whole pipeline
(used to get number of workers)
:type cars_orchestrator: Orchestrator
:param max_object_size: bulldozer parameter "max_object_size"
:type max_object_size: int
"""
bull_conf_path = os.path.join(
os.path.dirname(__file__), "bulldozer_config/base_config.yaml"
)
with open(bull_conf_path, "r", encoding="utf8") as bull_conf_file:
bull_conf = yaml.safe_load(bull_conf_file)
bull_conf["dsm_path"] = input_dem
bull_conf["output_dir"] = output_dir
if cars_orchestrator is not None:
if (
cars_orchestrator.get_conf()["mode"] == "multiprocessing"
or cars_orchestrator.get_conf()["mode"] == "local_dask"
):
bull_conf["nb_max_workers"] = cars_orchestrator.get_conf()[
"nb_workers"
]
else:
bull_conf["nb_max_workers"] = 4
bull_conf["max_object_size"] = max_object_size
os.makedirs(output_dir, exist_ok=True)
bull_conf_path = os.path.join(output_dir, "bulldozer_config.yaml")
with open(bull_conf_path, "w", encoding="utf8") as bull_conf_file:
yaml.dump(bull_conf, bull_conf_file)
output_dem = os.path.join(bull_conf["output_dir"], "dtm.tif")
try:
try:
# suppress prints in bulldozer by redirecting stdout&stderr
with open(os.devnull, "w", encoding="utf8") as devnull:
with (
contextlib.redirect_stdout(devnull),
contextlib.redirect_stderr(devnull),
):
dsm_to_dtm(bull_conf_path)
except Exception:
logging.info("Bulldozer failed on its first execution. Retrying")
# suppress prints in bulldozer by redirecting stdout&stderr
with open(os.devnull, "w", encoding="utf8") as devnull:
with (
contextlib.redirect_stdout(devnull),
contextlib.redirect_stderr(devnull),
):
dsm_to_dtm(bull_conf_path)
except Exception:
logging.error(
"Bulldozer failed on its second execution."
+ " The DSM could not be smoothed."
)
output_dem = None
return output_dem
[docs]def compute_stats(diff):
"""
Compute and display statistics of difference between two DEM :
Minimum, median, percentiles and maximum
:param diff: altimetric difference between two DEM
:type diff: numpy.array
"""
mini = ("Min", np.nanmin(diff))
median = ("Median", np.nanmedian(diff))
p90 = ("p90", np.nanpercentile(diff, 90))
p95 = ("p95", np.nanpercentile(diff, 95))
p99 = ("p99", np.nanpercentile(diff, 99))
maxi = ("Max", np.nanmax(diff))
logging.info( # pylint: disable=logging-fstring-interpolation
f"| {mini[0]:6} | {median[0]:6} | {p90[0]:6} | "
f"{p95[0]:6} | {p99[0]:6} | {maxi[0]:6} |"
)
logging.info( # pylint: disable=logging-fstring-interpolation
f"| {mini[1]:6.2f} | {median[1]:6.2f} | {p90[1]:6.2f} | "
f"{p95[1]:6.2f} | {p99[1]:6.2f} | {maxi[1]:6.2f} |"
)