#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=too-many-lines
# attribute-defined-outside-init is disabled so that we can create and use
# attributes however we need, to stick to the "everything is attribute" logic
# introduced in issue#895
# pylint: disable=attribute-defined-outside-init
# pylint: disable=too-many-nested-blocks
"""
CARS default pipeline class file
"""
# Standard imports
from __future__ import print_function
import copy
import json
import logging
import os
import shutil
from collections import OrderedDict
from datetime import datetime
import yaml
# CARS imports
from cars.core import cars_logging
from cars.core.utils import safe_makedirs
from cars.data_structures import cars_dataset
from cars.orchestrator.cluster import log_wrapper
from cars.orchestrator.cluster.log_wrapper import cars_profile
from cars.pipelines import pipeline_constants as pipeline_cst
from cars.pipelines.filling.filling import FillingPipeline
from cars.pipelines.formatting.formatting import FormattingPipeline
from cars.pipelines.merging.merging import MergingPipeline
from cars.pipelines.parameters import advanced_parameters_constants as adv_cst
from cars.pipelines.parameters import dsm_inputs
from cars.pipelines.parameters import dsm_inputs_constants as dsm_cst
from cars.pipelines.parameters import output_constants as out_cst
from cars.pipelines.parameters import output_parameters, sensor_inputs
from cars.pipelines.parameters import sensor_inputs_constants as sens_cst
from cars.pipelines.parameters.output_constants import AUXILIARY
from cars.pipelines.pipeline import Pipeline
from cars.pipelines.pipeline_constants import (
ADVANCED,
APPLICATIONS,
INPUT,
ORCHESTRATOR,
OUTPUT,
PIPELINE,
)
from cars.pipelines.pipeline_template import PipelineTemplate
from cars.pipelines.subsampling.subsampling import SubsamplingPipeline
from cars.pipelines.surface_modeling.surface_modeling import (
SurfaceModelingPipeline,
)
# Directory containing this module; configuration paths are built from it
package_path = os.path.dirname(__file__)

# Keys naming the position of a resolution in the chain
FIRST_RES = "first_resolution"
INTERMEDIATE_RES = "intermediate_resolution"
FINAL_RES = "final_resolution"

# One YAML configuration file per resolution position, all located in the
# "conf_resolution" directory next to this package directory
PIPELINE_CONFS = {
    position: os.path.join(package_path, "..", "conf_resolution", file_name)
    for position, file_name in (
        (FIRST_RES, "conf_first_resolution.yaml"),
        (INTERMEDIATE_RES, "conf_intermediate_resolution.yaml"),
        (FINAL_RES, "conf_final_resolution.yaml"),
    )
}
# pylint: disable=E0401
[docs]
@Pipeline.register(
"default",
)
class DefaultPipeline(PipelineTemplate):
"""
DefaultPipeline
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, conf, config_dir=None):  # noqa: C901
    """
    Creates pipeline

    Every section of ``conf`` (pipeline, input, output, orchestrator and,
    when activated, subsampling and filling) is checked and written back
    into ``conf``. When the surface modeling pipeline is activated, one
    configuration is built per epipolar resolution and a
    SurfaceModelingPipeline is instantiated (but not run) for each of
    them in order to retrieve its used configuration. The resulting
    global used configuration is saved as ``global_used_conf.yaml`` in
    the output directory.

    :param conf: full configuration; its sections are replaced in place
        by their checked version
    :type conf: dict
    :param config_dir: path to dir containing json or yaml file
    :type config_dir: str
    :raises RuntimeError: if a resolution is not a strictly positive int
    """
    # The sub-pipelines receive the directory as given by the caller
    self.config_dir = config_dir

    # Transform relative path to absolute path (used for the input check)
    if config_dir is not None:
        config_dir = os.path.abspath(config_dir)

    # Check global conf
    self.check_global_schema(conf)

    # init global metadata
    self.metadata = {}

    self.out_dir = conf[OUTPUT][out_cst.OUT_DIRECTORY]

    # self.pipeline_to_use and conf[PIPELINE] are the same dict object
    conf[PIPELINE] = self.check_pipeline(conf)
    self.pipeline_to_use = conf[PIPELINE]

    # Check input
    conf[INPUT] = self.check_inputs(conf, config_json_dir=config_dir)

    # check output
    conf[OUTPUT] = self.check_output(conf)

    self.intermediate_data_dir = os.path.join(
        self.out_dir, "intermediate_data"
    )

    conf[ORCHESTRATOR] = self.check_orchestrator(
        conf.get(ORCHESTRATOR, None)
    )

    if self.pipeline_to_use[pipeline_cst.SUBSAMPLING]:
        self.subsampling_conf = self.construct_subsampling_conf(conf)
        conf[pipeline_cst.SUBSAMPLING] = self.check_subsampling(
            self.subsampling_conf,
        )

        # Get epipolar resolutions to use
        self.resolutions = conf[pipeline_cst.SUBSAMPLING][ADVANCED][
            adv_cst.RESOLUTIONS
        ]
    else:
        self.resolutions = [1]

    for pipeline, activated in self.pipeline_to_use.items():
        if pipeline in conf and not activated:
            logging.warning(
                f"You tried to override the {pipeline} pipeline but "
                f"didn't specify it in the pipeline section. "
                "Therefore, this pipeline will not be used"
            )
        if pipeline in conf and activated:
            # Check pipeline conf format
            self.check_pipeline_section(pipeline, conf[pipeline])

    if pipeline_cst.SURFACE_MODELING not in conf:
        conf[pipeline_cst.SURFACE_MODELING] = {}

    used_configurations = {}
    self.positions = {}
    self.used_conf = {}

    self.keep_low_res_dir = True

    if dsm_cst.DSMS in conf[INPUT] and len(self.resolutions) != 1:
        logging.info(
            "For the use of those pipelines, "
            "you have to give only one resolution"
        )
        # overide epipolar resolutions
        # TODO: delete with external dsm pipeline (refactoring)
        self.resolutions = [1]
    elif (
        not self.pipeline_to_use[pipeline_cst.SUBSAMPLING]
        and len(self.resolutions) != 1
    ):
        # Safeguard only: without subsampling the resolutions were
        # already set to [1] above
        logging.warning(
            "As you're not using the subsampling pipeline, "
            "the working resolution will be 1"
        )
        self.resolutions = [1]

    if self.pipeline_to_use[pipeline_cst.FILLING]:
        self.filling_conf = self.construct_filling_conf(conf)
        conf[pipeline_cst.FILLING] = self.check_filling(self.filling_conf)

        # Filling is kept only if a dsm_filling application is
        # configured or if at least one input filling method is set
        consistent_filling = False
        for app in conf[pipeline_cst.FILLING][APPLICATIONS]:
            if "dsm_filling" in app:
                consistent_filling = True

        for filling_method in conf[INPUT][sens_cst.FILLING]:
            if conf[INPUT][sens_cst.FILLING][filling_method]:
                consistent_filling = True

        if not consistent_filling:
            self.pipeline_to_use[pipeline_cst.FILLING] = False

    subsampling_used_conf = conf.get(pipeline_cst.SUBSAMPLING, {})
    filling_used_conf = conf.get(pipeline_cst.FILLING, {})

    if self.pipeline_to_use[pipeline_cst.SURFACE_MODELING]:
        for epipolar_resolution_index, epipolar_res in enumerate(
            self.resolutions
        ):
            # Validate the resolution before it is used to build any
            # configuration: 0 and negative values are rejected
            if not isinstance(epipolar_res, int) or epipolar_res <= 0:
                raise RuntimeError("The resolution has to be an int > 0")

            first_res = epipolar_resolution_index == 0
            last_res = (
                epipolar_resolution_index == len(self.resolutions) - 1
            )
            intermediate_res = not first_res and not last_res

            # set computed bool
            self.positions[epipolar_resolution_index] = {
                "first_res": first_res,
                "intermediate_res": intermediate_res,
                "last_res": last_res,
            }

            current_conf = copy.deepcopy(conf)
            current_conf = extract_conf_with_resolution(
                current_conf,
                epipolar_res,
                first_res,
                intermediate_res,
                last_res,
                self.intermediate_data_dir,
                self.pipeline_to_use,
            )

            self.used_conf[epipolar_resolution_index] = current_conf

            # Initialize unit pipeline in order to retrieve the
            # used configuration
            # This pipeline will not be run
            _ = current_conf.pop(pipeline_cst.SUBSAMPLING, None)
            _ = current_conf.pop(pipeline_cst.FILLING, None)

            current_unit_pipeline = SurfaceModelingPipeline(
                current_conf,
                config_dir=self.config_dir,
            )

            if last_res and self.pipeline_to_use[pipeline_cst.FILLING]:
                # Force classification saving for filling
                if not current_unit_pipeline.used_conf[OUTPUT][AUXILIARY][
                    sens_cst.INPUT_CLASSIFICATION
                ]:
                    current_unit_pipeline.used_conf[OUTPUT][AUXILIARY][
                        sens_cst.INPUT_CLASSIFICATION
                    ] = True
                if not self.filling_conf[OUTPUT][AUXILIARY][
                    sens_cst.INPUT_CLASSIFICATION
                ]:
                    self.filling_conf[OUTPUT][AUXILIARY][
                        sens_cst.INPUT_CLASSIFICATION
                    ] = True

            # Get used_conf
            used_configurations[epipolar_res] = (
                current_unit_pipeline.used_conf
            )

        # Generate full used_conf
        full_used_conf = merge_used_conf(
            used_configurations,
            self.resolutions,
            os.path.abspath(self.out_dir),
        )
    else:
        # Without surface modeling, the checked conf is the used conf
        self.used_conf = copy.deepcopy(conf)
        full_used_conf = self.used_conf

    full_used_conf[pipeline_cst.SUBSAMPLING] = subsampling_used_conf
    full_used_conf[pipeline_cst.PIPELINE] = conf[PIPELINE]
    full_used_conf[pipeline_cst.FILLING] = filling_used_conf

    # Save used_conf
    cars_dataset.save_dict(
        full_used_conf,
        os.path.join(self.out_dir, "global_used_conf.yaml"),
    )
[docs]
def check_output(self, conf):
    """
    Check the output section of the configuration

    :param conf: configuration holding the input and output sections
    :type conf: dict

    :return: checked output section
    :rtype: dict
    """
    inputs_section = conf[INPUT]
    outputs_section = conf[OUTPUT]

    # The checker returns a pair; only its first item is kept here
    checked_output, _unused = output_parameters.check_output_parameters(
        inputs_section, outputs_section
    )
    return checked_output
[docs]
def check_pipeline(self, conf):  # noqa: C901
    """
    Check the pipeline section

    The pipeline section may be a single pipeline name, a list of names
    or a dict mapping names to an activation flag. When it is missing, a
    default list is written into ``conf`` according to the inputs (dsms
    or sensors). If it is missing and the inputs contain neither, the
    lookup of the pipeline section raises KeyError.

    The filling pipeline is activated when an input filling method is
    set, when a non-empty filling section is given, or when the input
    classification loader is "slurp".

    :param conf: full configuration (input section required)
    :type conf: dict

    :return: activation flag for every known pipeline
    :rtype: dict

    :raises RuntimeError: if an unknown pipeline is requested, if the
        surface modeling pipeline is requested with dsm inputs, or if
        the merging pipeline is requested with sensors inputs only
    """
    possible_pipeline = [
        pipeline_cst.SUBSAMPLING,
        pipeline_cst.SURFACE_MODELING,
        pipeline_cst.TIE_POINTS,
        pipeline_cst.FILLING,
        pipeline_cst.MERGING,
        pipeline_cst.FORMATTING,
    ]

    dict_pipeline = {}

    if PIPELINE not in conf:
        # No pipeline requested: choose the default chain from the inputs
        if dsm_cst.DSMS in conf[INPUT]:
            conf[PIPELINE] = [pipeline_cst.MERGING, pipeline_cst.FORMATTING]
        elif sens_cst.SENSORS in conf[INPUT]:
            conf[PIPELINE] = [
                pipeline_cst.SUBSAMPLING,
                pipeline_cst.SURFACE_MODELING,
                pipeline_cst.TIE_POINTS,
                pipeline_cst.FORMATTING,
            ]

    requested = conf[PIPELINE]
    if isinstance(requested, str):
        if requested not in possible_pipeline:
            raise RuntimeError("This pipeline does not exist")
        dict_pipeline = {requested: True}
    elif isinstance(requested, list):
        for elem in requested:
            if elem not in possible_pipeline:
                raise RuntimeError(f"The pipeline {elem} does not exist")
            dict_pipeline[elem] = True
    elif isinstance(requested, dict):
        for key in requested:
            if key not in possible_pipeline:
                raise RuntimeError(f"The pipeline {key} does not exist")
        dict_pipeline = copy.deepcopy(requested)

    # Pipelines that were not mentioned are deactivated
    for key in possible_pipeline:
        dict_pipeline.setdefault(key, False)

    inputs = conf[INPUT]

    if dsm_cst.DSMS in inputs:
        # This check must not depend on the merging flag: dsm inputs
        # force the merging pipeline below, and surface modeling cannot
        # be combined with them
        if dict_pipeline[pipeline_cst.SURFACE_MODELING]:
            raise RuntimeError(
                "You can not use the surface modeling pipeline with dsm inputs"
            )
        if not dict_pipeline[pipeline_cst.MERGING]:
            dict_pipeline[pipeline_cst.MERGING] = True
    elif sens_cst.SENSORS in inputs and dict_pipeline[pipeline_cst.MERGING]:
        raise RuntimeError(
            "You can not use the merging pipeline with sensors inputs only"
        )

    activate_filling = False

    if pipeline_cst.FILLING in inputs:
        for filling_method in inputs[pipeline_cst.FILLING]:
            if inputs[pipeline_cst.FILLING][filling_method]:
                activate_filling = True

    if pipeline_cst.FILLING in conf and conf[pipeline_cst.FILLING]:
        activate_filling = True

    if (
        sens_cst.LOADERS in inputs
        and sens_cst.INPUT_CLASSIFICATION in inputs[sens_cst.LOADERS]
        and inputs[sens_cst.LOADERS][sens_cst.INPUT_CLASSIFICATION]
        == "slurp"
    ):
        activate_filling = True

    if activate_filling:
        dict_pipeline[pipeline_cst.FILLING] = True

    # A pipeline whose name is a key of the input section is activated
    for pipeline_name in (
        pipeline_cst.SURFACE_MODELING,
        pipeline_cst.MERGING,
        pipeline_cst.SUBSAMPLING,
        pipeline_cst.TIE_POINTS,
    ):
        if pipeline_name in inputs and not dict_pipeline[pipeline_name]:
            dict_pipeline[pipeline_name] = True

    return dict_pipeline
[docs]
def check_subsampling(self, conf):
    """
    Check the subsampling section

    A SubsamplingPipeline is built from ``conf`` and its own checkers
    are applied to the advanced and applications sub-sections (an empty
    dict is used for a missing sub-section).

    :param conf: configuration holding the input and subsampling sections
    :type conf: dict

    :return: checked advanced and applications sub-sections
    :rtype: dict
    """
    checker = SubsamplingPipeline(conf)
    subsampling_section = conf[pipeline_cst.SUBSAMPLING]

    checked = {}
    checked[ADVANCED] = checker.check_advanced(
        subsampling_section.get(ADVANCED, {}),
        conf[INPUT],
    )
    checked[APPLICATIONS] = checker.check_applications(
        subsampling_section.get(APPLICATIONS, {})
    )
    return checked
[docs]
def check_filling(self, conf):
    """
    Check the filling section

    A FillingPipeline is built from ``conf`` with ``pre_check=True`` and
    its own checkers are used: the advanced checker receives the whole
    filling section, the applications checker receives the applications
    sub-section (an empty dict when it is missing).

    :param conf: configuration holding the input and filling sections
    :type conf: dict

    :return: checked advanced and applications sub-sections
    :rtype: dict
    """
    checker = FillingPipeline(conf, pre_check=True)

    checked_advanced = checker.check_advanced(
        conf[pipeline_cst.FILLING],
        conf[INPUT],
    )

    requested_applications = conf[pipeline_cst.FILLING].get(APPLICATIONS, {})
    checked_applications = checker.check_applications(requested_applications)

    return {ADVANCED: checked_advanced, APPLICATIONS: checked_applications}
[docs]
def check_pipeline_section(self, pipeline_name, pipeline_conf):
    """
    Check any pipeline section

    A pipeline section may only contain the applications and advanced
    keys. For the surface modeling and tie points pipelines, each of
    these sub-sections must additionally be keyed by "all" or by one of
    the resolutions of ``self.resolutions`` (given as int or as str).

    :param pipeline_name: key name in conf
    :type pipeline_name: str
    :param pipeline_conf: pipeline configuration
    :type pipeline_conf: dict

    :raises KeyError: if an unexpected key is found
    """
    for key in pipeline_conf:
        if key not in (APPLICATIONS, ADVANCED):
            raise KeyError(
                "Keys of a pipeline must be 'applications' or 'advanced'"
            )

    # Only these two pipelines are configured per resolution
    if pipeline_name not in (
        pipeline_cst.SURFACE_MODELING,
        pipeline_cst.TIE_POINTS,
    ):
        return

    int_keys = [int(epi_res) for epi_res in self.resolutions]
    string_keys = [str(key) for key in int_keys]
    possible_keys = ["all"] + int_keys + string_keys

    for section_conf in pipeline_conf.values():
        for key in section_conf:
            if key not in possible_keys:
                # The trailing space keeps "pipeline" and "must" apart
                # once the two literals are concatenated
                raise KeyError(
                    "When meta pipeline is used, keys of {} pipeline "
                    "must be in {}".format(
                        pipeline_name,
                        string_keys + ["all"],
                    )
                )
[docs]
def cleanup_low_res_dir(self):
    """
    Clean low res dir

    Removes ``self.intermediate_data_dir`` and everything it contains.
    Nothing is raised when the removal fails: the error is logged
    instead. When the path is not an existing directory, this is only
    reported in the log.
    """
    directory = self.intermediate_data_dir

    # isdir is False for a missing path, no separate existence test needed
    if not os.path.isdir(directory):
        logging.info(f"The directory {directory} has not been deleted")
        return

    try:
        shutil.rmtree(directory)
    except OSError as exception:
        # Best effort: a failed cleanup must not stop the pipeline
        logging.error(f"Error while deleting {directory}: {exception}")
    else:
        logging.info(f"The directory {directory} has been cleaned.")
[docs]
def construct_merging_conf(self, conf):
    """
    Build the configuration handed to the merging pipeline

    Input and orchestrator sections are deep copies of those of ``conf``.
    The output directory is the "merging" sub-directory of the
    intermediate data directory. The auxiliary and merging sections are
    taken from ``conf`` (empty dict when missing).

    :param conf: configuration holding input, orchestrator and output
    :type conf: dict

    :return: merging pipeline configuration
    :rtype: dict
    """
    return {
        INPUT: copy.deepcopy(conf[INPUT]),
        ORCHESTRATOR: copy.deepcopy(conf[ORCHESTRATOR]),
        OUTPUT: {
            "directory": os.path.join(
                self.intermediate_data_dir, pipeline_cst.MERGING
            ),
            AUXILIARY: conf[OUTPUT].get(AUXILIARY, {}),
        },
        pipeline_cst.MERGING: conf.get(pipeline_cst.MERGING, {}),
    }
[docs]
def construct_subsampling_conf(self, conf):
    """
    Build the configuration handed to the subsampling pipeline

    Input and orchestrator sections are deep copies of those of ``conf``,
    the output directory is the intermediate data directory and the
    subsampling section is the one of ``conf`` (empty dict when missing).

    :param conf: configuration holding input and orchestrator sections
    :type conf: dict

    :return: subsampling pipeline configuration
    :rtype: dict
    """
    inputs_copy = copy.deepcopy(conf[INPUT])
    orchestrator_copy = copy.deepcopy(conf[ORCHESTRATOR])
    output_section = {"directory": self.intermediate_data_dir}
    subsampling_section = conf.get(pipeline_cst.SUBSAMPLING, {})

    return {
        INPUT: inputs_copy,
        ORCHESTRATOR: orchestrator_copy,
        OUTPUT: output_section,
        pipeline_cst.SUBSAMPLING: subsampling_section,
    }
[docs]
def construct_filling_conf(self, conf):
    """
    Build the configuration handed to the filling pipeline

    The input section is a deep copy of the one of ``conf`` without its
    dsms entry. The output section is a deep copy whose directory is
    replaced by the intermediate data directory. The filling section is
    the one of ``conf`` (empty dict when missing).

    :param conf: configuration holding input and output sections
    :type conf: dict

    :return: filling pipeline configuration
    :rtype: dict
    """
    inputs_copy = copy.deepcopy(conf[INPUT])
    # The dsms entry is dropped from the copied inputs (no-op if absent)
    inputs_copy.pop(dsm_cst.DSMS, None)

    outputs_copy = copy.deepcopy(conf[OUTPUT])
    outputs_copy["directory"] = self.intermediate_data_dir

    return {
        INPUT: inputs_copy,
        OUTPUT: outputs_copy,
        pipeline_cst.FILLING: conf.get(pipeline_cst.FILLING, {}),
    }
@cars_profile(name="Run_default_pipeline", interval=0.5)
def run(self, args=None): # noqa C901
"""
Run pipeline

Runs the activated pipelines in this order: subsampling, surface
modeling (once per resolution of self.resolutions), merging, filling
and formatting. Each stage sets up its own log directory under
<out_dir>/logs, stores its metadata in self.metadata and generates a
summary. At the end, the used configuration and the metadata are saved
in the output directory, the profiling pdf is generated and the
intermediate data directory is removed unless self.keep_low_res_dir is
set.

:param args: optional object whose "loglevel" attribute gives the log
level; "PROGRESS" is used when the attribute is missing
"""
# Log level is upper-cased; None args falls back to "PROGRESS"
loglevel = getattr(args, "loglevel", "PROGRESS").upper()
# Single log file shared by every stage, named with the start time
global_log_file = os.path.join(
self.out_dir,
"logs",
"{}_{}.log".format(
datetime.now().strftime("%y-%m-%d_%Hh%Mm"), "default_pipeline"
),
)
# Output directory of the previously processed resolution
previous_out_dir = None
# Output directory of the current (then last) surface modeling run
current_surface_modeling_out_dir = None
# Used configuration of each surface modeling run, keyed by resolution
updated_conf = {}
# ---- Subsampling stage ----
if self.pipeline_to_use[pipeline_cst.SUBSAMPLING]:
current_log_dir = os.path.join(self.out_dir, "logs", "subsampling")
cars_logging.setup_logging(
loglevel,
out_dir=current_log_dir,
pipeline="subsampling",
global_log_file=global_log_file,
)
subsampling_pipeline = SubsamplingPipeline(
self.subsampling_conf, self.config_dir
)
subsampling_pipeline.run(
log_dir=current_log_dir,
)
# Update metadata
self.metadata[pipeline_cst.SUBSAMPLING] = (
subsampling_pipeline.metadata
)
# generate summary
log_wrapper.generate_summary(
current_log_dir,
subsampling_pipeline.used_conf,
pipeline_cst.SUBSAMPLING,
)
# ---- Surface modeling stage, one run per resolution ----
if self.pipeline_to_use[pipeline_cst.SURFACE_MODELING]:
for resolution_index, epipolar_res in enumerate(self.resolutions):
# Get tested unit pipeline
current_conf = self.used_conf[resolution_index]
current_surface_modeling_out_dir = current_conf[OUTPUT][
"directory"
]
# Put right directory for subsampling
if self.pipeline_to_use[pipeline_cst.SUBSAMPLING]:
if epipolar_res != 1:
# For resolutions other than 1, the input section is replaced
# by the content of the input.yaml file found in the
# subsampling directory of this resolution
yaml_file = os.path.join(
self.intermediate_data_dir,
"subsampling/res_"
+ str(epipolar_res)
+ "/input.yaml",
)
with open(yaml_file, encoding="utf-8") as f:
data = yaml.safe_load(f)
# Round trip through JSON so that the loaded data only holds
# JSON-compatible types
json_str = json.dumps(data, indent=4)
data = json.loads(json_str)
current_conf[INPUT] = data
# update directory for unit pipeline
current_conf[OUTPUT][
"directory"
] = current_surface_modeling_out_dir
# get position
first_res, _, last_res = (
self.positions[resolution_index]["first_res"],
self.positions[resolution_index]["intermediate_res"],
self.positions[resolution_index]["last_res"],
)
# setup logging
current_log_dir = os.path.join(
self.out_dir,
"logs",
"surface_modeling_res_" + str(epipolar_res),
)
cars_logging.setup_logging(
loglevel,
out_dir=current_log_dir,
pipeline="surface_modeling",
global_log_file=global_log_file,
)
cars_logging.add_progress_message(
"Starting surface modeling pipeline for resolution 1/"
+ str(epipolar_res)
)
# Define which resolution position is given to the unit pipeline
if first_res and last_res:
which_resolution = "single"
elif first_res:
which_resolution = "first"
elif last_res:
which_resolution = "final"
else:
which_resolution = "intermediate"
# Override with a priori: the DSM of the previous resolution is
# given as low resolution DSM
if not first_res:
dsm = os.path.join(previous_out_dir, "dsm/dsm.tif")
current_conf[INPUT][sens_cst.LOW_RES_DSM] = dsm
# Before the last resolution, the "edges" entry of every sensor is
# set to a dict of None values
if not last_res:
for _, data in current_conf[INPUT][
sens_cst.SENSORS
].items():
data["edges"] = {
"edges_mask": None,
"depth_map": None,
"normals": None,
"tile_id": None,
}
updated_pipeline = SurfaceModelingPipeline(
current_conf,
config_dir=self.config_dir,
)
updated_pipeline.run(
which_resolution=which_resolution,
working_res=epipolar_res,
log_dir=current_log_dir,
)
# Update metadata
if pipeline_cst.SURFACE_MODELING not in self.metadata:
self.metadata[pipeline_cst.SURFACE_MODELING] = {}
self.metadata[pipeline_cst.SURFACE_MODELING][
str(epipolar_res)
] = updated_pipeline.metadata
# update previous out dir
previous_out_dir = current_surface_modeling_out_dir
# generate summary
log_wrapper.generate_summary(
current_log_dir,
updated_pipeline.used_conf,
pipeline_cst.SURFACE_MODELING,
)
updated_conf[epipolar_res] = updated_pipeline.used_conf
# Generate full used_conf
full_used_conf = merge_used_conf(
updated_conf,
self.resolutions,
os.path.abspath(self.out_dir),
)
else:
full_used_conf = self.used_conf
# Configuration of the last stage that produced a DSM
final_conf = None
# ---- Merging stage ----
if self.pipeline_to_use[pipeline_cst.MERGING]:
current_log_dir = os.path.join(self.out_dir, "logs", "merging")
cars_logging.setup_logging(
loglevel,
out_dir=current_log_dir,
pipeline="merging",
global_log_file=global_log_file,
)
# NOTE(review): when surface modeling is activated, __init__ fills
# self.used_conf per resolution index; confirm the layout expected by
# construct_merging_conf in that case
merging_conf = self.construct_merging_conf(self.used_conf)
merging_pipeline = MergingPipeline(merging_conf, self.config_dir)
merging_pipeline.run()
# Update metadata
self.metadata[pipeline_cst.MERGING] = merging_pipeline.metadata
# generate summary
log_wrapper.generate_summary(
current_log_dir,
merging_pipeline.used_conf,
pipeline_cst.MERGING,
)
final_conf = merging_pipeline.used_conf
# Without merging: use the last surface modeling configuration if any,
# else the configuration checked in __init__
if updated_conf and final_conf is None:
last_key = list(updated_conf.keys())[-1]
final_conf = updated_conf[last_key]
elif not updated_conf and final_conf is None:
final_conf = self.used_conf
formatting_input_dir = final_conf[OUTPUT][out_cst.OUT_DIRECTORY]
# ---- Filling stage ----
if self.pipeline_to_use[pipeline_cst.FILLING]:
current_log_dir = os.path.join(self.out_dir, "logs", "filling")
cars_logging.setup_logging(
loglevel,
out_dir=current_log_dir,
pipeline="filling",
global_log_file=global_log_file,
)
# No DSM given by the user: fill the one produced by a previous stage
if self.filling_conf[INPUT][pipeline_cst.DSM_TO_FILL] is None:
if (
not self.pipeline_to_use[pipeline_cst.SURFACE_MODELING]
and not self.pipeline_to_use[pipeline_cst.MERGING]
):
raise RuntimeError(
"You have to fill the dsm_to_fill part of the input if "
"you want to use the filling pipeline separately"
)
self.filling_conf[INPUT][pipeline_cst.DSM_TO_FILL] = {}
aux_path = os.path.join(
final_conf[OUTPUT][out_cst.OUT_DIRECTORY], "dsm/"
)
self.filling_conf[INPUT][pipeline_cst.DSM_TO_FILL]["dsm"] = (
os.path.join(aux_path, "dsm.tif")
)
# Every activated auxiliary output is expected as <name>.tif in the
# same dsm directory
for aux_output, val in final_conf[OUTPUT][
out_cst.AUXILIARY
].items():
if val:
self.filling_conf[INPUT][pipeline_cst.DSM_TO_FILL][
aux_output
] = os.path.join(aux_path, aux_output + ".tif")
# When the "dem" entry of the initial elevation contains "dem_median",
# the initial elevation of the filling conf is reset to None
initial_elevation = final_conf[INPUT][
sens_cst.INITIAL_ELEVATION
].get("dem", None)
if (
initial_elevation is not None
and "dem_median" in initial_elevation
):
self.filling_conf[INPUT][sens_cst.INITIAL_ELEVATION] = None
filling_pipeline = FillingPipeline(
self.filling_conf, self.config_dir
)
filling_pipeline.run(
log_dir=current_log_dir,
)
# Update metadata
self.metadata[pipeline_cst.FILLING] = filling_pipeline.metadata
# generate summary
log_wrapper.generate_summary(
current_log_dir,
filling_pipeline.used_conf,
pipeline_cst.FILLING,
)
# Formatting then reads the filling sub-directory of the filling output
formatting_input_dir = os.path.join(
filling_pipeline.used_conf[OUTPUT][out_cst.OUT_DIRECTORY],
pipeline_cst.FILLING,
)
# ---- Formatting stage ----
if self.pipeline_to_use[pipeline_cst.FORMATTING]:
current_log_dir = os.path.join(self.out_dir, "logs", "formatting")
cars_logging.setup_logging(
loglevel,
out_dir=current_log_dir,
pipeline="formatting",
global_log_file=global_log_file,
)
formatting_conf = self.construct_formatting_conf(
formatting_input_dir
)
formatting_pipeline = FormattingPipeline(
formatting_conf, self.config_dir
)
formatting_pipeline.run(
surface_modeling_dir=current_surface_modeling_out_dir,
log_dir=current_log_dir,
)
# Update metadata
self.metadata[pipeline_cst.FORMATTING] = (
formatting_pipeline.metadata
)
# generate summary
log_wrapper.generate_summary(
current_log_dir,
formatting_pipeline.used_conf,
pipeline_cst.FORMATTING,
)
# Report the advanced and applications sections used by the filling
# pipeline in the global used configuration
if self.pipeline_to_use[pipeline_cst.FILLING]:
full_used_conf[pipeline_cst.FILLING] = {
ADVANCED: filling_pipeline.used_conf[ADVANCED],
APPLICATIONS: filling_pipeline.used_conf[APPLICATIONS],
}
# Save used_conf
cars_dataset.save_dict(
full_used_conf,
os.path.join(self.out_dir, "global_used_conf.yaml"),
)
# Save metadata file
cars_dataset.save_dict(
self.metadata,
os.path.join(self.out_dir, "metadata.yaml"),
)
# Merge profiling in pdf
log_wrapper.generate_pdf_profiling(os.path.join(self.out_dir, "logs"))
# clean outdir
if not self.keep_low_res_dir:
self.cleanup_low_res_dir()
# pylint: disable=too-many-positional-arguments
[docs]
def generate_filling_applications_for_surface_modeling(inputs_conf):
    """
    Generate filling applications configuration according to inputs

    Each non-None entry of the filling section of the inputs contributes
    its values, converted to str, to the classification list of a
    "dense_match_filling" application using the "zero_padding" method.
    Lists coming from several entries are concatenated without
    duplicates.

    :param inputs_conf: inputs configuration
    :type inputs_conf: dict

    :return: filling applications configuration (empty dict when every
        entry is None)
    :rtype: dict
    """
    applications = {}

    for class_values in inputs_conf[sens_cst.FILLING].values():
        if class_values is not None:
            # Merge this entry into what was accumulated so far
            applications = overide_pipeline_conf(
                applications,
                {
                    "dense_match_filling": {
                        "method": "zero_padding",
                        "classification": [
                            str(value) for value in class_values
                        ],
                    }
                },
                append_classification=True,
            )

    return applications
[docs]
def overide_pipeline_conf(conf, overiding_conf, append_classification=False):
    """
    Merge two dictionaries recursively without removing keys from the base conf.

    Neither input is modified, and the returned dictionary shares no
    mutable object with them: it can be edited freely afterwards.

    :param conf: base configuration dictionary
    :type conf: dict
    :param overiding_conf: overriding configuration dictionary
    :type overiding_conf: dict
    :param append_classification: if True, a "classification" list found
        on both sides is concatenated (base values first, duplicates
        removed) instead of being replaced
    :type append_classification: bool

    :return: merged configuration
    :rtype: dict
    """
    result = copy.deepcopy(conf)
    # Overriding values are stored as-is in the result: copy them first so
    # that the result never aliases the caller's dictionary
    overrides = copy.deepcopy(overiding_conf)

    def merge_recursive(base_dict, override_dict):
        """
        Merge override_dict into base_dict, in place
        """
        for key, value in override_dict.items():
            # None when the key is missing: fails both isinstance tests
            base_value = base_dict.get(key)

            if isinstance(base_value, dict) and isinstance(value, dict):
                merge_recursive(base_value, value)
            elif (
                append_classification
                and key == "classification"
                and isinstance(base_value, list)
                and isinstance(value, list)
            ):
                # extend list, avoiding duplicates and keeping the order
                base_dict[key] = list(
                    OrderedDict.fromkeys(base_value + value)
                )
            else:
                base_dict[key] = value

    merge_recursive(result, overrides)

    return result
[docs]
def merge_used_conf(used_configurations, resolutions, out_dir):
    """
    Merge all used configuration

    The input and output sections come from the configuration of the
    last resolution, the orchestrator section from the one of the first
    resolution. The tie points and surface modeling sections gather, for
    their applications and advanced parts, the values of every
    resolution under the resolution written as str. The given
    configurations are not modified (a deep copy is used).

    :param used_configurations: used configuration of each resolution,
        keyed by resolution
    :type used_configurations: dict
    :param resolutions: resolutions, in processing order
    :type resolutions: list
    :param out_dir: directory written in the merged output section
    :type out_dir: str

    :return: merged configuration
    :rtype: dict
    """
    confs = copy.deepcopy(used_configurations)
    last_conf = confs[resolutions[-1]]

    tie_points = {APPLICATIONS: {}, ADVANCED: {}}
    surface_modeling = {APPLICATIONS: {}, ADVANCED: {}}

    merged_conf = {
        INPUT: last_conf[INPUT],
        OUTPUT: last_conf[OUTPUT],
        ORCHESTRATOR: confs[resolutions[0]][ORCHESTRATOR],
    }
    merged_conf[OUTPUT]["directory"] = out_dir
    merged_conf[pipeline_cst.TIE_POINTS] = tie_points
    merged_conf[pipeline_cst.SURFACE_MODELING] = surface_modeling

    for resolution in resolutions:
        resolution_conf = confs[resolution]
        resolution_key = str(resolution)

        # The tie points section is optional in a unit configuration
        if pipeline_cst.TIE_POINTS in resolution_conf:
            for part in (APPLICATIONS, ADVANCED):
                tie_points[part][resolution_key] = resolution_conf[
                    pipeline_cst.TIE_POINTS
                ][part]

        for part in (APPLICATIONS, ADVANCED):
            surface_modeling[part][resolution_key] = resolution_conf[
                pipeline_cst.SURFACE_MODELING
            ][part]

    return merged_conf