#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
cars-devibrate: devibrate a high resolution DSM using a low resolution DSM
"""
import argparse
import json
import logging
import math
# Standard imports
import os
import pickle
from typing import List, Union
# Third party imports
import numpy as np
import pandas as pd
import pyproj
import rasterio as rio
import xarray as xr
from rasterio.windows import bounds as to_bounds
from rasterio.windows import from_bounds
from scipy import interpolate
from scipy.signal import butter, filtfilt, lfilter, lfilter_zi
# CARS / SHARELOC imports
from shareloc.dtm_reader import interpolate_geoid_height
from shareloc.geofunctions import triangulation
from cars.applications.rasterization import rasterization_algo as rasterization
from cars.core.geometry.abstract_geometry import AbstractGeometry
from cars.core.geometry.shareloc_geometry import SharelocGeometry
# Get full path geoid
package_path = os.path.dirname(__file__)
GEOID_DEFAULT = os.path.join(package_path, "conf", "geoid/egm96.grd")
[docs]
def acquisition_direction(
sensor1, geomodel1, sensor2, geomodel2, geometry_plugin
):
"""
Computes the mean acquisition of the input images pair
:param sensor1: sensor image name of the first product
:param geomodel1: geomodel name of the first product
:param sensor2: sensor image name of the second product
:param geomodel2: geomodel name of the second product
:return: a tuple composed of :
- the mean acquisition direction as a numpy array
- the acquisition direction of the first product as a numpy array
- the acquisition direction of the second product as a numpy array
"""
vec1 = get_time_ground_direction(sensor1, geomodel1, geometry_plugin)
vec2 = get_time_ground_direction(sensor2, geomodel2, geometry_plugin)
time_direction_vector = (vec1 + vec2) / 2
def display_angle(vec):
"""
Display angle in degree from a vector x
:param vec: vector to display
:return: angle in degree
"""
return 180 * math.atan2(vec[1], vec[0]) / math.pi
logging.info(
"Time direction average azimuth: "
"{}deg (img1: {}deg, img2: {}deg)".format(
display_angle(time_direction_vector),
display_angle(vec1),
display_angle(vec2),
)
)
return time_direction_vector, vec1, vec2
[docs]
def get_time_ground_direction( # pylint: disable=too-many-positional-arguments
sensor,
geomodel,
geometry_plugin,
x_loc: float = None,
y_loc: float = None,
y_offset: float = None,
) -> np.ndarray:
"""
For a given image, compute the direction of increasing acquisition
time on ground.
Done by two localizations at "y" and "y+y_offset" values.
:param sensor: sensor image name
:param geomodel: geomodel name
:param x_loc: x location in image for estimation (default=center)
:param y_loc: y location in image for estimation (default=1/4)
:param y_offset: y location in image for estimation (default=1/2)
:param dem: DEM for direct localisation function
:param geoid: path to geoid file
:return: normalized direction vector as a numpy array
"""
# Define x: image center, y: 1/4 of image,
# y_offset: 3/4 of image if not defined
with rio.open(sensor) as src:
img_size_x, img_size_y = src.width, src.height
if x_loc is None:
x_loc = img_size_x / 2
if y_loc is None:
y_loc = img_size_y / 4
if y_offset is None:
y_offset = img_size_y / 2
# Check x, y, y_offset to be in image
assert x_loc >= 0
assert x_loc <= img_size_x
assert y_loc >= 0
assert y_loc <= img_size_y
assert y_offset > 0
assert y_loc + y_offset <= img_size_y
# Get coordinates of time direction vectors
lat1, lon1, __ = geometry_plugin.direct_loc(sensor, geomodel, x_loc, y_loc)
lat2, lon2, __ = geometry_plugin.direct_loc(
sensor, geomodel, x_loc, y_loc + y_offset
)
# Create and normalize the time direction vector
vec = np.array([lon1 - lon2, lat1 - lat2])
vec = vec / np.linalg.norm(vec)
return vec
[docs]
def project_coordinates_on_line(
x_coord: Union[float, np.ndarray],
y_coord: Union[float, np.ndarray],
origin: Union[List[float], np.ndarray],
vec: Union[List[float], np.ndarray],
) -> Union[float, np.ndarray]:
"""
Project coordinates (x, y) on a line starting from origin with a
direction vector vec, and return the euclidean distances between
projected points and origin.
:param x_coord: scalar or vector of coordinates x
:param y_coord: scalar or vector of coordinates y
:param origin: coordinates of origin point for line
:param vec: direction vector of line
:return: vector of distances of projected points to origin
"""
assert len(x_coord) == len(y_coord)
assert len(origin) == 2
assert len(vec) == 2
vec_angle = math.atan2(vec[1], vec[0])
point_angle = np.arctan2(y_coord - origin[1], x_coord - origin[0])
proj_angle = point_angle - vec_angle
dist_to_origin = np.sqrt(
np.square(x_coord - origin[0]) + np.square(y_coord - origin[1])
)
return dist_to_origin * np.cos(proj_angle)
# pylint: disable=too-many-positional-arguments
[docs]
def lowres_initial_dem_splines_fit(
lowres_dsm_from_matches: xr.Dataset,
lowres_initial_dem: xr.Dataset,
origin: np.ndarray,
time_direction_vector: np.ndarray,
ext: int = 3,
order: int = 3,
min_pts_per_time: int = 100,
min_pts_along_time_direction: int = 100,
butterworth_filter_order: int = 3,
butterworth_critical_frequency: float = 0.05,
):
"""
This function takes 2 datasets containing DSM and models the
difference between the two as an UnivariateSpline along the
direction given by origin and time_direction_vector. Internally,
it looks for the highest smoothing factor that satisfies the rmse threshold.
:param lowres_dsm_from_matches: Dataset containing the low resolution DSM
obtained from matches, as returned by the
rasterization.simple_rasterization_dataset function.
:param lowres_initial_dem: Dataset containing the low resolution DEM,
on the same grid as lowres_dsm_from_matches
:param origin: coordinates of origin point for line
:type origin: list(float) or np.array(float) of size 2
:param time_direction_vector: direction vector of line
:type time_direction_vector: list(float) or np.array(float) of size 2
:param ext: behavior outside of interpolation domain
:param order: spline order
:param min_pts_per_time: minimum number of points for
each measurement
:param min_pts_along_time_direction: minimum number of points for
time direction
:param butterworth_filter_order: Order of the filter.
See scipy.signal.butter
:param butterworth_critical_frequency: The filter critical frequency
or frequencies. See scipy.signal.butter
"""
# Initial DSM difference
dsm_diff = lowres_initial_dem.hgt - lowres_dsm_from_matches.hgt
# Form arrays of coordinates
x_values_2d, y_values_2d = np.meshgrid(dsm_diff.x, dsm_diff.y)
# Project coordinates on the line
# of increasing acquisition time to form 1D coordinates
# If there are sensor oscillations, they will occur in this direction
linear_coords = project_coordinates_on_line(
x_values_2d, y_values_2d, origin, time_direction_vector
)
# Form a 1D array with diff values indexed with linear coords
linear_diff_array = xr.DataArray(
dsm_diff.values.ravel(), coords={"l": linear_coords.ravel()}, dims=("l")
)
linear_diff_array = linear_diff_array.dropna(dim="l")
linear_diff_array = linear_diff_array.sortby("l")
# Apply median filtering within cell matching input dsm resolution
min_l = np.min(linear_diff_array.l)
max_l = np.max(linear_diff_array.l)
nbins = int(
math.ceil((max_l - min_l) / (lowres_dsm_from_matches.resolution))
)
median_linear_diff_array = linear_diff_array.groupby_bins(
"l", nbins
).median()
median_linear_diff_array = median_linear_diff_array.rename({"l_bins": "l"})
median_linear_diff_array = median_linear_diff_array.assign_coords(
{"l": np.array([d.mid for d in median_linear_diff_array.l.data])}
)
count_linear_diff_array = linear_diff_array.groupby_bins("l", nbins).count()
count_linear_diff_array = count_linear_diff_array.rename({"l_bins": "l"})
count_linear_diff_array = count_linear_diff_array.assign_coords(
{"l": np.array([d.mid for d in count_linear_diff_array.l.data])}
)
# Filter measurements with insufficient amount of points
median_linear_diff_array = median_linear_diff_array.where(
count_linear_diff_array > min_pts_per_time
).dropna(dim="l")
if len(median_linear_diff_array) < min_pts_along_time_direction:
raise RuntimeError(
"Insufficient amount of points ({} < {}) along time direction "
"after measurements filtering to estimate correction "
"to fit initial DEM".format(
len(median_linear_diff_array), min_pts_along_time_direction
)
)
# Apply butterworth lowpass filter to retrieve only the low frequency
# (from example of doc: https://docs.scipy.org/doc/scipy/reference/
# generated/scipy.signal.butter.html#scipy.signal.butter )
b, a = butter(butterworth_filter_order, butterworth_critical_frequency)
zi_filter = lfilter_zi(b, a)
z_filter, _ = lfilter(
b,
a,
median_linear_diff_array.values,
zi=zi_filter * median_linear_diff_array.values[0],
)
lfilter(b, a, z_filter, zi=zi_filter * z_filter[0])
filtered_median_linear_diff_array = xr.DataArray(
filtfilt(b, a, median_linear_diff_array.values),
coords=median_linear_diff_array.coords,
)
# Estimate performances of spline s = 100 * length of diff array
smoothing_factor = 100 * len(filtered_median_linear_diff_array.l)
splines = interpolate.UnivariateSpline(
filtered_median_linear_diff_array.l,
filtered_median_linear_diff_array.values,
ext=ext,
k=order,
s=smoothing_factor,
)
estimated_correction = xr.DataArray(
splines(filtered_median_linear_diff_array.l),
coords=filtered_median_linear_diff_array.coords,
)
rmse = (filtered_median_linear_diff_array - estimated_correction).std(
dim="l"
)
target_rmse = 0.3
# If RMSE is too high, try to decrease smoothness until it fits
while rmse > target_rmse and smoothing_factor > 0.001:
# divide s by 2
smoothing_factor /= 2
# Estimate splines
splines = interpolate.UnivariateSpline(
filtered_median_linear_diff_array.l,
filtered_median_linear_diff_array.values,
ext=ext,
k=order,
s=smoothing_factor,
)
# Compute RMSE
estimated_correction = xr.DataArray(
splines(filtered_median_linear_diff_array.l),
coords=filtered_median_linear_diff_array.coords,
)
rmse = (filtered_median_linear_diff_array - estimated_correction).std(
dim="l"
)
logging.debug(
"Best smoothing factor for splines regression: "
"{} (rmse={})".format(smoothing_factor, rmse)
)
# Return estimated spline
return splines
[docs]
def read_lowres_dsm(srtm_path, startx, starty, endx, endy):
"""
Read an extract of the low resolution input DSM and return it as an Array
"""
with rio.open(srtm_path) as src:
window = from_bounds(startx, starty, endx, endy, src.transform)
window = window.round_offsets()
window = window.round_lengths()
bounds = to_bounds(window, src.transform)
resolution = src.res[0]
dem_as_array = src.read(1, window=window)
newstartx, newstarty = bounds[0], bounds[-1]
sizex, sizey = window.width, window.height
x_values_1d = np.linspace(
newstartx + 0.5 * resolution,
newstartx + resolution * (sizex + 0.5),
sizex,
endpoint=False,
)
y_values_1d = np.linspace(
newstarty - 0.5 * resolution,
newstarty - resolution * (sizey + 0.5),
sizey,
endpoint=False,
)
dims = ["y", "x"]
coords = {"x": x_values_1d, "y": y_values_1d}
dsm_as_ds = xr.Dataset({"hgt": (dims, dem_as_array)}, coords=coords)
dsm_as_ds["epsg"] = 4326
dsm_as_ds["resolution"] = resolution
return dsm_as_ds, newstartx, newstarty, sizex, sizey, resolution
[docs]
def compute_splines( # pylint: disable=too-many-positional-arguments
sensor1,
geomodel1,
sensor2,
geomodel2,
matches,
srtm_path,
geoid_path,
out_dir,
min_pts_per_time: int = 100,
min_pts_along_time_direction: int = 100,
butterworth_filter_order: int = 3,
butterworth_critical_frequency: float = 0.05,
):
"""
Compute a spline dict containing estimated splines, origin
and time_direction_vector
"""
geometry_plugin = AbstractGeometry( # pylint: disable=E0110
"SharelocGeometry"
)
# retrieve time direction from models
time_direction_vector, _, _ = acquisition_direction(
sensor1, geomodel1, sensor2, geomodel2, geometry_plugin
)
# load matches and triangulate them
corrected_matches = np.load(matches)
disp = corrected_matches[:, 0] - corrected_matches[:, 2]
mini = np.percentile(disp, 5.0)
maxi = np.percentile(disp, 95)
corrected_matches = corrected_matches[
np.logical_and(disp >= mini, disp <= maxi), :
]
shareloc_model1 = SharelocGeometry.load_geom_model(geomodel1)
shareloc_model2 = SharelocGeometry.load_geom_model(geomodel2)
matches_sensor = corrected_matches[:, 4:8]
[__, pc_from_matches, __] = triangulation.sensor_triangulation(
matches_sensor, shareloc_model1, shareloc_model2
)
positions = np.array(pc_from_matches[:, 0:2])
geoid_height = interpolate_geoid_height(geoid_path, positions)
pc_from_matches[:, 2] = pc_from_matches[:, 2] - geoid_height
# deduce area from sift
pc_xx = pc_from_matches[:, 0]
pc_yy = pc_from_matches[:, 1]
startx, endx = pc_xx.min(), pc_xx.max()
starty, endy = pc_yy.min(), pc_yy.max()
# read corresponding dem
lowres_initial_dem, startx, starty, sizex, sizey, resolution = (
read_lowres_dsm(srtm_path, startx, starty, endx, endy)
)
points_cloud_from_matches = pd.DataFrame(
data=pc_from_matches, columns=["x", "y", "z"]
)
# rasterize point cloud (superimposable image with lowres initial dem)
points_cloud_from_matches.attrs["attributes"] = {"number_of_pc": 0}
lowres_dsm = rasterization.simple_rasterization_dataset_wrapper(
points_cloud_from_matches,
resolution,
4326,
xstart=startx,
ystart=starty,
xsize=sizex,
ysize=sizey,
)
lowres_dsm_diff = lowres_initial_dem - lowres_dsm
origin = [
float(lowres_dsm_diff["x"][0].values),
float(lowres_dsm_diff["y"][0].values),
]
# fit initial dem and low res dsm from sift and deduce splines correction
splines = lowres_initial_dem_splines_fit(
lowres_dsm,
lowres_initial_dem,
origin,
time_direction_vector,
min_pts_per_time=min_pts_per_time,
min_pts_along_time_direction=min_pts_along_time_direction,
butterworth_filter_order=butterworth_filter_order,
butterworth_critical_frequency=butterworth_critical_frequency,
)
# save intermediate data
lowres_initial_dem_file = os.path.join(out_dir, "lowres_initial_dem.nc")
lowres_initial_dem.to_netcdf(lowres_initial_dem_file)
lowres_dsm_file = os.path.join(out_dir, "lowres_dsm.nc")
lowres_dsm.to_netcdf(lowres_dsm_file)
lowres_dsm_diff_file = os.path.join(out_dir, "lowres_dsm_diff.nc")
lowres_dsm_diff.to_netcdf(lowres_dsm_diff_file)
# use splines for correction introduced in rasterization
lowres_dsm_as_df = lowres_dsm.to_dataframe().reset_index()
distance_vector = project_coordinates_on_line(
lowres_dsm_as_df["x"],
lowres_dsm_as_df["y"],
origin,
time_direction_vector,
)
correction = splines(distance_vector)
lowres_dsm["hgt"] = lowres_dsm["hgt"] + correction.reshape(
lowres_dsm["hgt"].shape
)
new_lowres_dsm_file = os.path.join(out_dir, "new_lowres_dsm.nc")
lowres_dsm.to_netcdf(new_lowres_dsm_file)
new_lowres_dsm_diff = lowres_initial_dem - lowres_dsm
new_lowres_dsm_diff_file = os.path.join(out_dir, "new_lowres_dsm_diff.nc")
new_lowres_dsm_diff.to_netcdf(new_lowres_dsm_diff_file)
return {
"splines": splines,
"origin": origin,
"time_direction_vector": time_direction_vector,
}
[docs]
def cars_devibrate( # pylint: disable=too-many-positional-arguments
used_conf,
srtm_path,
geoid_path,
min_pts_per_time: int = 100,
min_pts_along_time_direction: int = 100,
butterworth_filter_order: int = 3,
butterworth_critical_frequency: float = 0.05,
):
"""
Main fonction. Expects a dictionary from the CLI
or directly the input parameters.
"""
out_dir = os.path.dirname(used_conf)
with open(used_conf, "r", encoding="utf8") as jsonfile:
data = json.load(jsonfile)
pairing = data["inputs"]["pairing"]
assert len(pairing) == 1
sensor1 = data["inputs"]["sensors"][pairing[0][0]]["image"]
geomodel1 = data["inputs"]["sensors"][pairing[0][0]]["geomodel"]
sensor2 = data["inputs"]["sensors"][pairing[0][1]]["image"]
geomodel2 = data["inputs"]["sensors"][pairing[0][1]]["geomodel"]
matches = os.path.join(
out_dir,
"dump_dir",
"sparse_matching.sift",
"_".join(pairing[0]),
"filtered_matches.npy",
)
if not os.path.isfile(matches):
raise RuntimeError(
"Matches must be saved: Set applications.sparse_matching."
"sift.save_intermediate_data to true in CARS configuration file."
)
dsm_path = os.path.join(out_dir, "dsm", "dsm.tif")
if not os.path.isfile(dsm_path):
raise RuntimeError("DSM must be generated: set product level to `dsm`")
splines_path = os.path.join(out_dir, "splines.pck")
if os.path.exists(splines_path) is False:
splines_dict = compute_splines(
sensor1,
geomodel1,
sensor2,
geomodel2,
matches,
srtm_path,
geoid_path,
out_dir,
min_pts_per_time=min_pts_per_time,
min_pts_along_time_direction=min_pts_along_time_direction,
butterworth_filter_order=butterworth_filter_order,
butterworth_critical_frequency=butterworth_critical_frequency,
)
with open(splines_path, "wb") as writer:
pickle.dump(splines_dict, writer)
else:
with open(splines_path, "rb") as reader:
splines_dict = pickle.load(reader)
with rio.open(dsm_path) as src:
bounds = src.bounds
startx, starty = bounds[0], bounds[-1]
sizex, sizey = src.width, src.height
resolution = src.res[0]
x_values_1d = np.linspace(
startx + 0.5 * resolution,
startx + resolution * (sizex + 0.5),
sizex,
endpoint=False,
)
y_values_1d = np.linspace(
starty - 0.5 * resolution,
starty - resolution * (sizey + 0.5),
sizey,
endpoint=False,
)
x_values_2d, y_values_2d = np.meshgrid(x_values_1d, y_values_1d)
positions = None
if src.crs != "EPSG:4326":
transformer = pyproj.Transformer.from_crs(
src.crs, "EPSG:4326", always_xy=True
)
positions = np.array(
transformer.transform(x_values_2d, y_values_2d)
)
distance_vector = project_coordinates_on_line(
positions[0],
positions[1],
splines_dict["origin"],
splines_dict["time_direction_vector"],
)
bloclen = int(distance_vector.shape[0] / 2)
correction_1 = splines_dict["splines"](distance_vector[:bloclen, :])
correction_2 = splines_dict["splines"](distance_vector[bloclen:, :])
correction = np.vstack((correction_1, correction_2))
profile = src.profile
# User can apply correction with:
# otbcli_BandMath -il dsm.tif correction.tif \
# -exp "im1b1==-32768?-32768:im1b1+im2b1" -out corrected_dsm.tif
with rio.open(
os.path.join(out_dir, "correction.tif"), "w", **profile
) as dst:
dst.write(correction, 1)
[docs]
def cli():
"""
Main cars-devibrate entrypoint (Command Line Interface)
"""
parser = argparse.ArgumentParser(
"cars-devibrate",
description="Devibrate a DSM produced from stereo images",
)
parser.add_argument(
"used_conf", type=str, help="CARS Used Configuration File"
)
parser.add_argument(
"srtm_path",
type=str,
help="SRTM path",
)
parser.add_argument(
"--geoid_path",
type=str,
help="Geoid path",
default=GEOID_DEFAULT,
)
parser.add_argument(
"--min_pts_per_time",
type=int,
help="minimum number of points for" "each measurement",
default=100,
)
parser.add_argument(
"--min_pts_along_time_direction",
type=int,
help="minimum number of points for" "time direction",
default=100,
)
parser.add_argument(
"--butterworth_filter_order",
type=int,
help="Order of the butterworth filter",
default=3,
)
parser.add_argument(
"--butterworth_critical_frequency",
type=float,
help=" The butterworth filter critical frequency",
default=0.05,
)
args = parser.parse_args()
cars_devibrate(**vars(args))
if __name__ == "__main__":
cli()