#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2023 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Shareloc geometry sub class : CARS geometry wrappers functions to shareloc ones
"""
import logging
from typing import List, Tuple, Union
import bindings_cpp # pylint: disable=E0401
import numpy as np
import rasterio as rio
import shareloc.geofunctions.rectification as rectif
import xarray as xr
from json_checker import And, Checker, Or
from shareloc.dtm_reader import dtm_reader
from shareloc.geofunctions import localization
from shareloc.geofunctions.rectification_grid import RectificationGrid
from shareloc.geofunctions.triangulation import epipolar_triangulation
from shareloc.geomodels.geomodel import GeoModel
from shareloc.geomodels.grid import Grid
from shareloc.geomodels.rpc import RPC
from shareloc.image import Image
from cars.core import constants as cst
from cars.core import projection
from cars.core.geometry.abstract_geometry import AbstractGeometry
GRID_TYPE = "GRID"
RPC_TYPE = "RPC"
GEO_MODEL_PATH_TAG = "path"
GEO_MODEL_TYPE_TAG = "model_type"
[docs]
@AbstractGeometry.register_subclass("SharelocGeometry")
class SharelocGeometry(AbstractGeometry):
"""
Shareloc geometry class
"""
def __init__( # pylint: disable=too-many-positional-arguments
self,
geometry_plugin_conf,
dem=None,
geoid=None,
default_alt=None,
pairs_for_roi=None,
scaling_coeff=1,
output_dem_dir=None,
):
super().__init__(
geometry_plugin_conf,
dem=dem,
geoid=geoid,
default_alt=default_alt,
pairs_for_roi=pairs_for_roi,
scaling_coeff=scaling_coeff,
output_dem_dir=output_dem_dir,
)
if dem is not None:
# fill_nodata option should be set when dealing with void in DTM
# see shareloc DTM limitations in sphinx doc for further detailsd
try:
if self.dem_roi is None:
roi_shareloc = None
else:
# From (x, y) to (y, x)
roi_shareloc = [
self.dem_roi[1],
self.dem_roi[0],
self.dem_roi[3],
self.dem_roi[2],
]
dtm_image = dtm_reader(
self.dem,
self.geoid,
roi=roi_shareloc,
roi_is_in_physical_space=True,
fill_nodata="mean",
fill_value=0.0,
)
except RuntimeError as err:
mss = "the roi bounds are"
if mss in str(err):
new_except_mss = (
f"The extent of the roi lies outside "
f"the extent of the initial elevation : {err}"
)
raise RuntimeError(new_except_mss) from err
raise
self.elevation = (
bindings_cpp.DTMIntersection( # pylint: disable=I1101
dtm_image.epsg,
dtm_image.alt_data,
dtm_image.nb_rows,
dtm_image.nb_columns,
dtm_image.transform,
)
)
else:
self.elevation = default_alt
[docs]
def check_conf(self, conf):
"""
Check configuration
:param conf: configuration to check
:type conf: str or dict
:return: full dict
:rtype: dict
"""
if conf is None:
raise RuntimeError("Geometry plugin configuration is None")
overloaded_conf = {}
if isinstance(conf, str):
conf = {"plugin_name": conf}
# overload conf
overloaded_conf["plugin_name"] = conf.get(
"plugin_name", "SharelocGeometry"
)
overloaded_conf["interpolator"] = conf.get("interpolator", "cubic")
overloaded_conf["dem_roi_margin_initial_elevation"] = conf.get(
"dem_roi_margin_initial_elevation", [0.25, 0.02]
)
overloaded_conf["dem_roi_margin_rectification"] = conf.get(
"dem_roi_margin_rectification", 0.5
)
geometry_schema = {
"plugin_name": str,
"interpolator": And(str, lambda x: x in ["cubic", "linear"]),
"dem_roi_margin_initial_elevation": [float],
"dem_roi_margin_rectification": And(
Or(float, int), lambda x: x > 0
),
}
# Check conf
checker = Checker(geometry_schema)
checker.validate(overloaded_conf)
return overloaded_conf
[docs]
def get_roi( # pylint: disable=too-many-positional-arguments
self,
pairs_for_roi,
epsg,
z_min=0,
z_max=0,
linear_margin=0,
constant_margin=0.012,
):
"""
Compute region of interest for intersection of DEM
:param pairs_for_roi: list of pairs of images and geomodels
:type pairs_for_roi: List[(str, dict, str, dict)]
:param dem_epsg: output EPSG code for ROI
:type dem_epsg: int
:param linear_margin: margin for ROI (factor of initial ROI size)
:type linear_margin: float
:param constant_margin: margin for ROI in degrees
:type constant_margin: float
"""
base_roi = super().get_roi(
pairs_for_roi, epsg, z_min, z_max, linear_margin, constant_margin
)
coords_list = []
for image1, geomodel1, _, geomodel2 in pairs_for_roi:
# Footprint of rectification grid (with margins)
image1 = SharelocGeometry.load_image(image1["bands"]["b0"]["path"])
geomodel1 = self.load_geom_model(geomodel1)
geomodel2 = self.load_geom_model(geomodel2)
# With altitude z_min
epipolar_extent = rectif.get_epipolar_extent(
image1,
geomodel1,
geomodel2,
elevation=z_min,
grid_margin=self.rectification_grid_margin,
)
lat_min, lon_min, lat_max, lon_max = list(epipolar_extent)
coords_list.extend([(lon_min, lat_min), (lon_max, lat_max)])
# With altitude z_max
epipolar_extent = rectif.get_epipolar_extent(
image1,
geomodel1,
geomodel2,
elevation=z_max,
grid_margin=self.rectification_grid_margin,
)
lat_min, lon_min, lat_max, lon_max = list(epipolar_extent)
coords_list.extend([(lon_min, lat_min), (lon_max, lat_max)])
lon_list, lat_list = list(zip(*coords_list)) # noqa: B905
roi = [
min(lon_list) - constant_margin,
min(lat_list) - constant_margin,
max(lon_list) + constant_margin,
max(lat_list) + constant_margin,
]
points = np.array(
[
(roi[0], roi[1], 0),
(roi[2], roi[3], 0),
(roi[0], roi[1], 0),
(roi[2], roi[3], 0),
]
)
new_points = projection.point_cloud_conversion(points, 4326, epsg)
roi = [
min(new_points[:, 0]),
min(new_points[:, 1]),
max(new_points[:, 0]),
max(new_points[:, 1]),
]
lon_size = roi[2] - roi[0]
lat_size = roi[3] - roi[1]
roi[0] -= linear_margin * lon_size
roi[1] -= linear_margin * lat_size
roi[2] += linear_margin * lon_size
roi[3] += linear_margin * lat_size
roi = [
min(roi[0], base_roi[0]),
min(roi[1], base_roi[1]),
max(roi[2], base_roi[2]),
max(roi[3], base_roi[3]),
]
return roi
[docs]
@staticmethod
def load_geom_model(model: dict) -> Union[Grid, RPC]:
"""
Load geometric model and returns it as a shareloc object
:param model: Path and attributes for geometrical model
:type model: dict with keys "path" and "model_type"
:return: geometric model as a shareloc object (Grid or RPC)
"""
geomodel = model[GEO_MODEL_PATH_TAG]
# Use RPC Type if none are used
if model.get(GEO_MODEL_TYPE_TAG):
geomodel_type = model[GEO_MODEL_TYPE_TAG]
else:
geomodel_type = RPC_TYPE
# Use RPCoptim class to use optimized C++ direct localizations
if geomodel_type == "RPC":
geomodel_type = "RPCoptim"
shareloc_model = GeoModel(geomodel, geomodel_type)
if shareloc_model is None:
raise ValueError(f"Model {geomodel} could not be read by shareloc")
return shareloc_model
[docs]
@staticmethod
def load_image(img: str) -> Image:
"""
Load the image using the Image class of Shareloc
:param img: path to the image
:return: The Image object
"""
try:
shareloc_img = Image(img, vertical_direction="north")
except Exception as error:
raise ValueError(f"Image type {img} is not supported") from error
return shareloc_img
[docs]
@staticmethod
def check_product_consistency(sensor: str, geomodel: dict) -> bool:
"""
Test if the product is readable by the shareloc plugin
TODO: not used
- to evolve and use in CARS configuration early in pipeline process
(new early check input common application ?)
:param sensor: path to sensor image
:param geomodel: path and attributes for geometrical model
:return: sensor path and overloaded geomodel dict
"""
# Check geomodel schema consistency
if isinstance(geomodel, str):
geomodel = {
"path": geomodel,
}
overloaded_geomodel = geomodel.copy()
# If model_type is not defined, default is "RPC"
overloaded_geomodel["model_type"] = geomodel.get("model_type", "RPC")
geomodel_schema = {"path": str, "model_type": str}
checker_geomodel = Checker(geomodel_schema)
checker_geomodel.validate(overloaded_geomodel)
# Try to read them using shareloc
for band in sensor["bands"]:
SharelocGeometry.load_image(sensor["bands"][band]["path"])
SharelocGeometry.load_geom_model(overloaded_geomodel)
return sensor, overloaded_geomodel
[docs]
def triangulate( # pylint: disable=too-many-positional-arguments
self,
sensor1,
sensor2,
geomodel1,
geomodel2,
mode: str,
matches: Union[xr.Dataset, np.ndarray],
grid1: Union[dict, RectificationGrid],
grid2: Union[dict, RectificationGrid],
roi_key: Union[None, str] = None,
interpolation_method=None,
) -> np.ndarray:
"""
Performs triangulation from cars disparity or matches dataset
:param sensor1: path to left sensor image
:param sensor2: path to right sensor image
:param geomodel1: path and attributes for left geomodel
:param geomodel2: path and attributes for right geomodel
:param mode: triangulation mode
(constants.DISP_MODE or constants.MATCHES)
:param matches: cars disparity dataset or matches as numpy array
:param grid1: dict or RectificationGrid for epipolar grid of sensor1
:param grid2: dict or RectificationGrid for epipolar grid of sensor2
:param roi_key: dataset roi to use
(can be cst.ROI or cst.ROI_WITH_MARGINS)
:return: the long/lat/height numpy array in output of the triangulation
"""
# read geomodels using shareloc
shareloc_model1 = SharelocGeometry.load_geom_model(geomodel1)
shareloc_model2 = SharelocGeometry.load_geom_model(geomodel2)
if isinstance(grid1, dict):
# create rectificationgrid
grid1 = RectificationGrid(
grid1["path"],
interpolator=self.interpolator,
)
grid2 = RectificationGrid(
grid2["path"],
interpolator=self.interpolator,
)
# perform matches triangulation
if mode is cst.MATCHES_MODE:
__, point_wgs84, __ = epipolar_triangulation(
matches=matches,
mask=None,
matches_type="sift",
geometrical_model_left=shareloc_model1,
geometrical_model_right=shareloc_model2,
grid_left=grid1,
grid_right=grid2,
residues=True,
fill_nan=True,
)
llh = point_wgs84.reshape((point_wgs84.shape[0], 1, 3))
elif mode is cst.DISP_MODE:
__, point_wgs84, __ = epipolar_triangulation(
matches=matches,
mask=None,
matches_type="disp",
geometrical_model_left=shareloc_model1,
geometrical_model_right=shareloc_model2,
grid_left=grid1,
grid_right=grid2,
residues=True,
fill_nan=True,
)
row = np.array(
range(matches.attrs[roi_key][1], matches.attrs[roi_key][3])
)
col = np.array(
range(matches.attrs[roi_key][0], matches.attrs[roi_key][2])
)
llh = point_wgs84.reshape((row.size, col.size, 3))
else:
logging.error(
"{} mode is not available in the "
"shareloc plugin triangulation".format(mode)
)
raise ValueError(
f"{mode} mode is not available"
" in the shareloc plugin triangulation"
)
return llh
# pylint: disable=too-many-positional-arguments
[docs]
def generate_epipolar_grids(
self,
sensor1,
sensor2,
geomodel1,
geomodel2,
epipolar_step: int = 30,
) -> Tuple[
np.ndarray, np.ndarray, List[float], List[float], List[int], float
]:
"""
Computes the left and right epipolar grids
:param sensor1: path to left sensor image
:param sensor2: path to right sensor image
:param geomodel1: path and attributes for left geomodel
:param geomodel2: path and attributes for right geomodel
:param epipolar_step: step to use to construct the epipolar grids
:return: Tuple composed of :
- the left epipolar grid as a numpy array
- the right epipolar grid as a numpy array
- the left grid origin as a list of float
- the left grid spacing as a list of float
- the epipolar image size as a list of int
(x-axis size is given with the index 0, y-axis size with index 1)
- the disparity to altitude ratio as a float
"""
# read inputs using shareloc
shareloc_model1 = SharelocGeometry.load_geom_model(geomodel1)
shareloc_model2 = SharelocGeometry.load_geom_model(geomodel2)
image1 = SharelocGeometry.load_image(sensor1)
image2 = SharelocGeometry.load_image(sensor2)
# compute epipolar grids
(
grid1,
grid2,
[epipolar_size_y, epipolar_size_x],
alt_to_disp_ratio,
) = rectif.compute_stereorectification_epipolar_grids(
image1,
shareloc_model1,
image2,
shareloc_model2,
self.elevation,
epi_step=epipolar_step,
margin=self.rectification_grid_margin,
)
# rearrange output to match the expected structure of CARS
# grid[:, :, 2] with altitudes is not used
grid1 = grid1[:, :, 0:2][:, :, ::-1]
grid2 = grid2[:, :, 0:2][:, :, ::-1]
epipolar_size_x = int(np.floor(epipolar_size_x))
epipolar_size_y = int(np.floor(epipolar_size_y))
origin = [
float(-self.rectification_grid_margin * epipolar_step),
float(-self.rectification_grid_margin * epipolar_step),
]
spacing = [float(epipolar_step), float(epipolar_step)]
# alt_to_disp_ratio does not consider image resolution
with rio.open(sensor1, "r") as rio_dst:
pixel_size_x, pixel_size_y = (
rio_dst.transform[0],
rio_dst.transform[4],
)
mean_size = (abs(pixel_size_x) + abs(pixel_size_y)) / 2
disp_to_alt_ratio = mean_size / alt_to_disp_ratio
return (
grid1,
grid2,
origin,
spacing,
[epipolar_size_x, epipolar_size_y],
disp_to_alt_ratio,
)
[docs]
def direct_loc( # pylint: disable=too-many-positional-arguments
self,
sensor,
geomodel,
x_coord: np.array,
y_coord: np.array,
z_coord: np.array = None,
) -> np.ndarray:
"""
For a given image point, compute the latitude, longitude, altitude
Advice: to be sure, use x,y,z inputs only
:param sensor: path to sensor image
:param geomodel: path and attributes for geomodel
:param x_coord: X Coordinate in input image sensor
:param y_coord: Y Coordinate in input image sensor
:param z_coord: Z Altitude coordinate to take the image
:return: Latitude, Longitude, Altitude coordinates as a numpy array
"""
# load model and image with shareloc
shareloc_model = SharelocGeometry.load_geom_model(geomodel)
shareloc_image = SharelocGeometry.load_image(sensor)
# perform direct localization operation
loc = localization.Localization(
shareloc_model,
image=shareloc_image,
elevation=self.elevation,
epsg=4326,
)
# Bug: y_coord and x_coord inversion to fit Shareloc standards row/col.
# TODO: clean geometry convention calls in API
lonlatalt = loc.direct(
y_coord, x_coord, z_coord, using_geotransform=True
)
lonlatalt = np.squeeze(lonlatalt)
if len(lonlatalt.shape) == 1:
latlonalt = np.array([lonlatalt[1], lonlatalt[0], lonlatalt[2]])
else:
latlonalt = np.array(
[lonlatalt[:, 1], lonlatalt[:, 0], lonlatalt[:, 2]]
)
return latlonalt
[docs]
def inverse_loc( # pylint: disable=too-many-positional-arguments
self,
sensor,
geomodel,
lat_coord: np.array,
lon_coord: np.array,
z_coord: np.array = None,
) -> np.ndarray:
"""
For a given image points list, compute the latitudes,
longitudes, altitudes
Advice: to be sure, use x,y,z list inputs only
:param sensor: path to sensor image
:param geomodel: path and attributes for geomodel
:param lat_coord: latitute Coordinate list
:param lon_coord: longitude Coordinates list
:param z_coord: Z Altitude list
:return: X / Y / Z Coordinates list in input image as a numpy array
"""
# load model and image with shareloc
shareloc_model = SharelocGeometry.load_geom_model(geomodel)
shareloc_image = SharelocGeometry.load_image(sensor)
# perform inverse localization operation
loc = localization.Localization(
shareloc_model,
image=shareloc_image,
elevation=self.elevation,
epsg=4326,
)
# Rows and columns order is inversed
col, row, alti = loc.inverse(
lon_coord.astype(np.float64),
lat_coord.astype(np.float64),
h=z_coord.astype(np.float64),
using_geotransform=True,
)
return row, col, alti