#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains the exogenous dsm filling application class.
"""
import logging
import os
import shutil
import numpy as np
import rasterio as rio
import xarray as xr
from json_checker import Checker, Or
from pyproj import CRS
from rasterio.enums import Resampling
from rasterio.warp import reproject
from rasterio.windows import Window
from rasterio.windows import transform as row_col_to_coords
from shapely import Polygon
import cars.orchestrator.orchestrator as ocht
from cars.core import projection, tiling
from cars.data_structures import cars_dataset
from cars.orchestrator.cluster.log_wrapper import cars_profile
from .abstract_dsm_filling_app import DsmFilling
[docs]
class ExogenousFilling(DsmFilling, short_name="exogenous_filling"):
"""
Exogenous filling
"""
def __init__(self, conf=None):
"""
Init function of ExogenousFilling
:param conf: configuration for ExogenousFilling
:return: an application_to_use object
"""
super().__init__(conf=conf)
# check conf
self.used_method = self.used_config["method"]
self.fill_classification = self.used_config["fill_classification"]
self.fill_nodata = self.used_config["fill_nodata"]
self.fill_with_geoid = self.used_config["fill_with_geoid"]
self.interpolation_method = self.used_config["interpolation_method"]
self.save_intermediate_data = self.used_config["save_intermediate_data"]
[docs]
def check_conf(self, conf):
# init conf
if conf is not None:
overloaded_conf = conf.copy()
else:
conf = {}
overloaded_conf = {}
# Overload conf
overloaded_conf["method"] = conf.get("method", "exogenous_filling")
overloaded_conf["fill_classification"] = conf.get(
"fill_classification", "nodata"
)
if isinstance(overloaded_conf["fill_classification"], str):
overloaded_conf["fill_classification"] = [
overloaded_conf["fill_classification"]
]
overloaded_conf["fill_nodata"] = conf.get("fill_nodata", None)
if isinstance(overloaded_conf["fill_nodata"], str):
overloaded_conf["fill_nodata"] = [overloaded_conf["fill_nodata"]]
overloaded_conf["fill_with_geoid"] = conf.get("fill_with_geoid", None)
overloaded_conf["interpolation_method"] = conf.get(
"interpolation_method", "bilinear"
)
if overloaded_conf["interpolation_method"] not in ["bilinear", "cubic"]:
# pylint: disable=inconsistent-quotes
raise RuntimeError(
f"Invalid interpolation method"
f"{overloaded_conf['interpolation_method']}, "
f"supported modes are bilinear and cubic."
)
overloaded_conf["save_intermediate_data"] = conf.get(
"save_intermediate_data", False
)
rectification_schema = {
"method": str,
"fill_classification": Or(None, [str]),
"fill_nodata": Or(None, [str]),
"fill_with_geoid": Or(None, [str]),
"interpolation_method": str,
"save_intermediate_data": bool,
}
# Check conf
checker = Checker(rectification_schema)
checker.validate(overloaded_conf)
return overloaded_conf
@cars_profile(name="Exogeneous filling")
def run( # pylint: disable=too-many-positional-arguments # noqa C901
self,
dsm_file,
classif_file,
filling_file,
invalidity_mask_file,
classif_values,
dump_dir,
roi_polys,
roi_epsg,
output_geoid,
geom_plugin,
dsm_dir=None,
tile_size=10000,
orchestrator=None,
):
"""
Run dsm filling using initial elevation and the current dsm
Replaces dsm.tif by the filled dsm. Adds a new band
to filling.tif if it exists.
The old dsm is saved in dump_dir.
roi_poly can any of these objects :
- a list of Shapely Polygons
- a Shapely Polygon
"""
if orchestrator is None:
orchestrator = ocht.Orchestrator(
orchestrator_conf={"mode": "sequential"}
)
if dsm_dir is not None:
dsm_path_out = os.path.join(dsm_dir, "dsm.tif")
filling_path_out = os.path.join(dsm_dir, "filling.tif")
else:
dsm_path_out = dsm_file
filling_path_out = filling_file
if self.fill_classification is None:
self.fill_classification = ["nodata"]
if self.fill_with_geoid is None:
self.fill_with_geoid = []
interpolation_methods_dict = {
"bilinear": Resampling.bilinear,
"cubic": Resampling.cubic,
}
interpolation_method = interpolation_methods_dict.get(
self.interpolation_method, Resampling.bilinear
)
if geom_plugin is None:
logging.error(
"No DEM was provided, exogenous_filling will not run."
)
return None
if not os.path.exists(dump_dir):
os.makedirs(dump_dir)
# get dsm to be filled and its metadata
with rio.open(dsm_file) as in_dsm:
profile = in_dsm.profile
# Transform to wtk for serialization.
profile["crs"] = profile["crs"].to_wkt()
height = in_dsm.height
width = in_dsm.width
dsm_dtype = in_dsm.dtypes[0]
nodata_value = in_dsm.nodata
filled_dsm_cars_ds = cars_dataset.CarsDataset(
"arrays", name="Monoband Filling"
)
# Compute tiling grid
filled_dsm_cars_ds.tiling_grid = tiling.generate_tiling_grid(
0,
0,
height,
width,
tile_size,
tile_size,
)
# Saving infos
[
saving_info,
] = orchestrator.get_saving_infos([filled_dsm_cars_ds])
# save list
orchestrator.add_to_save_lists(
dsm_path_out,
"exogenous_filled_dsm",
filled_dsm_cars_ds,
dtype=dsm_dtype,
nodata=nodata_value,
optional_data=False,
cars_ds_name="exogenous_filled_dsm",
)
if filling_file is not None:
with rio.open(filling_file, "r") as src:
filling_dtype = src.dtypes[0]
filling_nodata_value = src.nodata
# count will be count += 1
band_description = [
(i + 1, src.descriptions[i]) for i in range(src.count)
]
band_description.append(
(len(band_description) + 1, "filling_exogenous")
)
orchestrator.add_to_save_lists(
filling_path_out,
"exogenous_filled_filling",
filled_dsm_cars_ds,
dtype=filling_dtype,
nodata=filling_nodata_value,
optional_data=False,
cars_ds_name="exogenous_filled_filling",
rio_band_description=band_description,
)
old_dsm_path = os.path.join(dump_dir, "dsm_not_filled.tif")
new_dsm_path = os.path.join(dump_dir, "dsm_filled.tif")
old_filling_path = None
# Save old dsm
shutil.copy(dsm_file, old_dsm_path)
if filling_file is not None:
old_filling_path = os.path.join(dump_dir, "filling_not_filled.tif")
shutil.copy(filling_file, old_filling_path)
if self.save_intermediate_data:
# save new
orchestrator.add_to_save_lists(
new_dsm_path,
"exogenous_filled_dsm",
filled_dsm_cars_ds,
dtype=dsm_dtype,
nodata=nodata_value,
optional_data=False,
cars_ds_name="exogenous_filled_dsm",
)
# saved reprojected dem / intial elevation
reprojected_dem_path = os.path.join(dump_dir, "reprojected_dem.tif")
orchestrator.add_to_save_lists(
reprojected_dem_path,
"reprojected_dem",
filled_dsm_cars_ds,
dtype=dsm_dtype,
nodata=nodata_value,
optional_data=False,
cars_ds_name="reprojected_dem",
)
# saved reprojected input geoid
reprojected_input_geoid_path = os.path.join(
dump_dir, "reprojected_input_geoid.tif"
)
orchestrator.add_to_save_lists(
reprojected_input_geoid_path,
"reprojected_input_geoid",
filled_dsm_cars_ds,
dtype=dsm_dtype,
nodata=nodata_value,
optional_data=False,
cars_ds_name="reprojected_input_geoid",
)
if output_geoid not in (False, None):
# saved reprojected input geoid
reprojected_output_geoid_path = os.path.join(
dump_dir, "reprojected_output_geoid.tif"
)
orchestrator.add_to_save_lists(
reprojected_output_geoid_path,
"reprojected_output_geoid",
filled_dsm_cars_ds,
dtype=dsm_dtype,
nodata=nodata_value,
optional_data=False,
cars_ds_name="reprojected_output_geoid",
)
for row in range(filled_dsm_cars_ds.shape[0]):
for col in range(filled_dsm_cars_ds.shape[1]):
# update saving infos for potential replacement
full_saving_info = ocht.update_saving_infos(
saving_info, row=row, col=col
)
window = filled_dsm_cars_ds.get_window_as_dict(row, col)
# Compute images
(
filled_dsm_cars_ds[row, col]
) = orchestrator.cluster.create_task(
exogenous_filling_wrapper, nout=1
)(
old_dsm_path,
old_filling_path,
classif_file,
invalidity_mask_file,
classif_values,
geom_plugin.dem,
geom_plugin.geoid,
output_geoid,
roi_polys,
roi_epsg,
self.fill_with_geoid,
interpolation_method,
self.fill_classification,
self.fill_nodata,
window=window,
saving_info=full_saving_info,
profile=profile,
)
return filled_dsm_cars_ds
[docs]
def exogenous_filling_wrapper( # noqa C901 # pylint: disable=R0917
dsm_file,
filling_file,
classif_file,
invalidity_mask_file,
classif_values,
geo_plugin_dem,
geo_plugin_geoid,
output_geoid,
roi_polys,
roi_epsg,
fill_with_geoid,
interpolation_method,
fill_classification,
fill_nodata,
window=None,
saving_info=None,
profile=None,
):
"""
Wrapper for exogenous filling, to be applied on each tile of the DSM.
:param dsm_file: dsm file to fill
:param filling_file: filling file
:param classif_file: classification file
:param geo_plugin_dem: initial elevation dem
:param geo_plugin_geoid: initial elevation geoid
:param output_geoid: output geoid
:return: filled dsm xarray dataset
"""
# Get rasterio window
col_min = window["col_min"]
row_min = window["row_min"]
col_max = window["col_max"]
row_max = window["row_max"]
rasterio_window = Window(
col_off=col_min,
row_off=row_min,
width=(col_max - col_min),
height=(row_max - row_min),
)
with rio.open(dsm_file) as in_dsm:
dsm = in_dsm.read(1, window=rasterio_window)
dsm_crs = in_dsm.crs
# Get local transform for window
window_transform = row_col_to_coords(rasterio_window, in_dsm.transform)
roi_raster = np.ones(dsm.shape)
if isinstance(roi_polys, list):
roi_polys_outepsg = []
for poly in roi_polys:
if isinstance(poly, Polygon):
roi_poly_outepsg = projection.polygon_projection_crs(
poly, CRS(roi_epsg), dsm_crs
)
roi_polys_outepsg.append(roi_poly_outepsg)
roi_raster = rio.features.rasterize(
roi_polys_outepsg,
out_shape=roi_raster.shape,
transform=window_transform,
)
elif isinstance(roi_polys, Polygon):
roi_poly_outepsg = projection.polygon_projection_crs(
roi_polys, CRS(roi_epsg), dsm_crs
)
roi_raster = rio.features.rasterize(
[roi_poly_outepsg],
out_shape=roi_raster.shape,
transform=window_transform,
)
# Get the initial elevation
with rio.open(geo_plugin_dem) as in_elev:
# Reproject the elevation data to match the DSM
elev_data = np.empty(dsm.shape, dtype=in_elev.dtypes[0])
reproject(
source=rio.band(in_elev, 1),
destination=elev_data,
src_transform=in_elev.transform,
src_crs=in_elev.crs,
dst_transform=window_transform,
dst_crs=dsm_crs,
resampling=interpolation_method,
)
with rio.open(geo_plugin_geoid) as in_geoid:
# Reproject the geoid data to match the DSM
input_geoid_data = np.empty(dsm.shape, dtype=in_geoid.dtypes[0])
reproject(
source=rio.band(in_geoid, 1),
destination=input_geoid_data,
src_transform=in_geoid.transform,
src_crs=in_geoid.crs,
dst_transform=window_transform,
dst_crs=dsm_crs,
resampling=interpolation_method,
)
output_geoid_data = None
if isinstance(output_geoid, str):
with rio.open(output_geoid) as in_geoid:
# Reproject the geoid data to match the DSM
output_geoid_data = np.empty(dsm.shape, dtype=in_geoid.dtypes[0])
reproject(
source=rio.band(in_geoid, 1),
destination=output_geoid_data,
src_transform=in_geoid.transform,
src_crs=in_geoid.crs,
dst_transform=window_transform,
dst_crs=dsm_crs,
resampling=interpolation_method,
)
# Fill DSM for every label
combined_mask = np.zeros_like(dsm).astype(np.uint8)
classif = None
classif_msk = None
dsm_msk = None
if classif_file is not None:
with rio.open(classif_file) as in_classif:
classif = in_classif.read(1, window=rasterio_window)
classif_msk = in_classif.read_masks(1, window=rasterio_window)
else:
with rio.open(dsm_file) as in_dsm:
dsm_msk = in_dsm.read_masks(1, window=rasterio_window)
for label in fill_classification:
if label in classif_values:
filling_mask = np.logical_and(classif == int(label), roi_raster > 0)
elif label == "nodata":
if classif_msk is not None:
filling_mask = ~classif_msk
else:
filling_mask = ~dsm_msk
filling_mask = np.logical_and(filling_mask, roi_raster > 0)
else:
logging.error(
"Label {} not found in classification "
"descriptions {}".format(label, classif_values)
)
continue
if label in fill_with_geoid:
logging.info("Filling of {} with geoid".format(label))
dsm[filling_mask] = 0
else:
logging.info("Filling of {} with DEM and geoid".format(label))
dsm[filling_mask] = elev_data[filling_mask]
# apply offset to project on geoid if needed
if output_geoid is not True:
if isinstance(output_geoid, bool) and output_geoid is False:
# out geoid is ellipsoid: add geoid-ellipsoid distance
dsm[filling_mask] += input_geoid_data[filling_mask]
elif isinstance(output_geoid, str):
# out geoid is a new geoid whose path is in output_geoid:
# add carsgeoid-ellipsoid then add ellipsoid-outgeoid
dsm[filling_mask] += input_geoid_data[filling_mask]
dsm[filling_mask] -= output_geoid_data[filling_mask]
combined_mask = np.logical_or(combined_mask, filling_mask)
invalidity_mask = None
if fill_nodata is not None:
if invalidity_mask_file is not None:
with rio.open(invalidity_mask_file) as src:
invalidity_mask = src.read(1, window=rasterio_window)
for label in fill_nodata:
filling_mask = np.logical_and(
invalidity_mask == int(label), roi_raster > 0
)
if label in fill_with_geoid:
logging.info("Filling of {} with geoid".format(label))
dsm[filling_mask] = 0
else:
logging.info("Filling of {} with DEM and geoid".format(label))
dsm[filling_mask] = elev_data[filling_mask]
# apply offset to project on geoid if needed
if output_geoid is not True:
if isinstance(output_geoid, bool) and output_geoid is False:
# out geoid is ellipsoid: add geoid-ellipsoid distance
dsm[filling_mask] += input_geoid_data[filling_mask]
elif isinstance(output_geoid, str):
# out geoid is a new geoid whose path is in output_geoid:
# add carsgeoid-ellipsoid then add ellipsoid-outgeoid
dsm[filling_mask] += input_geoid_data[filling_mask]
dsm[filling_mask] -= output_geoid_data[filling_mask]
combined_mask = np.logical_or(combined_mask, filling_mask)
data = {
"exogenous_filled_dsm": (["row", "col"], dsm),
"reprojected_dem": (["row", "col"], elev_data),
"reprojected_input_geoid": (["row", "col"], input_geoid_data),
}
coords = {
"row": np.arange(dsm.shape[0]),
"col": np.arange(dsm.shape[1]),
}
if filling_file is not None:
# create combined filling
with rio.open(filling_file) as in_filling:
nb_bands_filling = in_filling.count + 1
filling = in_filling.read(window=rasterio_window)
# add layer combined_mask to filling
combined_mask = combined_mask[np.newaxis, :, :]
filling = np.concatenate((filling, combined_mask), axis=0)
data["exogenous_filled_filling"] = (
["band_filling", "row", "col"],
filling,
)
coords["band_filling"] = np.arange(1, nb_bands_filling + 1)
if output_geoid_data is not None:
data["reprojected_output_geoid"] = (["row", "col"], output_geoid_data)
output_dataset = xr.Dataset(
data_vars=data,
coords=coords,
)
cars_dataset.fill_dataset(
output_dataset,
saving_info=saving_info,
window=window,
profile=profile,
attributes=None,
overlaps=None,
)
return output_dataset