#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains the output definition
"""
import logging
import os
from json_checker import And, Checker, Or
from pyproj import CRS
import cars.core.constants as cst
from cars.core import preprocessing
from cars.core.utils import safe_makedirs
from cars.pipelines.parameters import output_constants
from cars.pipelines.parameters import sensor_inputs_constants as sens_cst
[docs]
def is_valid_epsg(epsg) -> bool:
    """
    Tell whether pyproj accepts the given EPSG code.

    :param epsg: EPSG code to check; None means "no EPSG requested"
    :return: True if epsg is None or if a CRS can be built from it,
        False if building the CRS raises any exception
    """
    if epsg is not None:
        try:
            # Building the CRS is the validity check itself
            CRS(f"EPSG:{epsg}")
        except Exception:
            return False
    return True
[docs]
def check_output_parameters(  # noqa: C901 : too complex
    inputs,
    conf,
    scaling_coeff=None,
    bounds=None,
):
    """
    Check the output json configuration and fill in default values

    The output directory is created if it does not exist. The top-level
    dictionary and the auxiliary dictionary of ``conf`` are copied before
    being overloaded.

    :param inputs: inputs configuration, forwarded to the classification
        and texture bands checks
    :type inputs: dict
    :param conf: configuration of output
    :type conf: dict
    :param scaling_coeff: scaling factor for resolution
    :type scaling_coeff: float
    :param bounds: bounds used to convert the default resolution to degrees
        when the output EPSG is geographic; its first two items are used as
        xmin and ymin
    :type bounds: list

    :return: overloaded output configuration, scaling coefficient
    :rtype: tuple(dict, float)
    :raises RuntimeError: if a requested product level is unknown
    """
    # Shallow copy: top-level keys are overloaded without touching conf
    overloaded_conf = conf.copy()

    out_dir = os.path.abspath(conf[output_constants.OUT_DIRECTORY])
    # Ensure that output directory and its subdirectories exist
    safe_makedirs(out_dir)

    # Overload some parameters
    overloaded_conf[output_constants.OUT_DIRECTORY] = out_dir

    product_levels = overloaded_conf.get(
        output_constants.PRODUCT_LEVEL, "dsm"
    )
    # A single level given as a string is normalized to a list
    if isinstance(product_levels, str):
        product_levels = [product_levels]
    overloaded_conf[output_constants.PRODUCT_LEVEL] = product_levels
    for level in product_levels:
        if level not in ["dsm", "depth_map", "point_cloud"]:
            raise RuntimeError("Unknown product level {}".format(level))

    overloaded_conf[output_constants.OUT_GEOID] = overloaded_conf.get(
        output_constants.OUT_GEOID, True
    )
    overloaded_conf[output_constants.EPSG] = overloaded_conf.get(
        output_constants.EPSG, None
    )

    # None when the resolution is missing or explicitly set to None
    resolution = overloaded_conf.get(output_constants.RESOLUTION, None)

    overloaded_scaling_coeff = scaling_coeff

    # Default resolution before scaling; converted to degrees below when
    # the requested output CRS is geographic
    res_val = 0.5
    res_unit = "meters"
    epsg = overloaded_conf[output_constants.EPSG]
    if bounds is not None and epsg is not None:
        crs = CRS.from_epsg(epsg)
        if crs.is_geographic:
            xmin = bounds[0]
            ymin = bounds[1]
            utm_epsg = preprocessing.get_utm_zone_as_epsg_code(xmin, ymin)
            conversion_factor = preprocessing.get_conversion_factor(
                bounds, utm_epsg, crs.to_epsg()
            )
            res_val = 0.5 / conversion_factor  # convert to degree
            res_unit = "degrees"

    if scaling_coeff is not None:
        if resolution is not None:
            # A user-defined resolution is kept as is, with a warning only
            if resolution < res_val * scaling_coeff:
                logging.warning(
                    "The requested DSM resolution of "
                    f"{overloaded_conf[output_constants.RESOLUTION]} seems "
                    "too low for the sensor images' resolution. "
                    "The pipeline will still continue with it."
                )
        else:
            resolution = float(res_val * scaling_coeff)
            logging.info(
                "The resolution of the output DSM will be "
                f"{resolution} {res_unit}. "
            )

    overloaded_conf[output_constants.RESOLUTION] = resolution

    overloaded_conf[output_constants.SAVE_BY_PAIR] = overloaded_conf.get(
        output_constants.SAVE_BY_PAIR, False
    )

    # Load auxiliary and subfields. The auxiliary dictionary is copied so
    # that the defaults filled below do not leak into the caller's conf
    # (conf.copy() above only copies the top level).
    auxiliary = overloaded_conf.get(output_constants.AUXILIARY, {})
    if isinstance(auxiliary, dict):
        auxiliary = dict(auxiliary)
    overloaded_conf[output_constants.AUXILIARY] = auxiliary

    # The image is the only auxiliary product enabled by default
    auxiliary[output_constants.AUX_IMAGE] = auxiliary.get(
        output_constants.AUX_IMAGE, True
    )
    for key in [
        output_constants.AUX_WEIGHTS,
        output_constants.AUX_CLASSIFICATION,
        output_constants.AUX_PERFORMANCE_MAP,
        output_constants.AUX_CONTRIBUTING_PAIR,
        output_constants.AUX_FILLING,
        output_constants.AUX_AMBIGUITY,
        output_constants.AUX_EDGES,
    ]:
        auxiliary[key] = auxiliary.get(key, False)

    # Check schema
    output_schema = {
        output_constants.OUT_DIRECTORY: str,
        output_constants.PRODUCT_LEVEL: list,
        output_constants.OUT_GEOID: Or(bool, str),
        output_constants.EPSG: And(Or(int, str, None), is_valid_epsg),
        output_constants.RESOLUTION: Or(int, float, None),
        output_constants.SAVE_BY_PAIR: bool,
        output_constants.AUXILIARY: dict,
    }
    checker_output = Checker(output_schema)
    checker_output.validate(overloaded_conf)

    # check auxiliary keys
    auxiliary_schema = {
        output_constants.AUX_IMAGE: Or(bool, str, list),
        output_constants.AUX_WEIGHTS: bool,
        output_constants.AUX_CLASSIFICATION: Or(bool, dict, list),
        output_constants.AUX_PERFORMANCE_MAP: Or(bool, list),
        output_constants.AUX_CONTRIBUTING_PAIR: bool,
        output_constants.AUX_FILLING: Or(bool, dict),
        output_constants.AUX_AMBIGUITY: bool,
        output_constants.AUX_EDGES: bool,
    }

    # The dedicated checks run before the auxiliary schema validation, so
    # the schema is applied to the overloaded values.

    # Check and overload classification parameter
    check_classification_parameter(inputs, overloaded_conf)

    # Check and overload image parameter
    check_texture_bands(inputs, overloaded_conf)

    # Check and overload performance_map parameter
    check_performance_classes(overloaded_conf)

    # Check and overload filling parameter
    check_filling_parameter(overloaded_conf)

    checker_auxiliary = Checker(auxiliary_schema)
    checker_auxiliary.validate(overloaded_conf[output_constants.AUXILIARY])

    # NOTE(review): the literal "epsg" is presumably the value of
    # output_constants.EPSG - confirm and use the constant if so.
    if "epsg" in overloaded_conf and overloaded_conf["epsg"]:
        spatial_ref = CRS.from_epsg(overloaded_conf["epsg"])
        final_resolution = overloaded_conf[output_constants.RESOLUTION]
        if (
            spatial_ref.is_geographic
            and final_resolution is not None
            and final_resolution > 10e-3
        ):
            logging.warning(
                "The resolution of the "
                + "point_cloud_rasterization should be "
                + "fixed according to the epsg"
            )

    return overloaded_conf, overloaded_scaling_coeff
[docs]
def check_filling_parameter(overloaded_conf):
    """
    Check and overload filling parameter

    The filling entry of the auxiliary configuration is read and, depending
    on its type:

    - dict: every value (a single string is treated as a one-element list)
      must only contain known filling method names; the entry is then
      replaced by a dictionary built from the known names that are not keys
      of the given dictionary,
    - True: the entry is replaced by a dictionary mapping the 1-based
      position of each known filling method to its name,
    - anything else: the entry is left unchanged.

    :param overloaded_conf: output configuration, modified in place
    :type overloaded_conf: dict
    :raises RuntimeError: if a dictionary value contains an unknown filling
        method name
    """
    filling_param = overloaded_conf[output_constants.AUXILIARY][
        output_constants.AUX_FILLING
    ]
    # Known filling method names; the position in this list (starting at 1)
    # is the key used in the generated dictionaries below
    valid_names = [
        "no_edition",
        "fill_with_exogenous_dem",
        "interpolation",
        "fill_with_endogenous_dem",
        "interpolate_from_borders",
        "fill_with_geoid",
        "other",
    ]
    if isinstance(filling_param, dict):
        for _, value in filling_param.items():
            # Accept a single method name as well as a list of names
            if isinstance(value, str):
                value = [value]
            if any(elem not in valid_names for elem in value):
                raise RuntimeError(
                    "Those filling methods are not available in CARS"
                )
        # NOTE(review): the validation above treats the dictionary *values*
        # as method names, while the membership test below is done against
        # the dictionary *keys*, and the user dictionary is then replaced by
        # `result` - confirm this is the intended behavior. The placement of
        # this block after the validation loop is assumed - TODO confirm.
        result = {}
        for i, name in enumerate(valid_names):
            if name not in filling_param:
                result[i + 1] = name
        overloaded_conf[output_constants.AUXILIARY][
            output_constants.AUX_FILLING
        ] = result
    elif filling_param is True:
        # Default mapping: one entry per known filling method
        overloaded_conf[output_constants.AUXILIARY][
            output_constants.AUX_FILLING
        ] = {i + 1: name for i, name in enumerate(valid_names)}
[docs]
def check_texture_bands(inputs, overloaded_conf):
    """
    Validate the image entry of the auxiliary configuration against the
    bands of the first sensor image, and overload it with a list of bands.

    Nothing is done when no sensors are given. Otherwise:

    - list: every item must be a string naming an existing band,
    - str: replaced by a one-element list, the band must exist,
    - True: replaced by a list of bands chosen from the sensor type and
      the number of bands,
    - anything else: left unchanged.

    :param inputs: inputs configuration
    :type inputs: dict
    :param overloaded_conf: output configuration, modified in place
    :type overloaded_conf: dict
    :raises RuntimeError: if a requested band is not a string or is not a
        band of the input image
    """
    auxiliary = overloaded_conf[output_constants.AUXILIARY]
    texture_bands = auxiliary[output_constants.AUX_IMAGE]

    sensors = inputs[sens_cst.SENSORS]
    if sensors is None:
        return

    # Only the image of the first sensor is used as reference
    first_key = list(sensors.keys())[0]
    image = sensors[first_key][sens_cst.INPUT_IMG]
    bands = set(image["bands"].keys())
    sensor_type = image.get(sens_cst.SENSOR_TYPE, None)

    if isinstance(texture_bands, list):
        for band_name in texture_bands:
            if not isinstance(band_name, str):
                raise RuntimeError(
                    "The image parameter of auxiliary should "
                    "be a boolean, a string or a list of string"
                )
            if band_name not in bands:
                raise RuntimeError(
                    f"The band {band_name} is "
                    f"not an existing band of "
                    f"the input image"
                )
        return

    if isinstance(texture_bands, str):
        # The entry is overloaded before the band existence is checked
        auxiliary[output_constants.AUX_IMAGE] = [texture_bands]
        if texture_bands not in bands:
            raise RuntimeError(
                f"The band {texture_bands} is "
                f"not an existing band of "
                f"the input image"
            )
        return

    if texture_bands is not True:
        return

    # texture_bands is True: pick the bands from the sensor type
    nb_bands = len(bands)
    if sensor_type == "PHR" and nb_bands == 1:
        selected_bands = ["b0"]
    elif sensor_type == "CO3D" and nb_bands == 3:
        selected_bands = ["b1", "b0", "b2"]  # RGB
    else:
        selected_bands = sorted(bands)
        if sensor_type == "PHR" and nb_bands >= 4:  # RGB or RGBN
            # The first sorted band is left out
            selected_bands = selected_bands[1:]
    auxiliary[output_constants.AUX_IMAGE] = selected_bands
[docs]
def check_classification_parameter(inputs, overloaded_conf):
    """
    Check and overload classification parameter

    Nothing is done when no sensors are given or when the first sensor has
    no classification. Otherwise the classification entry of the auxiliary
    configuration is checked against the classification values of the first
    sensor:

    - list: every item must be an int present in the classification
      values; the entry is replaced by a dictionary mapping each item to
      itself,
    - True: the entry is replaced by a dictionary mapping ``int(value)``
      to ``value`` for every classification value,
    - dict: every value (a single int is treated as a one-element list)
      must only contain existing classification values,
    - anything else: left unchanged.

    :param inputs: inputs configuration
    :type inputs: dict
    :param overloaded_conf: output configuration, modified in place
    :type overloaded_conf: dict
    :raises RuntimeError: if a requested value has the wrong type or is not
        an existing classification value
    """
    auxiliary = overloaded_conf[output_constants.AUXILIARY]
    classification_formatting = auxiliary[
        output_constants.AUX_CLASSIFICATION
    ]

    sensors = inputs[sens_cst.SENSORS]
    if sensors is None:
        return

    # Only the classification of the first sensor is used as reference
    first_sensor = sensors[list(sensors.keys())[0]]
    # NOTE(review): the literal "classification" is presumably the value of
    # sens_cst.INPUT_CLASSIFICATION used just below - confirm.
    if (
        "classification" not in first_sensor
        or first_sensor["classification"] is None
    ):
        return

    bands_classif = first_sensor[sens_cst.INPUT_CLASSIFICATION]["values"]

    if isinstance(classification_formatting, list):
        # Validate first: building the dictionary from unchecked items
        # would fail with a TypeError on unhashable items instead of the
        # explicit error below
        for elem in classification_formatting:
            if not isinstance(elem, int):
                raise RuntimeError(
                    "The classification parameter of auxiliary should "
                    "be a boolean, a dictionary or a list of int"
                )
            if elem not in bands_classif:
                raise RuntimeError(
                    f"If you want to use {elem} as a band num, "
                    f"you should use a dictionary, not a list"
                )
        auxiliary[output_constants.AUX_CLASSIFICATION] = {
            val: val for val in classification_formatting
        }
    elif classification_formatting is True:
        auxiliary[output_constants.AUX_CLASSIFICATION] = {
            int(name): name for name in bands_classif
        }
    elif isinstance(classification_formatting, dict):
        for value in classification_formatting.values():
            # Accept a single value as well as a list of values
            if isinstance(value, int):
                value = [value]
            if any(elem not in bands_classif for elem in value):
                raise RuntimeError(
                    f"The band {value} is "
                    f"not an existing band of "
                    f"the input classification"
                )
[docs]
def intialize_product_index(orchestrator, product_levels, input_pairs):
    """
    Build the index dictionary for the requested product levels, every
    path being set to None, and hand it to the orchestrator when one is
    given.

    :param orchestrator: cars orchestrator, may be None
    :type orchestrator: Orchestrator
    :param product_levels: requested product levels
    :type product_levels: list
    :param input_pairs: list containing the pair names
    :type input_pairs: list

    :return: the initialized index
    :rtype: dict
    """
    index = {}

    if "dsm" in product_levels:
        index["dsm"] = dict.fromkeys(
            (
                cst.INDEX_DSM_ALT,
                cst.INDEX_DSM_COLOR,
                cst.INDEX_DSM_MASK,
                cst.INDEX_DSM_CLASSIFICATION,
                cst.INDEX_DSM_PERFORMANCE_MAP,
                cst.INDEX_DSM_CONTRIBUTING_PAIR,
                cst.INDEX_DSM_FILLING,
                cst.INDEX_DSM_WEIGHTS,
            )
        )

    if "point_cloud" in product_levels:
        # The point cloud content (tile name, save by pair or not) is not
        # known yet, so its index starts empty
        index["point_cloud"] = {}

    if "depth_map" in product_levels:
        # One independent entry per pair
        index["depth_map"] = {
            pair: dict.fromkeys(
                (
                    cst.INDEX_DEPTH_MAP_X,
                    cst.INDEX_DEPTH_MAP_Y,
                    cst.INDEX_DEPTH_MAP_Z,
                    cst.INDEX_DEPTH_MAP_COLOR,
                    cst.INDEX_DEPTH_MAP_MASK,
                    cst.INDEX_DEPTH_MAP_CLASSIFICATION,
                    cst.INDEX_DEPTH_MAP_PERFORMANCE_MAP,
                    cst.INDEX_DEPTH_MAP_FILLING,
                    cst.INDEX_DEPTH_MAP_EPSG,
                )
            )
            for pair in input_pairs
        }

    if orchestrator is not None:
        orchestrator.update_index(index)

    return index