#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
CARS module containing input checks for sensor input data.
Used by the full_res and low_res pipelines.
"""
import logging
import os
import rasterio as rio
from json_checker import Checker, OptionalKey, Or
# CARS imports
from cars.core import inputs, preprocessing, roi_tools
from cars.core.geometry.abstract_geometry import AbstractGeometry
from cars.core.utils import make_relative_path_absolute
from cars.pipelines.parameters import (
depth_map_inputs_constants as depth_map_cst,
)
from cars.pipelines.parameters import sensor_inputs_constants as sens_cst
# Relative location of the geoid file used when the configuration does not
# provide one: get_initial_elevation joins it onto "<this module's
# directory>/../../conf" to build the default geoid path.
CARS_GEOID_PATH = "geoid/egm96.grd" # Path in cars package (pkg)
def check_sensors(conf, overloaded_conf, config_json_dir=None):
    """
    Check the sensors section of the inputs configuration.

    Each sensor entry is completed with default values for its optional
    fields and validated against the sensor schema. Paths are then made
    absolute (when ``config_json_dir`` is given), the image, mask, color and
    classification files are checked, and the pairing is validated. When no
    pairing is defined and exactly two sensors are given, the first one is
    used as left sensor and the second one as right sensor.

    :param conf: inputs configuration as provided by the user
    :type conf: dict
    :param overloaded_conf: configuration being completed, modified in place
    :type overloaded_conf: dict
    :param config_json_dir: directory of the json configuration, used to
        turn relative paths into absolute ones. Paths are left untouched
        when None
    :type config_json_dir: str

    :return: the overloaded configuration
    :rtype: dict

    :raises RuntimeError: if the pairing is undefined and cannot be deduced,
        or if a pair refers to a key that is not in the sensors
    """
    # Schema that every completed sensor entry must satisfy
    sensor_schema = {
        sens_cst.INPUT_IMG: str,
        sens_cst.INPUT_COLOR: str,
        sens_cst.INPUT_NODATA: int,
        sens_cst.INPUT_GEO_MODEL: Or(str, dict),
        sens_cst.INPUT_MSK: Or(str, None),
        sens_cst.INPUT_CLASSIFICATION: Or(str, None),
    }
    checker_sensor = Checker(sensor_schema)

    for sensor_image_key in conf[sens_cst.SENSORS]:
        sensor_input = conf[sens_cst.SENSORS][sensor_image_key]
        # Case where the sensor is defined as a string referring to the
        # input image instead of a dict: initialize the sensor dictionary
        if isinstance(sensor_input, str):
            overloaded_conf[sens_cst.SENSORS][sensor_image_key] = {
                sens_cst.INPUT_IMG: sensor_input
            }
        sensor_image = overloaded_conf[sens_cst.SENSORS][sensor_image_key]

        # Overload optional parameters: geomodel and color default to the
        # image itself, no data to 0, mask and classification to None.
        # Keys are always taken from sens_cst so that they match the schema.
        image_path = sensor_image[sens_cst.INPUT_IMG]
        sensor_image.setdefault(sens_cst.INPUT_GEO_MODEL, image_path)
        sensor_image.setdefault(sens_cst.INPUT_COLOR, image_path)
        sensor_image.setdefault(sens_cst.INPUT_NODATA, 0)
        sensor_image.setdefault(sens_cst.INPUT_MSK, None)
        sensor_image.setdefault(sens_cst.INPUT_CLASSIFICATION, None)

        # Validate
        checker_sensor.validate(sensor_image)

    # Modify to absolute path. This is done once, before any file is read:
    # nothing below changes a path, so a second resolution is not needed.
    if config_json_dir is not None:
        modify_to_absolute_path(config_json_dir, overloaded_conf)
    else:
        logging.debug(
            "path of config file was not given, "
            "relative path are not transformed to absolute paths"
        )

    # Check image, msk and color size compatibility
    for sensor_image in overloaded_conf[sens_cst.SENSORS].values():
        check_input_size(
            sensor_image[sens_cst.INPUT_IMG],
            sensor_image[sens_cst.INPUT_MSK],
            sensor_image[sens_cst.INPUT_COLOR],
            sensor_image[sens_cst.INPUT_CLASSIFICATION],
        )
        # check band nbits of msk and classification
        check_nbits(
            sensor_image[sens_cst.INPUT_MSK],
            sensor_image[sens_cst.INPUT_CLASSIFICATION],
        )
        # check image and color data consistency
        check_input_data(
            sensor_image[sens_cst.INPUT_IMG],
            sensor_image[sens_cst.INPUT_COLOR],
        )

    # Validate pairs
    # If there are two inputs with no associated pairing, consider that the
    # first image is left and the second image is right
    if (
        overloaded_conf[sens_cst.PAIRING] is None
        and len(overloaded_conf[sens_cst.SENSORS]) == 2
    ):
        sensor_keys = list(overloaded_conf[sens_cst.SENSORS].keys())
        overloaded_conf[sens_cst.PAIRING] = [[sensor_keys[0], sensor_keys[1]]]
        logging.info(
            "Pairing is not defined, '%s' will be used as left sensor and "
            "'%s' will be used as right sensor",
            sensor_keys[0],
            sensor_keys[1],
        )

    if overloaded_conf[sens_cst.PAIRING] is None:
        raise RuntimeError(
            "Pairing is not defined and cannot be determined "
            + "because there are more than two inputs products"
        )

    # Every key used in a pair must refer to a declared sensor
    for key1, key2 in overloaded_conf[sens_cst.PAIRING]:
        for key in (key1, key2):
            if key not in overloaded_conf[sens_cst.SENSORS]:
                message = "{} not in sensors images".format(key)
                logging.error(message)
                raise RuntimeError(message)

    # check data type of pairs images
    for key1, key2 in overloaded_conf[sens_cst.PAIRING]:
        compare_image_type(
            overloaded_conf[sens_cst.SENSORS], sens_cst.INPUT_IMG, key1, key2
        )

    return overloaded_conf
def check_geometry_plugin(conf_inputs, conf_geom_plugin):
    """
    Check the geometry plugin with inputs

    The products of every sensor are checked with a plugin built without
    elevation information, then a second plugin is built with the dem and
    geoid, and the dem extent is compared with the roi that plugin needs.

    :param conf_inputs: checked configuration of inputs
    :type conf_inputs: dict
    :param conf_geom_plugin: name of geometry plugin,
        "SharelocGeometry" is used when None
    :type conf_geom_plugin: str

    :return: overloaded inputs conf,
        name of the geometry plugin,
        geometry plugin without dem,
        geometry plugin with dem,
        roi needed from the dem as an EPSG:4326 polygon
        (None when the plugin does not provide a dem roi)

    :raises RuntimeError: if the given dem does not intersect the needed roi
    """
    if conf_geom_plugin is None:
        conf_geom_plugin = "SharelocGeometry"

    # First plugin: no elevation information, only used to check products
    plugin_no_elevation = (
        AbstractGeometry(  # pylint: disable=abstract-class-instantiated
            conf_geom_plugin,
            default_alt=sens_cst.CARS_DEFAULT_ALT,
        )
    )

    # Check products consistency with this plugin.
    # NOTE(review): assuming conf_inputs is a plain dict, copy() is shallow,
    # so the sensor dictionaries updated below are shared with conf_inputs.
    overloaded_conf_inputs = conf_inputs.copy()
    for sensor_key, sensor_image in conf_inputs[sens_cst.SENSORS].items():
        checked_image, checked_geomodel = (
            plugin_no_elevation.check_product_consistency(
                sensor_image[sens_cst.INPUT_IMG],
                sensor_image[sens_cst.INPUT_GEO_MODEL],
            )
        )
        overloaded_sensor = overloaded_conf_inputs[sens_cst.SENSORS][
            sensor_key
        ]
        overloaded_sensor[sens_cst.INPUT_IMG] = checked_image
        overloaded_sensor[sens_cst.INPUT_GEO_MODEL] = checked_geomodel

    # Second plugin: with dem and geoid
    plugin_with_elevation = generate_geometry_plugin_with_dem(
        conf_geom_plugin, conf_inputs
    )

    # Check dem is big enough
    dem_generation_roi_poly = None
    needed_dem_roi = plugin_with_elevation.dem_roi
    needed_dem_roi_epsg = plugin_with_elevation.dem_roi_epsg

    if needed_dem_roi is None:
        logging.warning(
            "Current geometry plugin doesnt compute dem roi needed "
            "for later computations. Errors related to unsufficient "
            "dem roi might occur."
        )
    else:
        # convert needed roi to 4326
        dem_generation_roi_poly = preprocessing.compute_roi_poly(
            roi_tools.bounds_to_poly(needed_dem_roi), needed_dem_roi_epsg, 4326
        )

        dem_path = conf_inputs[sens_cst.INITIAL_ELEVATION][sens_cst.DEM_PATH]
        if dem_path is not None:
            # get dem total roi, expressed in 4326 as well
            dem_bounds_poly = roi_tools.bounds_to_poly(
                inputs.rasterio_get_bounds(dem_path)
            )
            dem_epsg = inputs.rasterio_get_epsg_code(dem_path)
            dem_poly = preprocessing.compute_roi_poly(
                dem_bounds_poly, dem_epsg, 4326
            )

            # if needed roi not inside dem: warn on partial overlap,
            # raise when there is no overlap at all
            if not dem_poly.contains_properly(dem_generation_roi_poly):
                base_message = (
                    "Given initial elevation ROI is not covering needed ROI: "
                    " EPSG:4326, ROI: {}".format(dem_generation_roi_poly.bounds)
                )
                if not dem_poly.intersects(dem_generation_roi_poly):
                    # Exit, Error is certain to occur
                    raise RuntimeError(base_message)
                logging.warning(
                    "{}. Only a part of it intersects. "
                    "Errors might occur".format(base_message)
                )

    return (
        overloaded_conf_inputs,
        conf_geom_plugin,
        plugin_no_elevation,
        plugin_with_elevation,
        dem_generation_roi_poly,
    )
def generate_geometry_plugin_with_dem(
    conf_geom_plugin, conf_inputs, dem=None, crop_dem=True
):
    """
    Generate geometry plugin with dem and geoid

    :param conf_geom_plugin: plugin configuration
    :param conf_inputs: inputs configuration
    :param dem: dem to override the one in inputs
    :param crop_dem: if True, the image and geomodel of every pair are
        given to the plugin as ``pairs_for_roi``, otherwise None is given
    :type crop_dem: bool

    :return: geometry plugin object, with a dem
    """
    if dem is None:
        dem_path = conf_inputs[sens_cst.INITIAL_ELEVATION][sens_cst.DEM_PATH]
    else:
        dem_path = dem

    def _image_and_geomodel(sensor_key):
        """Return the (image, geomodel) tuple of the given sensor."""
        sensor = conf_inputs[sens_cst.SENSORS][sensor_key]
        return sensor[sens_cst.INPUT_IMG], sensor[sens_cst.INPUT_GEO_MODEL]

    pairs_for_roi = None
    if crop_dem:
        # Get image pairs for DEM intersection with ROI, each one as
        # (image1, geomodel1, image2, geomodel2)
        pairs_for_roi = [
            _image_and_geomodel(left_key) + _image_and_geomodel(right_key)
            for left_key, right_key in conf_inputs[sens_cst.PAIRING]
        ]

    # Initialize a second geometry plugin with elevation information
    return AbstractGeometry(  # pylint: disable=abstract-class-instantiated
        conf_geom_plugin,
        dem=dem_path,
        geoid=conf_inputs[sens_cst.INITIAL_ELEVATION][sens_cst.GEOID],
        default_alt=sens_cst.CARS_DEFAULT_ALT,
        pairs_for_roi=pairs_for_roi,
    )
def modify_to_absolute_path(config_json_dir, overloaded_conf):
    """
    Modify input file paths to absolute paths, in place

    Sensor files, the roi (when given as a string) and the dem and geoid
    of the initial elevation (when given as strings) are processed.

    :param config_json_dir: directory of the json configuration
    :type config_json_dir: str
    :param overloaded_conf: overloaded configuration json
    :type overloaded_conf: dict
    """
    sensor_path_tags = (
        sens_cst.INPUT_IMG,
        sens_cst.INPUT_MSK,
        sens_cst.INPUT_GEO_MODEL,
        sens_cst.INPUT_COLOR,
        sens_cst.INPUT_CLASSIFICATION,
    )
    for sensor_image in overloaded_conf[sens_cst.SENSORS].values():
        for tag in sensor_path_tags:
            value = sensor_image[tag]
            if isinstance(value, dict):
                # dict entries carry their file under the "path" key
                value["path"] = make_relative_path_absolute(
                    value["path"], config_json_dir
                )
            elif value is not None:
                sensor_image[tag] = make_relative_path_absolute(
                    value, config_json_dir
                )

    # Only a roi given as a string is handled as a path
    roi = overloaded_conf[sens_cst.ROI]
    if isinstance(roi, str):
        overloaded_conf[sens_cst.ROI] = make_relative_path_absolute(
            roi, config_json_dir
        )

    initial_elevation = overloaded_conf[sens_cst.INITIAL_ELEVATION]
    for tag in (sens_cst.DEM_PATH, sens_cst.GEOID):
        if isinstance(initial_elevation[tag], str):
            initial_elevation[tag] = make_relative_path_absolute(
                initial_elevation[tag], config_json_dir
            )
def check_srtm(srtm_dir):
    """
    Check srtm data and log which reference altitude source is used

    Nothing is raised and nothing is returned: the outcome of the check is
    only reported through logging.

    :param srtm_dir: directory of srtm, or None to use the default altitude
    :type srtm_dir: str
    """
    if srtm_dir is None:
        logging.info("The default altitude will be used as reference altitude.")
        return

    if not os.path.isdir(srtm_dir):
        # TODO add check for single file
        if not os.path.exists(srtm_dir):
            # Report a path that points to nothing instead of ignoring it
            logging.warning("SRTM path '%s' does not exist.", srtm_dir)
        return

    if not os.listdir(srtm_dir):
        logging.warning(
            "SRTM directory is empty, "
            "the default altitude will be used as reference altitude."
        )
    else:
        logging.info(
            "Indicated SRTM tiles valid regions "
            "will be used as reference altitudes "
            "(the default altitude is used "
            "for undefined regions of the SRTM)"
        )
def get_initial_elevation(config):
    """
    Return initial elevation parameters (dem and geoid paths)
    from input configuration.

    A dict is completed in place and returned. A string is used as the dem
    path, any other value gives a dem path set to None.

    :param config: input initial elevation
    :type config: str, dict or None

    :return: initial elevation parameters
    :rtype: dict
    """
    if isinstance(config, dict):
        # Case 1: config is already a dict
        updated_config = config
    else:
        # Case 2: config is the dem path itself, or is not usable
        updated_config = {
            sens_cst.DEM_PATH: config if isinstance(config, str) else None
        }

    # Geoid already chosen by the configuration: nothing to add
    if sens_cst.GEOID in updated_config:
        return updated_config

    # Otherwise use cars geoid
    logging.info("CARS will use its own internal file as geoid reference")
    # Get root package directory
    package_path = os.path.dirname(__file__)
    updated_config[sens_cst.GEOID] = os.path.join(
        package_path, "..", "..", "conf", CARS_GEOID_PATH
    )
    return updated_config
def check_nbits(mask, classif):
    """
    Check the number of bits of the mask and of the classification

    Both are limited to 1 bit per band. An input given as None is skipped.

    :param mask: mask path
    :type mask: str
    :param classif: classif path
    :type classif: str

    :raises RuntimeError: if a band of the mask or of the classification
        does not have nbits=1
    """
    if mask is not None:
        mask_nbits = inputs.rasterio_get_nbits(mask)
        if not check_all_nbits_equal_one(mask_nbits):
            mask_message = "The mask {} have {} nbits per band. ".format(
                mask, mask_nbits
            )
            mask_message += "Only the mask with nbits=1 is supported! "
            raise RuntimeError(mask_message)

    if classif is not None:
        classif_nbits = inputs.rasterio_get_nbits(classif)
        if not check_all_nbits_equal_one(classif_nbits):
            classif_message = (
                "The classification {} have {} nbits per band. ".format(
                    classif, classif_nbits
                )
            )
            classif_message += (
                "Only the classification with nbits=1 is supported! "
            )
            raise RuntimeError(classif_message)
def compare_image_type(imgs, image_type, key1, key2):
    """
    Compare the data type between a pair of images

    :param imgs: container indexed by sensor key, then by image type,
        giving the image to read
    :param image_type: type of cardataset image (IMG, MASK, CLASSIF...)
    :param key1: key of the first image of the pair
    :type key1: str
    :param key2: key of the second image of the pair
    :type key2: str

    :raises RuntimeError: if both images do not have the same data type
    """
    first_dtype, second_dtype = (
        inputs.rasterio_get_image_type(imgs[key][image_type])
        for key in (key1, key2)
    )
    if first_dtype != second_dtype:
        message = "The pair images haven't the same data type."
        message += "\nSensor[{}]: {}".format(key1, first_dtype)
        message += "; Sensor[{}]: {}".format(key2, second_dtype)
        raise RuntimeError(message)
def check_all_nbits_equal_one(nbits):
    """
    Check if all the nbits = 1

    :param nbits: list of the nbits
    :return: True if nbits is not empty and all the nbits = 1,
        False otherwise
    :rtype: bool
    """
    # len() rather than truthiness, so that any sized sequence is accepted
    return len(nbits) > 0 and all(value == 1 for value in nbits)