Source code for cars.pipelines.parameters.dsm_inputs

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-lines
"""
CARS dsm inputs
"""

import logging

import numpy as np
from json_checker import Checker, Or

# CARS imports
import cars.pipelines.parameters.dsm_inputs_constants as dsm_cst
from cars.core import constants as cst
from cars.core import inputs
from cars.core.geometry.abstract_geometry import AbstractGeometry
from cars.core.utils import make_relative_path_absolute
from cars.pipelines.parameters import sensor_inputs as sens_inp
from cars.pipelines.parameters import sensor_inputs_constants as sens_cst


def check_dsm_inputs(conf, config_dir=None):
    """
    Check the inputs given

    The user configuration is copied into a new "overloaded" dictionary
    in which every optional entry is present (``None`` when not given).
    The result is validated against the json_checker schemas, the file
    paths are optionally made absolute, and the DSM rasters are checked
    for consistent size and phasing.

    :param conf: configuration of inputs
    :type conf: dict
    :param config_dir: directory of used json/yaml, if
        user filled paths with relative paths
    :type config_dir: str

    :return: overloaded inputs
    :rtype: dict
    """
    # (key in the overloaded configuration, key in the user configuration).
    # The order is the insertion order of each per-DSM dictionary.
    layer_keys = (
        (cst.DSM_ALT, "dsm"),
        (cst.DSM_CLASSIF, "merging_classification"),
        (cst.DSM_COLOR, "image"),
        (cst.DSM_MSK, "mask"),
        (cst.DSM_ALT_INF, "dsm_inf"),
        (cst.DSM_ALT_SUP, "dsm_sup"),
        (cst.DSM_WEIGHTS_SUM, "weights"),
        (cst.DSM_NB_PTS, "dsm_n_pts"),
        (cst.DSM_NB_PTS_IN_CELL, "dsm_pts_in_cell"),
        (cst.DSM_MEAN, "dsm_mean"),
        (cst.DSM_STD_DEV, "dsm_std"),
        (cst.DSM_INF_MEAN, "dsm_inf_mean"),
        (cst.DSM_INF_STD, "dsm_inf_std"),
        (cst.DSM_SUP_MEAN, "dsm_sup_mean"),
        (cst.DSM_SUP_STD, "dsm_sup_std"),
        (cst.DSM_AMBIGUITY, "ambiguity"),
        (cst.DSM_PERFORMANCE_MAP, "performance_map"),
        (cst.DSM_SOURCE_PC, "contributing_pair"),
        (cst.DSM_FILLING, "merging_filling"),
    )

    overloaded_conf = {}

    # Overload some optional parameters
    overloaded_conf[dsm_cst.DSMS] = {}
    overloaded_conf[sens_cst.ROI] = conf.get(sens_cst.ROI, None)
    overloaded_conf[sens_cst.INITIAL_ELEVATION] = (
        sens_inp.get_initial_elevation(
            conf.get(sens_cst.INITIAL_ELEVATION, None)
        )
    )
    overloaded_conf[sens_cst.SENSORS] = conf.get(sens_cst.SENSORS, None)
    overloaded_conf[sens_cst.PAIRING] = conf.get(sens_cst.PAIRING, None)

    # Validate inputs
    inputs_schema = {
        dsm_cst.DSMS: dict,
        sens_cst.ROI: Or(str, dict, None),
        sens_cst.INITIAL_ELEVATION: Or(dict, None),
        sens_cst.SENSORS: Or(dict, None),
        sens_cst.PAIRING: Or([[str]], None),
    }
    checker_inputs = Checker(inputs_schema)
    checker_inputs.validate(overloaded_conf)

    # Every DSM layer is an optional string
    dsm_schema = {layer: Or(str, None) for layer, _ in layer_keys}
    checker_pc = Checker(dsm_schema)

    for dsm_key, user_layers in conf[dsm_cst.DSMS].items():
        # Get layers with default
        layers = {}
        for layer, user_key in layer_keys:
            layers[layer] = user_layers.get(user_key, None)
        overloaded_conf[dsm_cst.DSMS][dsm_key] = layers

        # validate
        checker_pc.validate(layers)

    # Modify to absolute path
    if config_dir is not None:
        modify_to_absolute_path(config_dir, overloaded_conf)
    else:
        logging.debug(
            "path of config file was not given,"
            "relative path are not transformed to absolute paths"
        )

    for layers in overloaded_conf[dsm_cst.DSMS].values():
        # check sizes
        check_input_size(
            layers[cst.INDEX_DSM_ALT],
            layers[cst.DSM_CLASSIF],
            layers[cst.INDEX_DSM_COLOR],
            layers[cst.INDEX_DSM_MASK],
        )

    # Check srtm dir
    sens_inp.check_srtm(
        overloaded_conf[sens_cst.INITIAL_ELEVATION][sens_cst.DEM_PATH]
    )

    # Run the phasing check on the same (possibly absolutized) elevation
    # paths that were just size-checked, not on the raw user paths which
    # may be relative to config_dir. check_phasing reads the "dsm" key.
    check_phasing(
        {
            dsm_key: {"dsm": layers[cst.INDEX_DSM_ALT]}
            for dsm_key, layers in overloaded_conf[dsm_cst.DSMS].items()
        }
    )

    overloaded_conf[sens_cst.LOADERS] = sens_inp.check_loaders(
        conf.get(sens_cst.LOADERS, {})
    )

    classif_loader = overloaded_conf[sens_cst.LOADERS][
        sens_cst.INPUT_CLASSIFICATION
    ]

    overloaded_conf[sens_cst.FILLING] = sens_inp.check_filling(
        conf.get(sens_cst.FILLING, {}), classif_loader
    )

    # Sensors are optional: only check them when the user provided some
    if sens_cst.SENSORS in conf and conf[sens_cst.SENSORS] is not None:
        sens_inp.check_sensors(conf, overloaded_conf, config_dir)

    return overloaded_conf
def check_geometry_plugin(conf_inputs, conf_geom_plugin):
    """
    Check the geometry plugin with inputs

    When no plugin name is given, "SharelocGeometry" is used. A geometry
    plugin is only instantiated when the initial elevation of the inputs
    defines a dem path.

    :param conf_geom_plugin: name of geometry plugin
    :type conf_geom_plugin: str
    :param conf_inputs: checked configuration of inputs
    :type conf_inputs: dict

    :return: geometry plugin name, and geometry plugin with dem
        (None when no dem path is configured)
    :rtype: tuple
    """
    plugin_name = conf_geom_plugin
    if plugin_name is None:
        plugin_name = "SharelocGeometry"

    dem_path = conf_inputs[sens_cst.INITIAL_ELEVATION][sens_cst.DEM_PATH]

    # Without a dem there is nothing to initialize the plugin with
    if dem_path is None:
        return plugin_name, None

    # Initialize a geometry plugin with elevation information
    geoid = conf_inputs[sens_cst.INITIAL_ELEVATION][sens_cst.GEOID]
    plugin_with_elevation = (
        AbstractGeometry(  # pylint: disable=abstract-class-instantiated
            plugin_name,
            dem=dem_path,
            geoid=geoid,
            default_alt=sens_cst.CARS_DEFAULT_ALT,
        )
    )

    return plugin_name, plugin_with_elevation
def check_input_size(dsm, classif, color, mask):
    """
    Check dsm, mask, color, classif given

    The dsm must be a mono-band image, and every other image that is
    provided (not None) must have the same size as the dsm.

    :param dsm: phased dsm path
    :type dsm: str
    :param classif: classif path
    :type classif: str
    :param color: color path
    :type color: str
    :param mask: mask path
    :type mask: str

    :raises RuntimeError: if the dsm has more than one band, or if one of
        the other images does not have the size of the dsm
    """
    band_count = inputs.rasterio_get_nb_bands(dsm)
    if band_count != 1:
        raise RuntimeError("{} is not mono-band image".format(dsm))

    for other_path in (mask, color, classif):
        # Optional layers are simply skipped
        if other_path is None:
            continue

        dsm_size = inputs.rasterio_get_size(dsm)
        other_size = inputs.rasterio_get_size(other_path)
        if dsm_size != other_size:
            raise RuntimeError(
                "The image {} and {} "
                "do not have the same size".format(dsm, other_path)
            )
def modify_to_absolute_path(config_dir, overloaded_conf):
    """
    Modify input file path to absolute path

    The configuration is modified in place: for each dsm, the elevation,
    classification, color, mask and filling entries that are not None are
    converted, as well as the roi when it is given as a string.

    :param config_dir: directory of the json configuration
    :type config_dir: str
    :param overloaded_conf: overloaded configuration json
    :type overloaded_conf: dict
    """
    # NOTE(review): only these five layers are converted; the other
    # layers of a dsm entry are left untouched. Confirm this is intended.
    path_tags = (
        cst.INDEX_DSM_ALT,
        cst.DSM_CLASSIF,
        cst.INDEX_DSM_COLOR,
        cst.INDEX_DSM_MASK,
        cst.DSM_FILLING,
    )

    for layers in overloaded_conf[dsm_cst.DSMS].values():
        for tag in path_tags:
            current_path = layers[tag]
            if current_path is None:
                continue
            layers[tag] = make_relative_path_absolute(
                current_path, config_dir
            )

    # A roi given as a dict is not a path: only strings are converted
    roi = overloaded_conf[sens_cst.ROI]
    if isinstance(roi, str):
        overloaded_conf[sens_cst.ROI] = make_relative_path_absolute(
            roi, config_dir
        )
def check_phasing(dsm_dict):
    """
    Check if the dsm are phased, and if resolution and epsg code
    are equivalent

    The first entry of the dictionary is used as the reference; every
    other entry is compared to it. An empty dictionary is accepted as is.

    :param dsm_dict: phased dsm, each entry holding its path under the
        "dsm" key
    :type dsm_dict: dict

    :raises RuntimeError: if a dsm differs from the reference by its epsg
        code or its resolution, or if it is not phased with the reference
    """
    # Nothing to compare (also avoids leaking StopIteration from next())
    if not dsm_dict:
        return

    ref_key = next(iter(dsm_dict))
    ref_path = dsm_dict[ref_key]["dsm"]

    ref_epsg = inputs.rasterio_get_epsg_code(ref_path)
    ref_profile = inputs.rasterio_get_profile(ref_path)
    ref_transform = list(ref_profile["transform"])
    # Coefficients 0 and 4 of the transform are used as x and y resolution
    ref_res_x = ref_transform[0]
    ref_res_y = ref_transform[4]
    ref_bounds = inputs.rasterio_get_bounds(ref_path)

    for dsm_key in dsm_dict:
        if dsm_key == ref_key:
            continue

        dsm_path = dsm_dict[dsm_key]["dsm"]

        epsg = inputs.rasterio_get_epsg_code(dsm_path)
        # Read the profile of the current dsm (not of the reference),
        # otherwise the resolution comparison below can never fail
        profile = inputs.rasterio_get_profile(dsm_path)
        transform = list(profile["transform"])
        res_x = transform[0]
        res_y = transform[4]
        bounds = inputs.rasterio_get_bounds(dsm_path)

        if epsg != ref_epsg:
            raise RuntimeError(
                f"EPSG mismatch: DSM {dsm_key} has EPSG {epsg}, "
                f"expected {ref_epsg}."
            )

        if ref_res_x != res_x or ref_res_y != res_y:
            raise RuntimeError(
                f"Resolution mismatch: DSM {dsm_key} has resolution "
                f"{(res_x, res_y)}, expected {(ref_res_x, ref_res_y)}."
            )

        # Compare the left_bottom corner
        # (asarray keeps the subtraction valid for sequence-like bounds)
        diff = np.asarray(ref_bounds[0:2]) - np.asarray(bounds[0:2])
        resolution = np.array([ref_res_x, -ref_res_y])
        res_ratio = diff / resolution

        # The offset between corners, expressed in pixels, must be a whole
        # number on both axes.
        # NOTE(review): an offset whose inverse ratio is a whole number is
        # accepted too; confirm that this second test is intended.
        if not np.all(np.equal(res_ratio, res_ratio.astype(int))) and (
            not np.all(np.equal(1 / res_ratio, (1 / res_ratio).astype(int)))
        ):
            raise RuntimeError(f"DSM {dsm_key} and {ref_key} are not phased")