Source code for cars.applications.grid_generation.epipolar_grid_generation_app

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains the epipolar grid generation application class.
"""

# Standard imports
import logging
import os

import numpy as np

# Third party imports
from json_checker import And, Checker

import cars.orchestrator.orchestrator as ocht
from cars.applications import application_constants
from cars.applications.grid_generation import (
    grid_generation_algo,
)
from cars.applications.grid_generation import (
    grid_generation_constants as grid_constants,
)
from cars.applications.grid_generation.abstract_grid_generation_app import (
    GridGeneration,
)
from cars.core import projection

# CARS imports
from cars.core.utils import safe_makedirs
from cars.orchestrator.cluster.log_wrapper import cars_profile
from cars.pipelines.parameters import sensor_inputs_constants as sens_cst


class EpipolarGridGeneration(GridGeneration, short_name="epipolar"):
    """
    EpipolarGridGeneration

    Grid generation application producing the left and right epipolar
    rectification grids of a sensor image pair and writing them to disk.
    """

    def __init__(self, conf=None):
        """
        Init function of EpipolarGridGeneration

        :param conf: configuration for grid generation
        :type conf: dict
        """

        super().__init__(conf=conf)

        # Cache the checked configuration values.
        # NOTE(review): self.used_config is read here, so the parent
        # initializer is expected to have populated it - confirm in
        # GridGeneration.
        self.used_method = self.used_config["method"]
        self.epi_step = self.used_config["epi_step"]
        # Saving files
        self.save_intermediate_data = self.used_config["save_intermediate_data"]

        # Init orchestrator (set at each call of run)
        self.orchestrator = None

    def check_conf(self, conf):
        """
        Check configuration

        Missing keys are filled with their default values ("epipolar"
        method, epi_step of 30, no intermediate data saving), then the
        result is validated against the application schema.

        :param conf: configuration to check, None for full defaults
        :type conf: dict

        :return: overloaded configuration
        :rtype: dict
        """

        # Init conf: work on a copy so that the given dict is not modified
        if conf is None:
            conf = {}
        overloaded_conf = conf.copy()

        # Overload conf
        overloaded_conf["method"] = conf.get("method", "epipolar")
        overloaded_conf["epi_step"] = conf.get("epi_step", 30)
        overloaded_conf["save_intermediate_data"] = conf.get(
            "save_intermediate_data", False
        )

        grid_generation_schema = {
            "method": str,
            "epi_step": And(int, lambda x: x > 0),
            "save_intermediate_data": bool,
        }

        # Check conf
        checker = Checker(grid_generation_schema)
        checker.validate(overloaded_conf)

        return overloaded_conf

    def get_save_grids(self):
        """
        Get whether the grid will be saved

        :return: true if grid saving is activated
        :rtype: bool
        """

        return self.save_intermediate_data

    @staticmethod
    def _compute_ground_angles(
        sensor1, sensor2, geomodel1, geomodel2, geometry_plugin
    ):
        """
        Compute and log the acquisition angles, all None on failure

        :return: left azimuth, left elevation, right azimuth,
            right elevation, convergence angle
        :rtype: tuple
        """

        # Best effort: angles are only informative (logged and reported),
        # so any failure here is logged and must not stop grid generation.
        try:
            (
                left_az,
                left_elev_angle,
                right_az,
                right_elev_angle,
                convergence_angle,
            ) = projection.get_ground_angles(
                sensor1, sensor2, geomodel1, geomodel2, geometry_plugin
            )

            logging.info(
                "Left satellite acquisition angles: "
                "Azimuth angle: {:.1f} degrees, "
                "Elevation angle: {:.1f} degrees".format(
                    left_az, left_elev_angle
                )
            )

            logging.info(
                "Right satellite acquisition angles: "
                "Azimuth angle: {:.1f} degrees, "
                "Elevation angle: {:.1f} degrees".format(
                    right_az, right_elev_angle
                )
            )

            logging.info(
                "Stereo satellite convergence angle from ground: "
                "{:.1f} degrees".format(convergence_angle)
            )
        except Exception as exc:
            logging.error(
                "Error in Angles information retrieval: {}".format(exc)
            )
            return None, None, None, None, None

        return (
            left_az,
            left_elev_angle,
            right_az,
            right_elev_angle,
            convergence_angle,
        )

    def _resolve_grid_paths(self, pair_folder):
        """
        Create the grid output folder and return left and right grid paths

        :param pair_folder: folder used for current pair, may be None
        :type pair_folder: str

        :return: left grid path, right grid path
        :rtype: Tuple(str, str)
        """

        # pair_folder defaults to None in run(): fall back on the
        # orchestrator output directory in both modes instead of failing
        # in os.path.join
        if pair_folder is None:
            base_folder = self.orchestrator.out_dir
        else:
            base_folder = pair_folder

        if self.save_intermediate_data:
            grid_folder = base_folder
            safe_makedirs(grid_folder)
        else:
            # Grids are still written to disk, but in a temporary folder
            # registered for cleaning by the orchestrator
            grid_folder = os.path.join(base_folder, "tmp")
            safe_makedirs(grid_folder)
            self.orchestrator.add_to_clean(grid_folder)

        return (
            os.path.join(grid_folder, "left_epi_grid.tif"),
            os.path.join(grid_folder, "right_epi_grid.tif"),
        )

    @cars_profile(name="Epi Grid Generation")
    def run(  # pylint: disable=too-many-positional-arguments
        self,
        image_left,
        image_right,
        geometry_plugin,
        orchestrator=None,
        pair_folder=None,
        pair_key="PAIR_0",
        resolution=1,
    ):
        """
        Run EpipolarGridGeneration application

        Create left and right grid files, corresponding to left and right
        epipolar grids. The returned dicts contain all the attributes of
        the grid & their path

        :param image_left: left image. Dict Must contain keys :
            "image", "texture", "geomodel", "no_data", "mask".
            Paths must be absolutes
        :type image_left: dict
        :param image_right: right image. Dict Must contain keys :
            "image", "texture", "geomodel", "no_data", "mask".
            Paths must be absolutes
        :type image_right: dict
        :param geometry_plugin: geometry plugin to use
        :type geometry_plugin: AbstractGeometry
        :param orchestrator: orchestrator used, a sequential one is
            created when None
        :param pair_folder: folder used for current pair, the orchestrator
            output directory is used when None
        :type pair_folder: str
        :param pair_key: pair configuration id
        :type pair_key: str
        :param resolution: sampling factor, only used in the log message
            reporting the disparity to altitude factor
        :type resolution: int

        :return: left grid, right grid. Each grid dict contains the keys
            "grid_spacing", "grid_origin", "epipolar_size_x",
            "epipolar_size_y", "epipolar_origin_x", "epipolar_origin_y",
            "epipolar_spacing_x", "epipolar_spacing",
            "disp_to_alt_ratio", "epipolar_step" and "path"
        :rtype: Tuple(dict, dict)
        """

        # Default orchestrator
        if orchestrator is None:
            # Create default sequential orchestrator for current application
            # be aware, no out_json will be shared between orchestrators
            # No files saved
            self.orchestrator = ocht.Orchestrator(
                orchestrator_conf={"mode": "sequential"}
            )
        else:
            self.orchestrator = orchestrator

        # Only the first band path is used to describe each sensor image
        sensor1 = image_left[sens_cst.INPUT_IMG]["bands"]["b0"]["path"]
        sensor2 = image_right[sens_cst.INPUT_IMG]["bands"]["b0"]["path"]
        geomodel1 = image_left[sens_cst.INPUT_GEO_MODEL]
        geomodel2 = image_right[sens_cst.INPUT_GEO_MODEL]

        # Get satellites angles from ground: Azimuth to north, Elevation angle
        (
            left_az,
            left_elev_angle,
            right_az,
            right_elev_angle,
            convergence_angle,
        ) = self._compute_ground_angles(
            sensor1, sensor2, geomodel1, geomodel2, geometry_plugin
        )

        # Generate rectification grids
        (
            grid1,
            grid2,
            grid_origin,
            grid_spacing,
            epipolar_size,
            disp_to_alt_ratio,
        ) = grid_generation_algo.generate_epipolar_grids(
            sensor1,
            sensor2,
            geomodel1,
            geomodel2,
            geometry_plugin,
            self.epi_step,
        )

        # Create the grid description dicts (attributes + path)
        # NOTE(review): "epipolar_spacing" holds grid_spacing[1] and is
        # probably meant to be "epipolar_spacing_y"; kept as is because
        # callers may rely on this key - confirm before renaming.
        grid_left = {
            "grid_spacing": grid_spacing,
            "grid_origin": grid_origin,
            "epipolar_size_x": epipolar_size[0],
            "epipolar_size_y": epipolar_size[1],
            "epipolar_origin_x": grid_origin[0],
            "epipolar_origin_y": grid_origin[1],
            "epipolar_spacing_x": grid_spacing[0],
            "epipolar_spacing": grid_spacing[1],
            "disp_to_alt_ratio": disp_to_alt_ratio,
            "epipolar_step": self.epi_step,
            "path": None,
        }
        # Both grids share the same attributes, only the path differs
        grid_right = grid_left.copy()

        # Write both grids to disk
        left_grid_path, right_grid_path = self._resolve_grid_paths(
            pair_folder
        )

        grid_generation_algo.write_grid(
            grid1, left_grid_path, grid_origin, grid_spacing
        )
        grid_generation_algo.write_grid(
            grid2, right_grid_path, grid_origin, grid_spacing
        )

        grid_left["path"] = left_grid_path
        grid_right["path"] = right_grid_path

        # Add infos to orchestrator.out_json
        updating_dict = {
            application_constants.APPLICATION_TAG: {
                grid_constants.GRID_GENERATION_RUN_TAG: {
                    pair_key: {
                        grid_constants.EPIPOLAR_SIZE_X_TAG: epipolar_size[0],
                        grid_constants.EPIPOLAR_SIZE_Y_TAG: epipolar_size[1],
                        grid_constants.EPIPOLAR_ORIGIN_X_TAG: grid_origin[0],
                        grid_constants.EPIPOLAR_ORIGIN_Y_TAG: grid_origin[1],
                        grid_constants.EPIPOLAR_SPACING_X_TAG: grid_spacing[0],
                        grid_constants.EPIPOLAR_SPACING_Y_TAG: grid_spacing[1],
                        grid_constants.DISP_TO_ALT_RATIO_TAG: disp_to_alt_ratio,
                        grid_constants.LEFT_AZIMUTH_ANGLE_TAG: left_az,
                        grid_constants.LEFT_ELEVATION_ANGLE_TAG: (
                            left_elev_angle
                        ),
                        grid_constants.RIGHT_AZIMUTH_ANGLE_TAG: right_az,
                        grid_constants.RIGHT_ELEVATION_ANGLE_TAG: (
                            right_elev_angle
                        ),
                        grid_constants.CONVERGENCE_ANGLE_TAG: convergence_angle,
                    },
                }
            }
        }
        self.orchestrator.update_out_info(updating_dict)

        logging.info(
            "Size of epipolar images: {}x{} pixels".format(
                epipolar_size[0], epipolar_size[1]
            )
        )
        logging.info(
            "Disparity to altitude factor for pixel at 1/{} "
            "sampling : {} m/pixel".format(
                resolution, np.round(disp_to_alt_ratio, 2)
            )
        )

        return grid_left, grid_right