Source code for cars.core.preprocessing

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Preprocessing contains function used in pipelines
"""
# pylint: disable=too-many-lines

# Standard imports
from __future__ import print_function

import json
import logging
import math
import os

import numpy as np
import utm
from pyproj import CRS
from shapely.geometry import Polygon

import cars.orchestrator.orchestrator as ocht
from cars.applications.grid_generation import grid_generation_algo as grids_algo

# CARS imports
from cars.core import inputs, projection, tiling
from cars.core.utils import safe_makedirs
from cars.orchestrator.cluster.log_wrapper import cars_profile
from cars.pipelines.parameters import sensor_inputs_constants as sens_cst

PREPROCESSING_TAG = "pair_preprocessing"
LEFT_ENVELOPE_TAG = "left_envelope"
RIGHT_ENVELOPE_TAG = "right_envelope"
ENVELOPES_INTERSECTION_TAG = "envelopes_intersection"
ENVELOPES_INTERSECTION_BB_TAG = "envelopes_intersection_bounding_box"


[docs] def get_utm_zone_as_epsg_code(lon, lat): """ Returns the EPSG code of the UTM zone where the lat, lon point falls in :param lon: longitude of the point :type lon: float :param lat: latitude of the point :type lat: float :return: The EPSG code corresponding to the UTM zone :rtype: int """ if lon is None or lat is None or np.isnan(lon) or np.isnan(lat): logging.warning( "An incorrect position was given when trying " "to select the right EPSG for computations. " "The default EPSG 32632 will be used." ) return 32632 if lat > 84: logging.warning( "Since the latitude is above 84°, the EPSG 32661 will be used." ) return 32661 if lat < -80: logging.warning( "Since the latitude is under -80°, the EPSG 32761 will be used." ) return 32761 zone = utm.from_latlon(lat, lon)[2] north_south = 600 if lat >= 0 else 700 return 32000 + north_south + zone
@cars_profile(name="Compute terrain bbox") def compute_terrain_bbox( # pylint: disable=too-many-positional-arguments # noqa: 751 sensor_image_left, sensor_image_right, epipolar_image_left, grid_left, grid_right, epsg, geometry_plugin, disp_min=-10, disp_max=10, resolution=0.5, roi_poly=None, pair_key="PAIR_0", pair_folder=None, orchestrator=None, check_inputs=False, ): """ Compute terrain bounding box of current pair :param srtm_dir: srtm directory :type srtm_dir: str :param default_alt: default altitude :type default_alt: int :param geoid: geoid path :type geoid: str :param sensor_image_left: left image Dict Must contain keys : "image", "texture", "geomodel", "no_data", "mask". Paths must be absolutes :type sensor_image_left: dict :param sensor_image_right: right image Dict Must contain keys : "image", "texture", "geomodel", "no_data", "mask". Paths must be absolutes :type sensor_image_right: dict :param grid_left: left grid. Grid dict contains : - "grid_spacing", "grid_origin", \ "epipolar_size_x", "epipolar_size_y", \ "epipolar_origin_x", "epipolar_origin_y", \ "epipolar_spacing_x", "epipolar_spacing", \ "disp_to_alt_ratio", "path" :type grid_left: dict :param grid_right: right grid. Grid dict contains : - "grid_spacing", "grid_origin",\ "epipolar_size_x", "epipolar_size_y", \ "epipolar_origin_x", "epipolar_origin_y", \ "epipolar_spacing_x", "epipolar_spacing", \ "disp_to_alt_ratio", "path" :type grid_right: dict :param epsg: epsg to use :type epsg: str :param geometry_plugin: geometry plugin to use :type geometry_plugin: AbstractGeometry :param disp_min: minimum disparity :type disp_min: int :param disp_max: maximum disparity :type disp_max: int :param resolution: resolution :type resolution: float :param roi_poly: roi polygon :type roi_poly: Polygon :param pair_key: pair key id :type pair_key: str :param pair_folder: pair folder to save data to :type pair_folder: str :param orchestrator: orchestrator :type orchestrator: Orchestrator :param check_inputs: true if user wants to check inputs :type check_inputs: bool :return: former post prepare configuration :rtype: dict """ # Default orchestrator if orchestrator is None: # Create default sequential orchestrator for current application # be awere, no out_json will be shared between orchestrators # No files saved orchestrator = ocht.Orchestrator( orchestrator_conf={"mode": "sequential"} ) if pair_folder is None: pair_folder = os.path.join(orchestrator.out_dir, "tmp") safe_makedirs(pair_folder) out_dir = pair_folder # Check that the envelopes intersect one another logging.info("Computing images envelopes and their intersection") geojson1 = os.path.join(out_dir, "left_envelope.geojson") geojson2 = os.path.join(out_dir, "right_envelope.geojson") out_envelopes_intersection = os.path.join( out_dir, "envelopes_intersection.geojson" ) sensor1 = sensor_image_left[sens_cst.INPUT_IMG] sensor2 = sensor_image_right[sens_cst.INPUT_IMG] geomodel1 = sensor_image_left[sens_cst.INPUT_GEO_MODEL] geomodel2 = sensor_image_right[sens_cst.INPUT_GEO_MODEL] inter_poly, ( inter_xmin, inter_ymin, inter_xmax, inter_ymax, ) = projection.ground_intersection_envelopes( sensor1["bands"]["b0"]["path"], sensor2["bands"]["b0"]["path"], geomodel1, geomodel2, geometry_plugin, geojson1, geojson2, out_envelopes_intersection, envelope_file_driver="GeoJSON", intersect_file_driver="GeoJSON", ) json_data_1 = None json_data_2 = None json_data_intersect = None try: with open(geojson1, "r", encoding="utf-8") as file_1: json_data_1 = json.load(file_1) with open(geojson2, "r", encoding="utf-8") as file_2: json_data_2 = json.load(file_2) with open(out_envelopes_intersection, "r", encoding="utf-8") as file_3: json_data_intersect = json.load(file_3) except Exception as exc: raise FileNotFoundError("Unable to read bbox GeoJSON files") from exc # update out_json updating_dict = { PREPROCESSING_TAG: { pair_key: { LEFT_ENVELOPE_TAG: json_data_1, RIGHT_ENVELOPE_TAG: json_data_2, ENVELOPES_INTERSECTION_TAG: json_data_intersect, ENVELOPES_INTERSECTION_BB_TAG: [ inter_xmin, inter_ymin, inter_xmax, inter_ymax, ], } } } orchestrator.update_out_info(updating_dict) if check_inputs: logging.info("Checking DEM coverage") _, epsg1 = inputs.read_vector(geojson1) __, dem_coverage = projection.compute_dem_intersection_with_poly( geometry_plugin.dem, inter_poly, epsg1 ) if dem_coverage < 100.0: logging.warning( "The input DEM covers {}% of the useful zone".format( int(dem_coverage) ) ) # Get largest epipolar regions from configuration file largest_epipolar_region = [ 0, 0, grid_left["epipolar_size_x"], grid_left["epipolar_size_y"], ] # Numpy array with corners of largest epipolar region. # Order does not matter here, # since it will be passed to grids.compute_epipolar_grid_min_max corners = np.array( [ [ [largest_epipolar_region[0], largest_epipolar_region[1]], [largest_epipolar_region[0], largest_epipolar_region[3]], ], [ [largest_epipolar_region[2], largest_epipolar_region[3]], [largest_epipolar_region[2], largest_epipolar_region[1]], ], ], dtype=np.float64, ) # Compute terrain min and max again, this time using estimated epsg code terrain_dispmin, terrain_dispmax = grids_algo.compute_epipolar_grid_min_max( geometry_plugin, corners, sensor1, sensor2, geomodel1, geomodel2, grid_left, grid_right, epsg, disp_min, disp_max, ) # Compute bounds from epipolar image corners and dispmin/dispmax terrain_bounds = np.stack((terrain_dispmin, terrain_dispmax), axis=0) terrain_min = np.nanmin(terrain_bounds, axis=(0, 1)) terrain_max = np.nanmax(terrain_bounds, axis=(0, 1)) terrain_area = (terrain_max[0] - terrain_min[0]) * ( terrain_max[1] - terrain_min[1] ) logging.info( "Terrain area covered: {} square meters (or square degrees)".format( terrain_area ) ) # Retrieve bounding box of the ground intersection of the envelopes inter_poly, inter_epsg = inputs.read_vector(out_envelopes_intersection) # Project polygon if epsg is different if epsg != inter_epsg: inter_poly = projection.polygon_projection(inter_poly, inter_epsg, epsg) (inter_xmin, inter_ymin, inter_xmax, inter_ymax) = inter_poly.bounds # Align bounding box to integer resolution steps xmin, ymin, xmax, ymax = tiling.snap_to_grid( inter_xmin, inter_ymin, inter_xmax, inter_ymax, resolution ) logging.info( "Terrain bounding box : [{}, {}] x [{}, {}]".format( xmin, xmax, ymin, ymax ) ) terrain_bounding_box = [xmin, ymin, xmax, ymax] # Check if roi given by user intersects with current terrain region if roi_poly is not None: if not roi_poly.intersects(inter_poly): logging.warning( "The pair composed of {} and {} " "does not intersect the requested ROI".format( sensor_image_left[sens_cst.INPUT_IMG], sensor_image_right[sens_cst.INPUT_IMG], ) ) # Get number of epipolar tiles that are previously used nb_epipolar_tiles = ( epipolar_image_left.shape[0] * epipolar_image_left.shape[1] ) # Compute average epipolar tile width epipolar_average_tile_width = math.sqrt(terrain_area / nb_epipolar_tiles) return (terrain_bounding_box, epipolar_average_tile_width), inter_poly @cars_profile(name="Compute roi poly") def compute_roi_poly(input_roi_poly, input_roi_epsg, epsg): """ Compute roi polygon from input roi :param input_roi_poly: roi polygon :type input_roi_poly: shapely Polygon :param input_roi_epsg: epsg of roi :type input_roi_epsg: str :param epsg: epsg to use :type epsg: str :return: polygon of roi with right epsg :rtype: Polygon """ roi_poly = input_roi_poly if input_roi_poly is not None: if input_roi_epsg != epsg: roi_poly = projection.polygon_projection( roi_poly, input_roi_epsg, epsg ) return roi_poly @cars_profile(name="Compute epsg") def compute_epsg( # pylint: disable=too-many-positional-arguments sensor_image_left, sensor_image_right, grid_left, grid_right, geometry_plugin, disp_min=-10, disp_max=10, ): """ Compute epsg to use :param sensor_image_left: left image Dict Must contain keys : "image", "texture", "geomodel", "no_data", "mask". Paths must be absolutes :type sensor_image_left: dict :param sensor_image_right: right image Dict Must contain keys : "image", "texture", "geomodel", "no_data", "mask". Paths must be absolutes :type sensor_image_right: dict :param grid_left: left grid. Grid dict contains : - Attributes containing: "grid_spacing", "grid_origin", \ "epipolar_size_x", "epipolar_size_y", \ "epipolar_origin_x", "epipolar_origin_y", \ "epipolar_spacing_x", "epipolar_spacing", \ "disp_to_alt_ratio", "path" :type grid_left: dict :param grid_right: right grid. Grid dict contains : - Attributes containing: "grid_spacing", "grid_origin", \ "epipolar_size_x", "epipolar_size_y", \ "epipolar_origin_x", "epipolar_origin_y", \ "epipolar_spacing_x", "epipolar_spacing", \ "disp_to_alt_ratio", "path" :type grid_right: dict :param geometry_plugin: geometry plugin to use :type geometry_plugin: AbstractGeometry :param srtm_dir: srtm directory :type srtm_dir: str :param default_alt: default altitude :type default_alt: int :param disp_min: minimum disparity :type disp_min: int :param disp_max: maximum disparity :type disp_max: int :return: epsg :rtype: str """ sensor1 = sensor_image_left[sens_cst.INPUT_IMG] sensor2 = sensor_image_right[sens_cst.INPUT_IMG] geomodel1 = sensor_image_left[sens_cst.INPUT_GEO_MODEL] geomodel2 = sensor_image_right[sens_cst.INPUT_GEO_MODEL] # Get largest epipolar regions from configuration file largest_epipolar_region = [ 0, 0, grid_left["epipolar_size_x"], grid_left["epipolar_size_y"], ] # Numpy array with corners of largest epipolar region. # Order does not matter here, # since it will be passed to grids.compute_epipolar_grid_min_max corners = np.array( [ [ [largest_epipolar_region[0], largest_epipolar_region[1]], [largest_epipolar_region[0], largest_epipolar_region[3]], ], [ [largest_epipolar_region[2], largest_epipolar_region[3]], [largest_epipolar_region[2], largest_epipolar_region[1]], ], ], dtype=np.float64, ) # Compute epipolar image terrain position corners # for min and max disparity ( terrain_dispmin, _, ) = grids_algo.compute_epipolar_grid_min_max( geometry_plugin, corners, sensor1, sensor2, geomodel1, geomodel2, grid_left, grid_right, 4326, disp_min, disp_max, ) epsg = get_utm_zone_as_epsg_code(*np.nanmean(terrain_dispmin, axis=0)) logging.info("EPSG code: {}".format(epsg)) return epsg
[docs] def crop_terrain_bounds_with_roi(roi_poly, xmin, ymin, xmax, ymax): """ Crop current terrain bounds with roi :param roi_poly: Polygon of ROI :type roi_poly: Shapely Polygon :param xmin: xmin :type xmin: float :param ymin: ymin :type ymin: float :param xmax: xmax :type xmax: float :param ymax: ymax :type ymax: float :return: new xmin, ymin, xmax, ymax :rtype: (float, float, float, float) """ # terrain bounding box polygon terrain_poly = Polygon( [ (xmin, ymin), (xmax, ymin), (xmax, ymax), (xmin, ymax), (xmin, ymin), ] ) if not roi_poly.intersects(terrain_poly): raise RuntimeError("None of the input data intersect the requested ROI") # Show ROI if valid (no exception raised) : logging.info("Setting terrain bounding box to the requested ROI") new_xmin, new_ymin, new_xmax, new_ymax = roi_poly.bounds return new_xmin, new_ymin, new_xmax, new_ymax
@cars_profile(name="Compute terrain bounds") def compute_terrain_bounds(list_of_terrain_roi, roi_poly=None, resolution=0.5): """ Compute Terrain bounds of merged pairs :param list_of_terrain_roi: list of terrain roi list of (terrain bbox, terrain epi_tile_size) :type list_of_terrain_roi: list :param roi_poly: terrain roi of given roi :type roi_poly: Polygon :param resolution: list of terrain roi :type resolution: float :return: bounds, optimal_terrain_tile_width_average """ # get lists ( list_terrain_bounding_box, list_terrain_epi_tile_width, ) = zip( # noqa: B905 *list_of_terrain_roi ) list_terrain_bounding_box = list(list_terrain_bounding_box) list_terrain_epi_tile_width = list(list_terrain_epi_tile_width) xmin, ymin, xmax, ymax = tiling.union(list_terrain_bounding_box) if roi_poly is not None: (xmin, ymin, xmax, ymax) = crop_terrain_bounds_with_roi( roi_poly, xmin, ymin, xmax, ymax ) xmin, ymin, xmax, ymax = tiling.snap_to_grid( xmin, ymin, xmax, ymax, resolution ) logging.info( "Total terrain bounding box : [{}, {}] x [{}, {}]".format( xmin, xmax, ymin, ymax ) ) bounds = [xmin, ymin, xmax, ymax] # Compute optimal terrain tile width optimal_terrain_tile_width_average = np.nanmean(list_terrain_epi_tile_width) optimal_terrain_tile_width = ( int(math.ceil(optimal_terrain_tile_width_average / resolution)) * resolution ) logging.info( "Optimal terrain tile size: {}x{} pixels".format( int(optimal_terrain_tile_width / resolution), int(optimal_terrain_tile_width / resolution), ) ) return bounds, optimal_terrain_tile_width
[docs] def get_conversion_factor(bounds, epsg, epsg_cloud): """ Conmpute conversion factor :param bounds: terrain bounds :type bounds: list :param epsg: epsg of bounds :type epsg: int :param epsg_cloud: epsg of the input cloud :type epsg_cloud: int :return: conversion factor :rtype: float """ conversion_factor = 1 # only if epsg and epsg_cloud are different spatial_ref = CRS.from_epsg(epsg) spatial_ref_cloud = CRS.from_epsg(epsg_cloud) is_geographic = spatial_ref.is_geographic or spatial_ref_cloud.is_geographic if is_geographic and epsg != epsg_cloud: # Compute bounds and terrain grid [xmin, ymin, xmax, ymax] = bounds bounds_points = [ [xmin, ymin], [xmax, ymax], ] bounds_point_epsg_cloud = projection.point_cloud_conversion( bounds_points, epsg, epsg_cloud ) # Compute area in both epsg terrain_area_epsg = (xmax - xmin) * (ymax - ymin) terrain_area_epsg_cloud = ( bounds_point_epsg_cloud[1][0] - bounds_point_epsg_cloud[0][0] ) * (bounds_point_epsg_cloud[1][1] - bounds_point_epsg_cloud[0][1]) # Compute conversion factor conversion_factor = math.sqrt( terrain_area_epsg / terrain_area_epsg_cloud ) return conversion_factor
[docs] def convert_optimal_tile_size_with_epsg( bounds, optimal_terrain_tile_width, epsg, epsg_cloud ): """ Convert optimal_tile_size according to epsg. Only if epsg_cloud is different of the output epsg. :param bounds: terrain bounds :type bounds: list :param optimal_terrain_tile_width: initial optimal_terrain_tile_width :type optimal_terrain_tile_width: float :param epsg: target epsg :type epsg: int :param epsg_cloud: epsg of the input cloud :type epsg_cloud: int :return: converted optimal tile size :rtype: float """ # Convert optimal terrain tile width conversion_factor = get_conversion_factor(bounds, epsg, epsg_cloud) optimal_terrain_tile_width *= conversion_factor return optimal_terrain_tile_width
@cars_profile(name="Compute epipolar roi") def compute_epipolar_roi( # pylint: disable=too-many-positional-arguments terrain_roi_poly, terrain_roi_epsg, geometry_plugin, sensor_image_left, sensor_image_right, grid_left, grid_right, output_path, disp_min=0, disp_max=0, ): """ Compute epipolar roi to use :param terrain_roi_poly: terrain roi polygon :param terrain_roi_epsg: terrain roi epsg :param geometry_plugin: geometry plugin to use :param epsg: epsg :param disp_min: minimum disparity :param disp_max: maximum disparity :return: epipolar region to use, with tile_size a sample """ if terrain_roi_poly is None: return None roi_bbox = terrain_roi_poly.bounds pair_folder = os.path.join(output_path, "tmp") safe_makedirs(pair_folder) sensor1 = sensor_image_left[sens_cst.INPUT_IMG] sensor2 = sensor_image_right[sens_cst.INPUT_IMG] geomodel1 = sensor_image_left[sens_cst.INPUT_GEO_MODEL] geomodel2 = sensor_image_right[sens_cst.INPUT_GEO_MODEL] epipolar_roi = grids_algo.terrain_region_to_epipolar( roi_bbox, sensor1, sensor2, geomodel1, geomodel2, grid_left, grid_right, geometry_plugin, epsg=terrain_roi_epsg, disp_min=disp_min, disp_max=disp_max, tile_size=100, epipolar_size_x=grid_left["epipolar_size_x"], epipolar_size_y=grid_left["epipolar_size_y"], ) return epipolar_roi