Source code for cars.core.tiling

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Tiling module:
contains functions related to regions and tiles management
"""
# pylint: disable=too-many-lines

import logging

# Standard imports
import math
from typing import Dict, List, Tuple

# Third party imports
import numpy as np
from pyproj import CRS
from scipy.ndimage import generic_filter
from scipy.spatial import Delaunay  # pylint: disable=no-name-in-module
from scipy.spatial import cKDTree  # pylint: disable=no-name-in-module
from scipy.spatial import tsearch  # pylint: disable=no-name-in-module
from shapely.geometry import box, mapping
from shapely.geometry.multipolygon import MultiPolygon

from cars.orchestrator.cluster.log_wrapper import cars_profile


[docs] def grid( # pylint: disable=too-many-positional-arguments xmin: float, ymin: float, xmax: float, ymax: float, xsplit: int, ysplit: int ) -> np.ndarray: """ Generate grid of positions by splitting [xmin, xmax]x[ymin, ymax] in splits of xsplit x ysplit size :param xmin : xmin of the bounding box of the region to split :param ymin : ymin of the bounding box of the region to split :param xmax : xmax of the bounding box of the region to split :param ymax : ymax of the bounding box of the region to split :param xsplit: width of splits :param ysplit: height of splits :return: The output ndarray grid with nb_ysplits splits in first direction and nb_xsplits in second direction for 2 dimensions 0:x, 1:y :rtype: numpy array """ nb_xsplits = math.ceil((xmax - xmin) / xsplit) nb_ysplits = math.ceil((ymax - ymin) / ysplit) out_grid = np.ndarray( shape=(nb_ysplits + 1, nb_xsplits + 1, 2), dtype=float ) for i in range(0, nb_xsplits + 1): for j in range(0, nb_ysplits + 1): out_grid[j, i, 0] = min(xmax, xmin + i * xsplit) out_grid[j, i, 1] = min(ymax, ymin + j * ysplit) return out_grid
@cars_profile(name="Transform four layers to two layers grid", interval=0.5) def transform_four_layers_to_two_layers_grid(tiling_grid, terrain=False): """ Transform a 4 layer grid: (N, M, 4) containing [rowmin, rowmax, colmin, colmax] when epipolar and [xmin, xmax, ymin, ymax] with x = col and y = row into a 2 layer grid: (N+1, M+1, 2) containing x and y defined like : grid[j, i, 0] = min(xmax, xmin + i * xsplit) and grid[j, i, 1] = min(ymax, ymin + j * ysplit) :param tiling_grid: tiling grid :type tiling_grid: np.ndarray :return: 2D grid :rtype: np.ndarray """ if terrain is False: tiling_grid_ = tiling_grid.copy() tiling_grid_[:, :, [0, 1, 2, 3]] = tiling_grid_[:, :, [2, 3, 0, 1]] else: tiling_grid_ = tiling_grid.transpose(1, 0, 2).copy() arr = np.ndarray( shape=(tiling_grid_.shape[0] + 1, tiling_grid_.shape[1] + 1, 2), dtype=float, ) # Fill x arr[0:-1, 0:-1, 0] = tiling_grid_[:, :, 0] arr[0:-1, -1, 0] = tiling_grid_[:, -1, 1] arr[-1, :, 0] = arr[0, :, 0] # All rows are identical # Fill y arr[0:-1, 0:-1, 1] = tiling_grid_[:, :, 2] arr[-1, 0:-1, 1] = tiling_grid_[-1, :, 3] arr[:, -1, 1] = arr[:, 0, 1] # All cols are identical return arr @cars_profile(name="Transform disp range grid to two layers", interval=0.5) def transform_disp_range_grid_to_two_layers(disp_min_grid, disp_max_grid): """ Transform tiling disp min and max to N+1 M+1 array corresponding to N+1, M+1 , 2 tiling grid :param disp_min_grid: disp min tiling :type disp_min_grid: np ndarray :param disp_max_grid: disp max tiling :type disp_max_grid: np ndarray :return: disp_min_grid, disp_max_grid :rtype: np ndarray , np ndarray """ # Create a 2xN+1, 2xM+1 matrix to apply filter on it nb_row = 2 * disp_min_grid.shape[0] + 1 nb_col = 2 * disp_min_grid.shape[1] + 1 disp_min = np.full((nb_row, nb_col), np.nan) disp_max = np.full((nb_row, nb_col), np.nan) disp_min[1::2, 1::2] = disp_min_grid disp_max[1::2, 1::2] = disp_max_grid # Apply filter min and max: # as each cell represent a node of 4 tiles from a regular grid # we want for each node to # represent the min and max of 4 cells disp_min = generic_filter(disp_min, np.nanmin, [3, 3]) disp_max = generic_filter(disp_max, np.nanmax, [3, 3]) # eliminate odd indexes disp_min = disp_min[::2, ::2] disp_max = disp_max[::2, ::2] return disp_min, disp_max
[docs] def generate_tiling_grid( # pylint: disable=too-many-positional-arguments row_min: float, col_min: float, row_max: float, col_max: float, row_split: int, col_split: int, ) -> np.ndarray: """ Generate grid of positions by splitting [row_min, row_max] x [col_min, col_max] in splits of row_split x col_split size :param row_min : row_min of the bounding box of the region to split :param col_min : col_min of the bounding box of the region to split :param row_max : row_max of the bounding box of the region to split :param col_max : col_max of the bounding box of the region to split :param row_split: height of splits :param col_split: width of splits :return: The output ndarray grid with nb_row_split splits in first direction and nb_col_split in second direction for 2 dimensions 0:y, 1:x [row, col, :] containing [row_min, row_max, col_min, col_max] :rtype: numpy array """ nb_col_split = math.ceil((col_max - col_min) / col_split) nb_row_split = math.ceil((row_max - row_min) / row_split) out_grid = np.ndarray(shape=(nb_row_split, nb_col_split, 4), dtype=float) for row in range(0, nb_row_split): for col in range(0, nb_col_split): out_grid[row, col, 0] = min(row_max, row_min + row * row_split) out_grid[row, col, 1] = min( row_max, row_min + (row + 1) * row_split ) out_grid[row, col, 2] = min(col_max, col_min + col * col_split) out_grid[row, col, 3] = min( col_max, col_min + (col + 1) * col_split ) return out_grid
[docs] def compute_local_disp_range_from_grids( # pylint: disable=R0917 row_min, row_max, col_min, col_max, disp_min_grid_arr, disp_max_grid_arr, step_row, step_col, row_range, col_range, ): """ Compute local disparity min and max from disparity grids for a tile ROI. :param row_min: row min :param row_max: row max :param col_min: col min :param col_max: col max :param disp_min_grid_arr: disparity minimum grid :param disp_max_grid_arr: disparity maximum grid :param step_row: disparity grid row step :param step_col: disparity grid col step :param row_range: disparity grid row coordinates :param col_range: disparity grid col coordinates :return: rounded local disparity min and max :rtype: tuple(int, int) """ assert row_min < row_max assert col_min < col_max # Get region in grid grid_row_min = max(0, int(np.floor((row_min - 1) / step_row)) - 1) grid_row_max = min( len(row_range), int(np.ceil((row_max + 1) / step_row) + 1) ) grid_col_min = max(0, int(np.floor((col_min - 1) / step_col)) - 1) grid_col_max = min( len(col_range), int(np.ceil((col_max + 1) / step_col)) + 1 ) # Compute disp min and max in row disp_min = np.min( disp_min_grid_arr[grid_row_min:grid_row_max, grid_col_min:grid_col_max] ) disp_max = np.max( disp_max_grid_arr[grid_row_min:grid_row_max, grid_col_min:grid_col_max] ) # Round disp min and max return int(math.floor(disp_min)), int(math.ceil(disp_max))
[docs] def split( xmin, ymin, xmax, ymax, xsplit, ysplit ): # pylint: disable=too-many-positional-arguments """ Split a region defined by [xmin, xmax] x [ymin, ymax] in splits of xsplit x ysplit size :param xmin : xmin of the bounding box of the region to split :type xmin: float :param ymin : ymin of the bounding box of the region to split :type ymin: float :param xmax : xmax of the bounding box of the region to split :type xmax: float :param ymax : ymax of the bounding box of the region to split :type ymax: float :param xsplit: width of splits :type xsplit: int :param ysplit: height of splits :type ysplit: int :return: A list of splits represented by arrays of 4 elements [xmin, ymin, xmax, ymax] :rtype: list of 4 float """ nb_xsplits = math.ceil((xmax - xmin) / xsplit) nb_ysplits = math.ceil((ymax - ymin) / ysplit) terrain_regions = [] for i in range(0, nb_xsplits): for j in range(0, nb_ysplits): region = [ xmin + i * xsplit, ymin + j * ysplit, xmin + (i + 1) * xsplit, ymin + (j + 1) * ysplit, ] # Crop to largest region region = crop(region, [xmin, ymin, xmax, ymax]) terrain_regions.append(region) return terrain_regions
[docs] def crop(region1, region2): """ Crop a region by another one :param region1: The region to crop as an array [xmin, ymin, xmax, ymax] :type region1: list of four float :param region2: The region used for cropping as an array [xmin, ymin, xmax, ymax] :type region2: list of four float :return: The cropped regiona as an array [xmin, ymin, xmax, ymax]. If region1 is outside region2, might result in inconsistent region :rtype: list of four float """ out = region1[:] out[0] = min(region2[2], max(region2[0], region1[0])) out[2] = min(region2[2], max(region2[0], region1[2])) out[1] = min(region2[3], max(region2[1], region1[1])) out[3] = min(region2[3], max(region2[1], region1[3])) return out
[docs] def pad(region, margins): """ Pad region according to a margin :param region: The region to pad :type region: list of four floats :param margins: Margin to add :type margins: list of four floats :return: padded region :rtype: list of four float """ out = region[:] out[0] -= margins[0] out[1] -= margins[1] out[2] += margins[2] out[3] += margins[3] return out
[docs] def empty(region): """ Check if a region is empty or inconsistent :param region: region as an array [xmin, ymin, xmax, ymax] :type region: list of four float :return: True if the region is considered empty (no pixels inside), False otherwise :rtype: bool""" return region[0] >= region[2] or region[1] >= region[3]
[docs] def union(regions): """ Returns union of all regions :param regions: list of region as an array [xmin, ymin, xmax, ymax] :type regions: list of list of four float :return: xmin, ymin, xmax, ymax :rtype: list of 4 float """ xmin = min((r[0] for r in regions)) xmax = max((r[2] for r in regions)) ymin = min((r[1] for r in regions)) ymax = max((r[3] for r in regions)) return xmin, ymin, xmax, ymax
[docs] def list_tiles(region, largest_region, tile_size, margin=1): """ Given a region, cut largest_region into tiles of size tile_size and return tiles that intersect region within margin pixels. :param region: The region to list intersecting tiles :type region: list of four float :param largest_region: The region to split :type largest_region: list of four float :param tile_size: Width of tiles for splitting (squared tiles) :type tile_size: int :param margin: Also include margin neighboring tiles :type margin: int :return: A list of tiles as dicts containing idx and idy :rtype: list of dict """ # Find tile indices covered by region min_tile_idx_x = int(math.floor(region[0] / tile_size)) max_tile_idx_x = int(math.ceil(region[2] / tile_size)) min_tile_idx_y = int(math.floor(region[1] / tile_size)) max_tile_idx_y = int(math.ceil(region[3] / tile_size)) # Include additional tiles min_tile_idx_x -= margin min_tile_idx_y -= margin max_tile_idx_x += margin max_tile_idx_y += margin out = [] # Loop on tile idx for tile_idx_x in range(min_tile_idx_x, max_tile_idx_x): for tile_idx_y in range(min_tile_idx_y, max_tile_idx_y): # Derive tile coordinates tile = [ tile_idx_x * tile_size, tile_idx_y * tile_size, (tile_idx_x + 1) * tile_size, (tile_idx_y + 1) * tile_size, ] # Crop to largest region tile = crop(tile, largest_region) # Check if tile is empty if not empty(tile): out.append({"idx": tile_idx_x, "idy": tile_idx_y, "tile": tile}) return out
[docs] def roi_to_start_and_size(region, resolution): """ Convert roi as array of [xmin, ymin, xmax, ymax] to xmin, ymin, xsize, ysize given a resolution Beware that a negative spacing is considered for y axis, and thus returned ystart is in fact ymax :param region: The region to convert :type region: list of four float :param resolution: The resolution to use to determine sizes :type resolution: float :return: xstart, ystart, xsize, ysize tuple :rtype: list of two float + two int """ xstart = region[0] ystart = region[3] xsize = int(np.round((region[2] - region[0]) / resolution)) ysize = int(np.round((region[3] - region[1]) / resolution)) return xstart, ystart, xsize, ysize
[docs] def snap_to_grid(xmin, ymin, xmax, ymax, resolution): """ Given a roi as xmin, ymin, xmax, ymax, snap values to entire step of resolution :param xmin: xmin of the roi :type xmin: float :param ymin: ymin of the roi :type ymin: float :param xmax: xmax of the roi :type xmax: float :param ymax: ymax of the roi :type ymax: float :param resolution: size of cells for snapping :type resolution: float :return: xmin, ymin, xmax, ymax snapped tuple :rtype: list of four float """ xmin = math.floor(xmin / resolution) * resolution xmax = math.ceil(xmax / resolution) * resolution ymin = math.floor(ymin / resolution) * resolution ymax = math.ceil(ymax / resolution) * resolution return xmin, ymin, xmax, ymax
[docs] def filter_simplices_on_the_edges( original_grid_shape: Tuple, tri: Delaunay, simplices: np.ndarray ): """ Filter simplices on the edges which allows to cut triangles out of the concave Delaunay triangulation. :param original_grid_shape: shape of the original grid (almost regular) used to create delaunay triangulation :param tri: Delaunay triangulation :param simplices: Selected simplices to filter: set -1 if selected simplex is on the edges """ # Filter simplices on the edges edges = np.zeros((4, *original_grid_shape)) # left, bottom, right, top edges[0, :, 0] = 1 edges[1, -1, :] = 1 edges[2, :, -1] = 1 edges[3, 0, :] = 1 for idx in range(edges.shape[0]): edges_ravel = np.ravel(edges[idx, :, :]) # simplices filtered if all points are on an edge edges_simplices = np.sum(edges_ravel[tri.simplices], axis=1) == 3 simplices[edges_simplices[simplices]] = -1
[docs] def terrain_grid_to_epipolar( terrain_tiling_grid, epipolar_tiling_grid, epipolar_grid_min, epipolar_grid_max, epsg, ): """ Transform terrain grid to epipolar region """ terrain_regions_grid = transform_four_layers_to_two_layers_grid( terrain_tiling_grid, terrain=True ) epipolar_regions_grid = transform_four_layers_to_two_layers_grid( epipolar_tiling_grid ) epipolar_regions_grid_shape = np.shape(epipolar_regions_grid)[:2] epipolar_regions_grid_flat = epipolar_regions_grid.reshape( -1, epipolar_regions_grid.shape[-1] ) # in the following code a factor is used to increase the precision spatial_ref = CRS.from_epsg(epsg) if spatial_ref.is_geographic: precision_factor = 1000.0 else: precision_factor = 1.0 # Build delaunay_triangulation tri_min = Delaunay(epipolar_grid_min * precision_factor) tri_max = Delaunay(epipolar_grid_max * precision_factor) # Build kdtrees tree_min = cKDTree(epipolar_grid_min * precision_factor) tree_max = cKDTree(epipolar_grid_max * precision_factor) # Look-up terrain_regions_grid with Delaunay s_min = tsearch(tri_min, terrain_regions_grid * precision_factor) s_max = tsearch(tri_max, terrain_regions_grid * precision_factor) # Filter simplices on the edges filter_simplices_on_the_edges(epipolar_regions_grid_shape, tri_min, s_min) filter_simplices_on_the_edges(epipolar_regions_grid_shape, tri_max, s_max) points_disp_min = epipolar_regions_grid_flat[tri_min.simplices[s_min]] points_disp_max = epipolar_regions_grid_flat[tri_max.simplices[s_max]] nn_disp_min = epipolar_regions_grid_flat[ tree_min.query(terrain_regions_grid * precision_factor)[1] ] nn_disp_max = epipolar_regions_grid_flat[ tree_max.query(terrain_regions_grid * precision_factor)[1] ] points_disp_min_min = np.min(points_disp_min, axis=2) points_disp_min_max = np.max(points_disp_min, axis=2) points_disp_max_min = np.min(points_disp_max, axis=2) points_disp_max_max = np.max(points_disp_max, axis=2) # Use either Delaunay search or NN search # if delaunay search fails (point outside triangles) points_disp_min_min = np.where( np.stack((s_min, s_min), axis=-1) != -1, points_disp_min_min, nn_disp_min, ) points_disp_min_max = np.where( np.stack((s_min, s_min), axis=-1) != -1, points_disp_min_max, nn_disp_min, ) points_disp_max_min = np.where( np.stack((s_max, s_max), axis=-1) != -1, points_disp_max_min, nn_disp_max, ) points_disp_max_max = np.where( np.stack((s_max, s_max), axis=-1) != -1, points_disp_max_max, nn_disp_max, ) points = np.stack( ( points_disp_min_min, points_disp_min_max, points_disp_max_min, points_disp_max_max, ), axis=0, ) points_min = np.min(points, axis=0) points_max = np.max(points, axis=0) return points_min, points_max
[docs] def region_hash_string(region: Tuple): """ This lambda will allow to derive a key to index region in the previous dictionary :param region: region to hash """ return "{}_{}_{}_{}".format(region[0], region[1], region[2], region[3])
# pylint: disable=too-many-positional-arguments
[docs] def get_corresponding_tiles_row_col( terrain_tiling_grid: np.ndarray, row: int, col: int, list_point_clouds: list, list_epipolar_points_min: list, list_epipolar_points_max: list, ) -> Tuple[List, List, List]: """ This function allows to get required point cloud for each terrain region. :param terrain_tiling_grid: terrain grid positions :param row: row :param col: column epipolar input tiles where keys are image pairs index and values are epipolar_points_min, epipolar_points_max, largest_epipolar_region, opt_epipolar_tile_size :return: Terrain regions Corresponding tiles selected from delayed_point_clouds with associated id Terrain regions "rank" allowing to sorting tiles for dask processing """ logging.debug( "Processing tile located at {},{} in tile grid".format(row, col) ) # Terrain grid [row, j, :] = [xmin, xmax, ymin, ymax] # terrain region = [xmin, ymin, xmax, ymax] terrain_region = [ terrain_tiling_grid[row, col, 0], terrain_tiling_grid[row, col, 2], terrain_tiling_grid[row, col, 1], terrain_tiling_grid[row, col, 3], ] # reverse convention as row and col correspond to new format # Former format is transposed row, col = col, row logging.debug("Corresponding terrain region: {}".format(terrain_region)) # This list will hold the required points clouds for this terrain tile required_point_clouds = [] # This list contains indexes of tiles (debug purpose) list_indexes = [] # For each stereo configuration for pc_id, ( point_cloud, epipolar_points_min, epipolar_points_max, ) in enumerate( zip( # noqa: B905 list_point_clouds, list_epipolar_points_min, list_epipolar_points_max, ) ): largest_epipolar_region = point_cloud.attributes[ "largest_epipolar_region" ] opt_epipolar_tile_size = point_cloud.attributes[ "opt_epipolar_tile_size" ] tile_min = np.minimum( np.minimum( np.minimum( epipolar_points_min[row, col], epipolar_points_min[row + 1, col], ), np.minimum( epipolar_points_min[row + 1, col + 1], epipolar_points_min[row, col + 1], ), ), np.minimum( np.minimum( epipolar_points_max[row, col], epipolar_points_max[row + 1, col], ), np.minimum( epipolar_points_max[row + 1, col + 1], epipolar_points_max[row, col + 1], ), ), ) tile_max = np.maximum( np.maximum( np.maximum( epipolar_points_min[row, col], epipolar_points_min[row + 1, col], ), np.maximum( epipolar_points_min[row + 1, col + 1], epipolar_points_min[row, col + 1], ), ), np.maximum( np.maximum( epipolar_points_max[row, col], epipolar_points_max[row + 1, col], ), np.maximum( epipolar_points_max[row + 1, col + 1], epipolar_points_max[row, col + 1], ), ), ) # Bounding region of corresponding cell epipolar_region_minx = tile_min[0] epipolar_region_miny = tile_min[1] epipolar_region_maxx = tile_max[0] epipolar_region_maxy = tile_max[1] epipolar_region = [ epipolar_region_minx, epipolar_region_miny, epipolar_region_maxx, epipolar_region_maxy, ] # Crop epipolar region to largest region epipolar_region = crop(epipolar_region, largest_epipolar_region) logging.debug( "Corresponding epipolar region: {}".format(epipolar_region) ) # Check if the epipolar region contains any pixels to process if empty(epipolar_region): logging.debug( "Skipping terrain region " "because corresponding epipolar region is empty" ) else: # Loop on all epipolar tiles covered by epipolar region for epipolar_tile in list_tiles( epipolar_region, largest_epipolar_region, opt_epipolar_tile_size, ): id_x = epipolar_tile["idx"] id_y = epipolar_tile["idy"] epi_grid_shape = point_cloud.tiling_grid.shape if ( 0 <= id_x < epi_grid_shape[1] and 0 <= id_y < epi_grid_shape[0] ): required_point_clouds.append( (point_cloud[id_y, id_x], pc_id) ) list_indexes.append([id_y, id_x]) rank = col * col + row * row return ( terrain_region, required_point_clouds, rank, list_indexes, )
[docs] def get_paired_regions_as_geodict( terrain_regions: List, epipolar_regions: List ) -> Tuple[Dict, Dict]: """ Get paired regions (terrain/epipolar) as "geodictionnaries": these objects can be dumped into geojson files to be visualized. :param terrain_regions: terrain region respecting cars tiling :param epipolar_regions: corresponding epipolar regions :return: Terrain dictionary and Epipolar dictionary containing respectively Terrain tiles in terrain projection and Epipolar tiles in epipolar projection """ ter_geodict = {"type": "FeatureCollection", "features": []} epi_geodict = {"type": "FeatureCollection", "features": []} for idx, (ter, epi_list) in enumerate( zip(terrain_regions, epipolar_regions) # noqa: B905 ): feature = {} feature["type"] = "Feature" feature["properties"] = {"id": idx, "nb_epi": len(epi_list)} feature["geometry"] = mapping(box(*ter)) ter_geodict["features"].append(feature.copy()) feature["geometry"] = mapping(MultiPolygon(box(*x) for x in epi_list)) epi_geodict["features"].append(feature.copy()) return ter_geodict, epi_geodict