Source code for cars.applications.grid_generation.grid_generation_algo

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Grids module:
contains functions used for epipolar grid creation and correction
"""

# Standard imports
from __future__ import absolute_import

import logging
import math
import os

# Third party imports
import numpy as np
import pandas
import rasterio as rio
import xarray as xr
from affine import Affine

# TODO depends on another step (and a later one) : make it independent
from cars.applications.triangulation.triangulation_algo import (
    triangulate_matches,
)
from cars.core import constants as cst
from cars.core import inputs, projection, tiling
from cars.orchestrator.cluster.log_wrapper import cars_profile


[docs] def get_new_path(path): """ Check path, if exists, creates new one :param path: path to check :type path: str :return : new path :rtype: str """ current_increment = 0 head, tail = os.path.splitext(path) current_path = path while os.path.exists(current_path): current_increment += 1 current_path = head + "_" + repr(current_increment) + tail return current_path
[docs] def write_grid(grid, fname, origin, spacing): """ Write an epipolar rectification grid to file :param grid: the grid to write :type grid: 3D numpy array :param fname: the filename to which the grid will be written :type fname: string :param origin: origin of the grid :type origin: (float, float) :param spacing: spacing of the grid :type spacing: (float, float) """ geotransform = ( origin[0] - 0.5 * spacing[0], spacing[0], 0.0, origin[1] - 0.5 * spacing[1], 0.0, spacing[1], ) transform = Affine.from_gdal(*geotransform) with rio.open( fname, "w", height=grid.shape[0], width=grid.shape[1], count=2, driver="GTiff", dtype=grid.dtype, transform=transform, ) as dst: dst.write_band(1, grid[:, :, 0]) dst.write_band(2, grid[:, :, 1])
[docs] def generate_epipolar_grids( # pylint: disable=too-many-positional-arguments sensor1, sensor2, geomodel1, geomodel2, geometry_plugin, epipolar_step ): """ Computes the left and right epipolar grids :param sensor1: path to left sensor image :param sensor2: path to right sensor image :param geomodel1: path and attributes for left geomodel :param geomodel2: path and attributes for right geomodel :param geometry_plugin: geometry plugin to use :type geometry_plugin: AbstractGeometry :param epipolar_step: step to use to construct the epipolar grids :return: Tuple composed of : - the left epipolar grid as a numpy array - the right epipolar grid as a numpy array - the left grid origin as a list of float - the left grid spacing as a list of float - the epipolar image size as a list of int\ (x-axis size is given with the index 0, y-axis size with index 1) - the disparity to altitude ratio as a float """ logging.info("Generating epipolar rectification grid ...") return geometry_plugin.generate_epipolar_grids( sensor1, sensor2, geomodel1, geomodel2, epipolar_step=epipolar_step, )
# pylint: disable=too-many-positional-arguments @cars_profile(name="Compute epipolar grid min max", interval=0.5) def compute_epipolar_grid_min_max( geometry_plugin, grid, sensor1, sensor2, geomodel1, geomodel2, grid1, grid2, epsg, disp_min_tiling, disp_max_tiling, ): """ Compute ground terrain location of epipolar grids at disp_min and disp_max :param geometry_plugin: geometry plugin to use :type geometry_plugin: AbstractGeometry :param grid: The epipolar grid to project :type grid: np.ndarray of shape (N,M,2) :param sensor1: path to left sensor image :type sensor1: str :param sensor2: path to right sensor image :type sensor2: str :param geomodel1: path and attributes for left geomodel :type geomodel1: dict :param geomodel2: path and attributes for right geomodel :type geomodel2: dict :param grid1: dict of the reference image grid file :type grid1: dict :param grid2: dict of the secondary image grid file :type grid2: dict :param epsg: EPSG code of the terrain projection :type epsg: Int :param disp_min_tiling: Minimum disparity tiling :type disp_min_tiling: np ndarray or int :param disp_max_tiling: Maximum disparity tiling :type disp_max_tiling: np ndarray or int :return: a tuple of location grid at disp_min and disp_max :rtype: Tuple(np.ndarray, np.ndarray) same shape as grid param """ if isinstance(disp_min_tiling, np.ndarray): disp_min_tiling = np.floor(disp_min_tiling).flatten() else: disp_min_tiling = math.floor(disp_min_tiling) if isinstance(disp_max_tiling, np.ndarray): disp_max_tiling = np.ceil(disp_max_tiling).flatten() else: disp_max_tiling = math.ceil(disp_max_tiling) image_bounds = inputs.rasterio_get_bounds(grid2["path"]) min_px_right = image_bounds[0] max_px_right = image_bounds[2] # Generate disp_min and disp_max matches right_cols_min = grid[:, :, 0].flatten() + disp_min_tiling matches_min = np.stack( ( grid[:, :, 0].flatten(), grid[:, :, 1].flatten(), np.clip(right_cols_min, min_px_right, max_px_right), grid[:, :, 1].flatten(), ), axis=1, ) right_cols_max = grid[:, :, 0].flatten() + disp_max_tiling matches_max = np.stack( ( grid[:, :, 0].flatten(), grid[:, :, 1].flatten(), np.clip(right_cols_max, min_px_right, max_px_right), grid[:, :, 1].flatten(), ), axis=1, ) # If the matches had to be clipped, log it clipped_matches_min = np.argwhere(right_cols_min != matches_min[:, 2]) clipped_matches_max = np.argwhere(right_cols_max != matches_max[:, 2]) nb_elems_clipped = len(clipped_matches_min) + len(clipped_matches_max) if nb_elems_clipped > 0: logging.info( f"{nb_elems_clipped} points were clipped to the right " "image's bounds." ) # Generate corresponding point clouds pc_min = triangulate_matches( geometry_plugin, sensor1, sensor2, geomodel1, geomodel2, grid1, grid2, matches_min, interpolation_method="linear", ) pc_max = triangulate_matches( geometry_plugin, sensor1, sensor2, geomodel1, geomodel2, grid1, grid2, matches_max, interpolation_method="linear", ) # Convert to correct EPSG projection.point_cloud_conversion_dataset(pc_min, epsg) projection.point_cloud_conversion_dataset(pc_max, epsg) # Form grid_min and grid_max grid_min = None grid_max = None if isinstance(pc_min, xr.Dataset): grid_min = np.concatenate( (pc_min[cst.X].values, pc_min[cst.Y].values), axis=1 ) grid_max = np.concatenate( (pc_max[cst.X].values, pc_max[cst.Y].values), axis=1 ) elif isinstance(pc_min, pandas.DataFrame): grid_min = np.stack( (pc_min[cst.X].to_numpy(), pc_min[cst.Y].to_numpy()), axis=-1 ) grid_max = np.stack( (pc_max[cst.X].to_numpy(), pc_max[cst.Y].to_numpy()), axis=-1 ) else: logging.error("pc min/max error: point cloud is unknown") return grid_min, grid_max
[docs] def terrain_region_to_epipolar( # pylint: disable=too-many-positional-arguments region, sensor1, sensor2, geomodel1, geomodel2, grid_left, grid_right, geometry_plugin, epsg=4326, disp_min=0, disp_max=0, tile_size=100, epipolar_size_x=None, epipolar_size_y=None, ): """ Transform terrain region to epipolar region :param region: terrain region to use :param sensor1: path to left sensor image :param sensor2: path to right sensor image :param geomodel1: path and attributes for left geomodel :param geomodel2: path and attributes for right geomodel :param grid1: dataset of the reference image grid file :param grid2: dataset of the secondary image grid file :param geometry_plugin: geometry plugin to use :param epsg: epsg :param disp_min: minimum disparity :param disp_max: maximum disparity :param tile_size: tile size for grid :param epipolar_size_x: epipolar_size_x :param epipolar_size_y: epipolar_size_y :return: epipolar region to use, with tile_size a sample """ disp_min = int(disp_min) disp_max = int(disp_max) # Generate terrain grid only on roi xmin = region[0] xmax = region[2] ymin = region[1] ymax = region[3] opt_terrain_size = max((xmax - xmin), (ymax - ymin)) region_grid = tiling.generate_tiling_grid( xmin, ymin, xmax, ymax, opt_terrain_size, opt_terrain_size, ) # Generate fake epipolar grid epipolar_grid = tiling.generate_tiling_grid( 0, 0, epipolar_size_y, epipolar_size_x, tile_size, tile_size, ) # Compute disp_min and disp_max location for epipolar grid ( epipolar_grid_min, epipolar_grid_max, ) = compute_epipolar_grid_min_max( geometry_plugin, tiling.transform_four_layers_to_two_layers_grid(epipolar_grid), sensor1, sensor2, geomodel1, geomodel2, grid_left, grid_right, epsg, disp_min, disp_max, ) # Compute epipolar points min and max on terrain region points_min, points_max = tiling.terrain_grid_to_epipolar( region_grid, epipolar_grid, epipolar_grid_min, epipolar_grid_max, epsg ) # Bouding region of corresponding cell epipolar_region_minx = np.min(points_min[:, :, 0]) epipolar_region_miny = np.min(points_min[:, :, 1]) epipolar_region_maxx = np.max(points_max[:, :, 0]) epipolar_region_maxy = np.max(points_max[:, :, 1]) # Generate epipolar region epipolar_region = [ epipolar_region_miny, epipolar_region_maxy, epipolar_region_minx, epipolar_region_maxx, ] return epipolar_region