#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
this module contains tools for the dem generation
"""
import contextlib
import logging
import os
import numpy as np
import rasterio as rio
import xdem
# Third-party imports
from affine import Affine
from pyproj import CRS
from rasterio.coords import BoundingBox
from rasterio.enums import Resampling
from rasterio.warp import calculate_default_transform, reproject
from scipy.ndimage import median_filter
from cars.core import preprocessing
[docs]
def add_margin(bbox, ratio=1):
"""
Add margin to a bounding box
:param bbox: input bounding box
:type bbox: rasterio.coords.BoundingBox
:param ratio: factor of bbox size to add to each side of bbox
:type ratio: float
:return: bounding box with margins
:rtype: rasterio.coords.BoundingBox
"""
try:
assert bbox.left < bbox.right
assert bbox.bottom < bbox.top
width = bbox.right - bbox.left
height = bbox.top - bbox.bottom
new_left = bbox.left - ratio * width
new_right = bbox.right + ratio * width
new_bottom = bbox.bottom - ratio * height
new_top = bbox.top + ratio * height
new_bbox = BoundingBox(new_left, new_bottom, new_right, new_top)
except AssertionError:
logging.warning("Bounding box {} cannot be read".format(bbox))
new_bbox = bbox
return new_bbox
[docs]
def compute_stats(diff):
"""
Compute and display statistics of difference between two DEM :
Minimum, median, percentiles and maximum
:param diff: altimetric difference between two DEM
:type diff: numpy.array
"""
mini = ("Min", np.nanmin(diff))
median = ("Median", np.nanmedian(diff))
p90 = ("p90", np.nanpercentile(diff, 90))
p95 = ("p95", np.nanpercentile(diff, 95))
p99 = ("p99", np.nanpercentile(diff, 99))
maxi = ("Max", np.nanmax(diff))
logging.info( # pylint: disable=logging-fstring-interpolation
f"| {mini[0]:6} | {median[0]:6} | {p90[0]:6} | "
f"{p95[0]:6} | {p99[0]:6} | {maxi[0]:6} |"
)
logging.info( # pylint: disable=logging-fstring-interpolation
f"| {mini[1]:6.2f} | {median[1]:6.2f} | {p90[1]:6.2f} | "
f"{p95[1]:6.2f} | {p99[1]:6.2f} | {maxi[1]:6.2f} |"
)
[docs]
def reverse_dem(input_dem):
"""
Compute the opposite of a DEM :
Altitudes sign is changed
:param input_dem: path of DEM to reverse
:type input_dem: str
"""
with rio.open(input_dem, "r") as in_dem:
data = in_dem.read()
metadata = in_dem.meta
nodata = in_dem.nodata
with rio.open(input_dem, "w", **metadata) as out_dem:
out_dem.write(-data)
out_dem.nodata = -nodata
[docs]
def downsample_dem(
input_dem,
scale,
interpolator,
median_filter_size=None,
default_alt=0,
):
"""
Downsample median DEM with median resampling
:param input_dem: path of DEM to downsample (only one band)
:type input_dem: str
"""
with rio.open(input_dem) as in_dem:
data = in_dem.read(1)
metadata = in_dem.meta
src_transform = in_dem.transform
width = in_dem.width
height = in_dem.height
crs = in_dem.crs
nodata = in_dem.nodata
dst_transform = src_transform * Affine.scale(scale)
dst_height = int(height // scale) + 1
dst_width = int(width // scale) + 1
metadata["transform"] = dst_transform
metadata["height"] = dst_height
metadata["width"] = dst_width
dem_data = np.zeros((dst_height, dst_width))
interpolator_dict = {
"min": Resampling.min,
"median": Resampling.med,
"max": Resampling.max,
"nearest": Resampling.nearest,
}
interpolator = interpolator_dict[interpolator]
reproject(
data,
dem_data,
src_transform=src_transform,
src_crs=crs,
dst_transform=dst_transform,
dst_crs=crs,
dst_nodata=nodata,
resampling=interpolator,
)
# Post-processing
# Median filter
if median_filter_size is not None:
dem_data = median_filter(dem_data, size=median_filter_size)
# Fill nodata
dem_data = rio.fill.fillnodata(
dem_data,
mask=~(dem_data == nodata),
)
dem_data[dem_data == nodata] = default_alt
with rio.open(input_dem, "w", **metadata) as dst:
dst.write(dem_data, 1)
[docs]
def modify_terrain_bounds(terrain_bounds, linear_margin, constant_margin, epsg):
"""
Modify the terrain bounds
:param terrain_bounds: Input region of interest for DEM
:type terrain_bounds: list
:param margin: Margin of the output ROI in meters
:type margin: int
"""
crs = CRS.from_epsg(epsg)
if crs.is_geographic:
utm_epsg = preprocessing.get_utm_zone_as_epsg_code(
terrain_bounds[0], terrain_bounds[1]
)
conversion_factor = preprocessing.get_conversion_factor(
terrain_bounds, 4326, utm_epsg
)
linear_margin = linear_margin * conversion_factor
constant_margin = constant_margin * conversion_factor
xsize = terrain_bounds[2] - terrain_bounds[0]
ysize = terrain_bounds[3] - terrain_bounds[1]
xmin = terrain_bounds[0] - linear_margin * xsize - constant_margin
ymin = terrain_bounds[1] - linear_margin * ysize - constant_margin
xmax = terrain_bounds[2] + linear_margin * xsize + constant_margin
ymax = terrain_bounds[3] + linear_margin * ysize + constant_margin
terrain_bounds = [xmin, ymin, xmax, ymax]
return terrain_bounds
[docs]
def reproject_dem(dsm_file_name, epsg_out, out_file_name):
"""
Reproject the DEM
:param dsm_file_name: the path to dsm
:type dsm_file_name: str
:param epsg_out: the epsg code
:type epsg_out: int
:param out_file_name: the out path file
:type out_file_name: str
"""
with rio.open(dsm_file_name) as src:
transform, width, height = calculate_default_transform(
src.crs, epsg_out, src.width, src.height, *src.bounds
)
kwargs = src.meta.copy()
kwargs.update(
{
"crs": epsg_out,
"transform": transform,
"width": width,
"height": height,
}
)
with rio.open(out_file_name, "w", **kwargs) as dst:
for i in range(1, src.count + 1):
reproject(
source=rio.band(src, i),
destination=rio.band(dst, i),
src_transform=src.transform,
src_crs=src.crs,
dst_transform=transform,
dst_crs=epsg_out,
resampling=Resampling.nearest,
)