Source code for cars.core.geometry.shareloc_geometry

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2023 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Shareloc geometry sub class : CARS geometry wrappers functions to shareloc ones
"""

import logging
from typing import List, Tuple, Union

import bindings_cpp  # pylint: disable=E0401
import numpy as np
import rasterio as rio
import shareloc.geofunctions.rectification as rectif
import xarray as xr
from json_checker import Checker
from shareloc.dtm_reader import dtm_reader
from shareloc.geofunctions import localization
from shareloc.geofunctions.rectification_grid import RectificationGrid
from shareloc.geofunctions.triangulation import epipolar_triangulation
from shareloc.geomodels.geomodel import GeoModel
from shareloc.geomodels.grid import Grid
from shareloc.geomodels.rpc import RPC
from shareloc.image import Image

from cars.core import constants as cst
from cars.core import inputs, projection
from cars.core.geometry.abstract_geometry import AbstractGeometry
from cars.data_structures import cars_dataset

GRID_TYPE = "GRID"
RPC_TYPE = "RPC"
GEO_MODEL_PATH_TAG = "path"
GEO_MODEL_TYPE_TAG = "model_type"


[docs]@AbstractGeometry.register_subclass("SharelocGeometry") class SharelocGeometry(AbstractGeometry): """ Shareloc geometry class """ def __init__( self, geometry_plugin_conf, dem=None, geoid=None, default_alt=None, pairs_for_roi=None, ): super().__init__( geometry_plugin_conf, dem=dem, geoid=geoid, default_alt=default_alt, pairs_for_roi=pairs_for_roi, ) self.dem_roi = None self.roi_shareloc = None self.elevation = None # a margin is needed for cubic interpolation self.rectification_grid_margin = 0 if self.interpolator == "cubic": self.rectification_grid_margin = 5 # compute roi only when generating geometry object with dem # even if dem is None if geoid is not None and pairs_for_roi is not None: self.dem_roi_epsg = 4326 if dem is not None: # Get dem epsg self.dem_roi_epsg = inputs.rasterio_get_epsg(dem) self.roi_shareloc = self.get_roi( pairs_for_roi, self.dem_roi_epsg, margin=0.012 ) # change convention self.dem_roi = [ self.roi_shareloc[1], self.roi_shareloc[0], self.roi_shareloc[3], self.roi_shareloc[2], ] if dem is not None: # fill_nodata option should be set when dealing with void in DTM # see shareloc DTM limitations in sphinx doc for further details dtm_image = dtm_reader( dem, geoid, roi=self.roi_shareloc, roi_is_in_physical_space=True, fill_nodata="mean", fill_value=0.0, ) self.elevation = ( bindings_cpp.DTMIntersection( # pylint: disable=I1101 dtm_image.epsg, dtm_image.alt_data, dtm_image.nb_rows, dtm_image.nb_columns, dtm_image.transform, ) ) else: self.elevation = default_alt
[docs] def get_roi(self, pairs_for_roi, epsg, margin=0.006): """ Compute region of interest for intersection of DEM :param pairs_for_roi: list of pairs of images and geomodels :type pairs_for_roi: List[(str, dict, str, dict)] :param dem_epsg: output EPSG code for ROI :type dem_epsg: int :param margin: margin for ROI in degrees :type margin: float """ coords_list = [] for image1, geomodel1, image2, geomodel2 in pairs_for_roi: # Footprint of left image coords_list.extend(self.image_envelope(image1, geomodel1)) # Footprint of right image coords_list.extend(self.image_envelope(image2, geomodel2)) # Footprint of rectification grid (with margins) image1 = SharelocGeometry.load_image(image1) geomodel1 = self.load_geom_model(geomodel1) geomodel2 = self.load_geom_model(geomodel2) epipolar_extent = rectif.get_epipolar_extent( image1, geomodel1, geomodel2, grid_margin=self.rectification_grid_margin, ) lat_min, lon_min, lat_max, lon_max = list(epipolar_extent) coords_list.extend([(lon_min, lat_min), (lon_max, lat_max)]) lon_list, lat_list = list(zip(*coords_list)) # noqa: B905 roi = [ min(lat_list) - margin, min(lon_list) - margin, max(lat_list) + margin, max(lon_list) + margin, ] points = np.array( [ (roi[1], roi[0], 0), (roi[3], roi[2], 0), (roi[1], roi[0], 0), (roi[3], roi[2], 0), ] ) new_points = projection.point_cloud_conversion(points, 4326, epsg) roi = [ min(new_points[:, 1]), min(new_points[:, 0]), max(new_points[:, 1]), max(new_points[:, 0]), ] return roi
[docs] @staticmethod def load_geom_model(model: dict) -> Union[Grid, RPC]: """ Load geometric model and returns it as a shareloc object :param model: Path and attributes for geometrical model :type model: dict with keys "path" and "model_type" :return: geometric model as a shareloc object (Grid or RPC) """ geomodel = model[GEO_MODEL_PATH_TAG] # Use RPC Type if none are used if model.get(GEO_MODEL_TYPE_TAG): geomodel_type = model[GEO_MODEL_TYPE_TAG] else: geomodel_type = RPC_TYPE # Use RPCoptim class to use optimized C++ direct localizations if geomodel_type == "RPC": geomodel_type = "RPCoptim" shareloc_model = GeoModel(geomodel, geomodel_type) if shareloc_model is None: raise ValueError(f"Model {geomodel} could not be read by shareloc") return shareloc_model
[docs] @staticmethod def load_image(img: str) -> Image: """ Load the image using the Image class of Shareloc :param img: path to the image :return: The Image object """ try: shareloc_img = Image(img, vertical_direction="north") except Exception as error: raise ValueError(f"Image type {img} is not supported") from error return shareloc_img
[docs] @staticmethod def check_product_consistency(sensor: str, geomodel: dict) -> bool: """ Test if the product is readable by the shareloc plugin TODO: not used - to evolve and use in CARS configuration early in pipeline process (new early check input common application ?) :param sensor: path to sensor image :param geomodel: path and attributes for geometrical model :return: sensor path and overloaded geomodel dict """ # Check geomodel schema consistency if isinstance(geomodel, str): geomodel = { "path": geomodel, } overloaded_geomodel = geomodel.copy() # If model_type is not defined, default is "RPC" overloaded_geomodel["model_type"] = geomodel.get("model_type", "RPC") geomodel_schema = {"path": str, "model_type": str} checker_geomodel = Checker(geomodel_schema) checker_geomodel.validate(overloaded_geomodel) # Try to read them using shareloc SharelocGeometry.load_image(sensor) SharelocGeometry.load_geom_model(overloaded_geomodel) return sensor, overloaded_geomodel
[docs] def triangulate( self, sensor1, sensor2, geomodel1, geomodel2, mode: str, matches: Union[xr.Dataset, np.ndarray], grid1: str, grid2: str, roi_key: Union[None, str] = None, ) -> np.ndarray: """ Performs triangulation from cars disparity or matches dataset :param sensor1: path to left sensor image :param sensor2: path to right sensor image :param geomodel1: path and attributes for left geomodel :param geomodel2: path and attributes for right geomodel :param mode: triangulation mode (constants.DISP_MODE or constants.MATCHES) :param matches: cars disparity dataset or matches as numpy array :param grid1: path or dataset for epipolar grid of sensor1 :param grid2: path or dataset for epipolar grid of sensor2 :param roi_key: dataset roi to use (can be cst.ROI or cst.ROI_WITH_MARGINS) :return: the long/lat/height numpy array in output of the triangulation """ # read geomodels using shareloc shareloc_model1 = SharelocGeometry.load_geom_model(geomodel1) shareloc_model2 = SharelocGeometry.load_geom_model(geomodel2) # get path if grid is of type CarsDataset TODO remove if isinstance(grid1, cars_dataset.CarsDataset): grid1 = grid1.attributes["path"] if isinstance(grid2, cars_dataset.CarsDataset): grid2 = grid2.attributes["path"] # interpolate if grid is a path if isinstance(grid1, str): grid1 = RectificationGrid( grid1, interpolator=self.interpolator, ) if isinstance(grid2, str): grid2 = RectificationGrid( grid2, interpolator=self.interpolator, ) # perform matches triangulation if mode is cst.MATCHES_MODE: __, point_wgs84, __ = epipolar_triangulation( matches=matches, mask=None, matches_type="sift", geometrical_model_left=shareloc_model1, geometrical_model_right=shareloc_model2, grid_left=grid1, grid_right=grid2, residues=True, fill_nan=True, ) llh = point_wgs84.reshape((point_wgs84.shape[0], 1, 3)) elif mode is cst.DISP_MODE: __, point_wgs84, __ = epipolar_triangulation( matches=matches, mask=None, matches_type="disp", geometrical_model_left=shareloc_model1, geometrical_model_right=shareloc_model2, grid_left=grid1, grid_right=grid2, residues=True, fill_nan=True, ) row = np.array( range(matches.attrs[roi_key][1], matches.attrs[roi_key][3]) ) col = np.array( range(matches.attrs[roi_key][0], matches.attrs[roi_key][2]) ) llh = point_wgs84.reshape((row.size, col.size, 3)) else: logging.error( "{} mode is not available in the " "shareloc plugin triangulation".format(mode) ) raise ValueError( f"{mode} mode is not available" " in the shareloc plugin triangulation" ) return llh
[docs] def generate_epipolar_grids( self, sensor1, sensor2, geomodel1, geomodel2, epipolar_step: int = 30, ) -> Tuple[ np.ndarray, np.ndarray, List[float], List[float], List[int], float ]: """ Computes the left and right epipolar grids :param sensor1: path to left sensor image :param sensor2: path to right sensor image :param geomodel1: path and attributes for left geomodel :param geomodel2: path and attributes for right geomodel :param epipolar_step: step to use to construct the epipolar grids :return: Tuple composed of : - the left epipolar grid as a numpy array - the right epipolar grid as a numpy array - the left grid origin as a list of float - the left grid spacing as a list of float - the epipolar image size as a list of int (x-axis size is given with the index 0, y-axis size with index 1) - the disparity to altitude ratio as a float """ # read inputs using shareloc shareloc_model1 = SharelocGeometry.load_geom_model(geomodel1) shareloc_model2 = SharelocGeometry.load_geom_model(geomodel2) image1 = SharelocGeometry.load_image(sensor1) image2 = SharelocGeometry.load_image(sensor2) # compute epipolar grids ( grid1, grid2, [epipolar_size_y, epipolar_size_x], alt_to_disp_ratio, ) = rectif.compute_stereorectification_epipolar_grids( image1, shareloc_model1, image2, shareloc_model2, self.elevation, epi_step=epipolar_step, margin=self.rectification_grid_margin, ) # rearrange output to match the expected structure of CARS # grid[:, :, 2] with altitudes is not used grid1 = grid1[:, :, 0:2][:, :, ::-1] grid2 = grid2[:, :, 0:2][:, :, ::-1] epipolar_size_x = int(np.floor(epipolar_size_x)) epipolar_size_y = int(np.floor(epipolar_size_y)) origin = [ float(-self.rectification_grid_margin * epipolar_step), float(-self.rectification_grid_margin * epipolar_step), ] spacing = [float(epipolar_step), float(epipolar_step)] # alt_to_disp_ratio does not consider image resolution with rio.open(sensor1, "r") as rio_dst: pixel_size_x, pixel_size_y = ( rio_dst.transform[0], rio_dst.transform[4], ) mean_size = (abs(pixel_size_x) + abs(pixel_size_y)) / 2 disp_to_alt_ratio = mean_size / alt_to_disp_ratio return ( grid1, grid2, origin, spacing, [epipolar_size_x, epipolar_size_y], disp_to_alt_ratio, )
[docs] def direct_loc( self, sensor, geomodel, x_coord: np.array, y_coord: np.array, z_coord: np.array = None, ) -> np.ndarray: """ For a given image point, compute the latitude, longitude, altitude Advice: to be sure, use x,y,z inputs only :param sensor: path to sensor image :param geomodel: path and attributes for geomodel :param x_coord: X Coordinate in input image sensor :param y_coord: Y Coordinate in input image sensor :param z_coord: Z Altitude coordinate to take the image :return: Latitude, Longitude, Altitude coordinates as a numpy array """ # load model and image with shareloc shareloc_model = SharelocGeometry.load_geom_model(geomodel) shareloc_image = SharelocGeometry.load_image(sensor) # perform direct localization operation loc = localization.Localization( shareloc_model, image=shareloc_image, elevation=self.elevation, epsg=4326, ) # Bug: y_coord and x_coord inversion to fit Shareloc standards row/col. # TODO: clean geometry convention calls in API lonlatalt = loc.direct( y_coord, x_coord, z_coord, using_geotransform=True ) lonlatalt = np.squeeze(lonlatalt) if len(lonlatalt.shape) == 1: latlonalt = np.array([lonlatalt[1], lonlatalt[0], lonlatalt[2]]) else: latlonalt = np.array( [lonlatalt[:, 1], lonlatalt[:, 0], lonlatalt[:, 2]] ) return latlonalt
[docs] def inverse_loc( self, sensor, geomodel, lat_coord: np.array, lon_coord: np.array, z_coord: np.array = None, ) -> np.ndarray: """ For a given image points list, compute the latitudes, longitudes, altitudes Advice: to be sure, use x,y,z list inputs only :param sensor: path to sensor image :param geomodel: path and attributes for geomodel :param lat_coord: latitute Coordinate list :param lon_coord: longitude Coordinates list :param z_coord: Z Altitude list :return: X / Y / Z Coordinates list in input image as a numpy array """ # load model and image with shareloc shareloc_model = SharelocGeometry.load_geom_model(geomodel) shareloc_image = SharelocGeometry.load_image(sensor) # perform inverse localization operation loc = localization.Localization( shareloc_model, image=shareloc_image, elevation=self.elevation, epsg=4326, ) # Rows and columns order is inversed col, row, alti = loc.inverse( lon_coord.astype(np.float64), lat_coord.astype(np.float64), h=z_coord.astype(np.float64), using_geotransform=True, ) return row, col, alti