#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2025 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
CARS merging pipeline class file
"""
import os
from json_checker import Checker, OptionalKey, Or
from cars.applications.application import Application
from cars.core import preprocessing, roi_tools
from cars.core.inputs import rasterio_get_epsg
from cars.core.utils import safe_makedirs
from cars.orchestrator import orchestrator
from cars.pipelines.parameters import advanced_parameters_constants as adv_cst
from cars.pipelines.parameters import dsm_inputs
from cars.pipelines.parameters import dsm_inputs_constants as dsm_cst
from cars.pipelines.parameters import output_constants as out_cst
from cars.pipelines.parameters import sensor_inputs_constants as sens_cst
from cars.pipelines.pipeline import Pipeline
from cars.pipelines.pipeline_constants import (
ADVANCED,
APPLICATIONS,
INPUT,
ORCHESTRATOR,
OUTPUT,
)
from cars.pipelines.pipeline_template import PipelineTemplate
# Pipeline name: used as the key passed to ``Pipeline.register`` below and
# as the name of the configuration section read by the pipeline
# (``conf[PIPELINE]``).
PIPELINE = "merging"
@Pipeline.register(
    PIPELINE,
)
class MergingPipeline(PipelineTemplate):
    """
    Merging pipeline

    Checks a merging configuration at construction time, then runs the
    ``dsm_merging`` application on the DSM inputs when ``run`` is called.
    """

    def __init__(self, conf, config_dir=None):
        """
        Creates pipeline

        Directly creates class attributes:
            used_conf, metadata, out_dir, dump_dir,
            save_all_intermediate_data, dsm_merging_application

        :param conf: full configuration (orchestrator, input, output and
            optional ``merging`` section)
        :type conf: dict
        :param config_dir: path to dir containing json/yaml
        :type config_dir: str
        """

        # Used conf
        self.used_conf = {}

        # metadata
        self.metadata = None

        # Transform relative path to absolute path
        if config_dir is not None:
            config_dir = os.path.abspath(config_dir)

        # Check global conf
        self.check_global_schema(conf)

        # The pipeline section is optional: only validate it when present
        if PIPELINE in conf:
            self.check_pipeline_conf(conf)

        # Check conf orchestrator
        self.used_conf[ORCHESTRATOR] = self.check_orchestrator(
            conf.get(ORCHESTRATOR, None)
        )

        # Check conf inputs
        inputs = self.check_inputs(conf[INPUT], config_dir=config_dir)
        self.used_conf[INPUT] = inputs

        # Check advanced parameters
        pipeline_conf = conf.get(PIPELINE, {})
        advanced = self.check_advanced_parameters(
            pipeline_conf.get(ADVANCED, {})
        )
        self.used_conf[ADVANCED] = advanced

        # Check conf output
        output = self.check_output(conf[OUTPUT])
        self.used_conf[OUTPUT] = output

        self.out_dir = self.used_conf[OUTPUT][out_cst.OUT_DIRECTORY]
        self.dump_dir = os.path.join(self.out_dir, "dump_dir")

        # Must be set before check_applications, which reads it
        self.save_all_intermediate_data = self.used_conf[ADVANCED][
            adv_cst.SAVE_INTERMEDIATE_DATA
        ]

        # Check conf application
        application_conf = self.check_applications(
            pipeline_conf.get(APPLICATIONS, {})
        )
        self.used_conf[APPLICATIONS] = application_conf

    def check_pipeline_conf(self, conf):
        """
        Check pipeline configuration

        Validates the ``conf[PIPELINE]`` section against a schema whose
        only declared keys are the optional advanced and applications
        dictionaries.

        :param conf: full configuration, must contain the pipeline section
        :type conf: dict
        """

        # Validate inputs
        pipeline_schema = {
            OptionalKey(ADVANCED): dict,
            OptionalKey(APPLICATIONS): dict,
        }

        checker_inputs = Checker(pipeline_schema)
        checker_inputs.validate(conf[PIPELINE])

    @staticmethod
    def check_advanced_parameters(conf):
        """
        Check the advanced parameters consistency

        :param conf: configuration of advanced parameters
        :type conf: dict

        :return: overloaded advanced parameters, with
            ``save_intermediate_data`` defaulting to False
        :rtype: dict
        """

        overloaded_conf = conf.copy()

        overloaded_conf[adv_cst.SAVE_INTERMEDIATE_DATA] = conf.get(
            adv_cst.SAVE_INTERMEDIATE_DATA, False
        )

        # Validate inputs
        schema = {
            adv_cst.SAVE_INTERMEDIATE_DATA: Or(dict, bool),
        }

        checker_advanced_parameters = Checker(schema)
        checker_advanced_parameters.validate(overloaded_conf)

        return overloaded_conf

    @staticmethod
    def check_output(conf):
        """
        Check the output given

        The output directory is made absolute and created, then missing
        auxiliary entries are filled with their default values. The given
        configuration is left untouched: defaults are written to a copy.

        :param conf: configuration of output
        :type conf: dict

        :return: overloaded output
        :rtype: dict
        """
        overloaded_conf = conf.copy()
        out_dir = conf[out_cst.OUT_DIRECTORY]
        out_dir = os.path.abspath(out_dir)
        # Ensure that output directory and its subdirectories exist
        safe_makedirs(out_dir)

        # Overload some parameters
        overloaded_conf[out_cst.OUT_DIRECTORY] = out_dir

        # Default value of each auxiliary entry, applied in this order
        auxiliary_defaults = (
            (out_cst.AUX_IMAGE, True),
            (out_cst.AUX_DEM_MIN, False),
            (out_cst.AUX_DEM_MAX, False),
            (out_cst.AUX_DEM_MEDIAN, False),
            (out_cst.AUX_WEIGHTS, False),
            (out_cst.AUX_CLASSIFICATION, False),
            (out_cst.AUX_PERFORMANCE_MAP, False),
            (out_cst.AUX_CONTRIBUTING_PAIR, False),
            (out_cst.AUX_FILLING, False),
            (out_cst.AUX_AMBIGUITY, False),
            (out_cst.AUX_EDGES, False),
        )

        # Load auxiliary and subfields
        auxiliary = overloaded_conf.get(out_cst.AUXILIARY, {})
        if isinstance(auxiliary, dict):
            # conf.copy() is shallow: copy the nested dictionary too so
            # that defaults are not written into the caller's configuration
            auxiliary = auxiliary.copy()
            for aux_key, default_value in auxiliary_defaults:
                auxiliary.setdefault(aux_key, default_value)
        # A non-dict value is kept as is and rejected by the schema below
        overloaded_conf[out_cst.AUXILIARY] = auxiliary

        # Check schema
        output_schema = {
            out_cst.OUT_DIRECTORY: str,
            out_cst.AUXILIARY: dict,
        }
        checker_output = Checker(output_schema)
        checker_output.validate(overloaded_conf)

        # Check auxiliary keys
        auxiliary_schema = {
            out_cst.AUX_IMAGE: Or(bool, str, list),
            out_cst.AUX_WEIGHTS: bool,
            out_cst.AUX_CLASSIFICATION: Or(bool, dict, list),
            out_cst.AUX_PERFORMANCE_MAP: Or(bool, list),
            out_cst.AUX_CONTRIBUTING_PAIR: bool,
            out_cst.AUX_FILLING: Or(bool, dict),
            out_cst.AUX_AMBIGUITY: bool,
            out_cst.AUX_EDGES: bool,
            out_cst.AUX_DEM_MIN: bool,
            out_cst.AUX_DEM_MAX: bool,
            out_cst.AUX_DEM_MEDIAN: bool,
        }

        checker_auxiliary = Checker(auxiliary_schema)
        checker_auxiliary.validate(overloaded_conf[out_cst.AUXILIARY])

        return overloaded_conf

    def check_applications(self, conf):
        """
        Check the given configuration for applications,
        and generates needed applications for pipeline.

        Sets ``self.dsm_merging_application``. Reads
        ``self.save_all_intermediate_data``, which must already be set.

        :param conf: configuration of applications
        :type conf: dict

        :return: configuration actually used by the applications
        :rtype: dict
        """

        # Initialize used config
        used_conf = {}

        needed_applications = ["dsm_merging"]

        for app_key in needed_applications:
            app_conf = conf.get(app_key, {})
            if app_conf is not None:
                # Work on a copy so the caller's configuration is not
                # modified when the save flag is forced below
                app_conf = app_conf.copy()
                app_conf["save_intermediate_data"] = (
                    self.save_all_intermediate_data
                    or app_conf.get("save_intermediate_data", False)
                )
            used_conf[app_key] = app_conf

        # DSM merging
        self.dsm_merging_application = Application(
            "dsm_merging",
            cfg=used_conf.get("dsm_merging", {}),
        )
        used_conf["dsm_merging"] = self.dsm_merging_application.get_conf()

        return used_conf

    def run(self, args=None, log_dir=None):  # pylint: disable=W0613
        """
        Run pipeline

        :param args: unused
        :param log_dir: directory of the logs, defaults to the ``logs``
            sub-directory of the output directory
        :type log_dir: str
        """
        if log_dir is None:
            log_dir = os.path.join(self.out_dir, "logs")

        with orchestrator.Orchestrator(
            orchestrator_conf=self.used_conf[ORCHESTRATOR],
            out_dir=self.out_dir,
            log_dir=log_dir,
            out_yaml_path=os.path.join(
                self.out_dir,
                out_cst.INFO_FILENAME,
            ),
        ) as cars_orchestrator:
            # link metadata
            self.metadata = cars_orchestrator.out_yaml

            dsms_merging_dump_dir = os.path.join(
                self.dump_dir, "dsms_merging"
            )

            # Group the input paths by name: each name found in at least
            # one DSM entry maps to the list of its string paths, in the
            # order of the entries. Non-string values (None included) are
            # skipped.
            dict_path = {}
            for dsm_entry in self.used_conf[INPUT][dsm_cst.DSMS].values():
                for path_name, path in dsm_entry.items():
                    if isinstance(path, str):
                        dict_path.setdefault(path_name, []).append(path)

            # All products are written in the same output sub-directory
            dsm_out_dir = os.path.join(self.out_dir, out_cst.DSM_DIRECTORY)
            auxiliary = self.used_conf[OUTPUT][out_cst.AUXILIARY]

            dsm_file_name = os.path.join(dsm_out_dir, "dsm.tif")

            # An output file name left to None is passed as None to the
            # application
            color_file_name = (
                os.path.join(dsm_out_dir, "image.tif")
                if "texture" in dict_path or auxiliary[out_cst.AUX_IMAGE]
                else None
            )

            performance_map_file_name = (
                os.path.join(dsm_out_dir, "performance_map.tif")
                if "performance_map" in dict_path
                else None
            )

            # Any path name containing "ambiguity" enables the output
            ambiguity_bool = any("ambiguity" in key for key in dict_path)
            ambiguity_file_name = (
                os.path.join(dsm_out_dir, "ambiguity.tif")
                if ambiguity_bool
                else None
            )

            classif_file_name = (
                os.path.join(dsm_out_dir, "classification.tif")
                if "merging_classification" in dict_path
                or auxiliary[out_cst.AUX_CLASSIFICATION]
                else None
            )

            contributing_all_pair_file_name = (
                os.path.join(dsm_out_dir, "contributing_pair.tif")
                if "contributing_pair" in dict_path
                else None
            )

            filling_file_name = (
                os.path.join(dsm_out_dir, "filling.tif")
                if "merging_filling" in dict_path
                else None
            )

            # Get ROI
            # The EPSG code is read from the first input DSM
            epsg = rasterio_get_epsg(dict_path["dsm"][0])
            (
                input_roi_poly,
                input_roi_epsg,
            ) = roi_tools.generate_roi_poly_from_inputs(
                self.used_conf[INPUT][sens_cst.ROI]
            )
            # NOTE(review): presumably expresses the ROI in the DSM EPSG,
            # confirm against preprocessing.compute_roi_poly
            roi_poly = preprocessing.compute_roi_poly(
                input_roi_poly, input_roi_epsg, epsg
            )

            # Launch merging
            self.dsm_merging_application.run(
                dict_path,
                cars_orchestrator,
                roi_poly,
                dsms_merging_dump_dir,
                dsm_file_name,
                color_file_name,
                classif_file_name,
                filling_file_name,
                performance_map_file_name,
                ambiguity_file_name,
                contributing_all_pair_file_name,
            )