#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=too-many-lines
# attribute-defined-outside-init is disabled so that we can create and use
# attributes however we need, to stick to the "everything is attribute" logic
# introduced in issue#895
# pylint: disable=attribute-defined-outside-init
# pylint: disable=too-many-nested-blocks
"""
CARS subsampling pipeline class file
"""
# Standard imports
from __future__ import print_function
import copy
import logging
import os
from pathlib import Path
import rasterio as rio
import yaml
from json_checker import Checker, OptionalKey, Or
from cars.applications.application import Application
from cars.core import cars_logging
from cars.core.utils import safe_makedirs
from cars.orchestrator import orchestrator
from cars.orchestrator.cluster.log_wrapper import cars_profile
from cars.pipelines.parameters import advanced_parameters_constants as adv_cst
from cars.pipelines.parameters import output_constants as out_cst
from cars.pipelines.parameters import output_parameters, sensor_inputs
from cars.pipelines.parameters import sensor_inputs_constants as sens_cst
from cars.pipelines.pipeline import Pipeline
from cars.pipelines.pipeline_constants import (
ADVANCED,
APPLICATIONS,
INPUT,
ORCHESTRATOR,
OUTPUT,
)
from cars.pipelines.pipeline_template import PipelineTemplate
PIPELINE = "subsampling"
[docs]
@Pipeline.register(
PIPELINE,
)
class SubsamplingPipeline(PipelineTemplate):
"""
SubsamplingPipeline
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, conf, config_dir=None): # noqa: C901
"""
Creates pipeline
:param pipeline_name: name of the pipeline.
:type pipeline_name: str
:param cfg: configuration {'matching_cost_method': value}
:type cfg: dictionary
:param config_dir: path to dir containing json or yaml file
:type config_dir: str
"""
self.config_dir = config_dir
# Transform relative path to absolute path
if config_dir is not None:
config_dir = os.path.abspath(config_dir)
# metadata
self.metadata = None
# Check global conf
self.check_global_schema(conf)
if PIPELINE in conf:
self.check_pipeline_conf(conf)
else:
conf[PIPELINE] = {}
self.out_dir = conf[OUTPUT][out_cst.OUT_DIRECTORY]
self.subsampling_dir = os.path.join(self.out_dir, "subsampling")
pipeline_conf = conf.get(PIPELINE, {})
# Check input
conf[INPUT] = self.check_inputs(conf[INPUT], config_json_dir=config_dir)
# check advanced
conf[PIPELINE][ADVANCED] = self.check_advanced(
pipeline_conf.get(ADVANCED, {}),
conf[INPUT],
)
# Get epipolar resolutions to use
self.resolutions = conf[PIPELINE][ADVANCED][adv_cst.RESOLUTIONS]
# check output
conf[OUTPUT] = self.check_output(conf)
self.used_conf = {}
# Check conf orchestrator
self.used_conf[ORCHESTRATOR] = self.check_orchestrator(
conf.get(ORCHESTRATOR, None)
)
self.used_conf[INPUT] = conf[INPUT]
self.used_conf[OUTPUT] = conf[OUTPUT]
self.used_conf[ADVANCED] = conf[PIPELINE][ADVANCED]
self.used_conf[APPLICATIONS] = self.check_applications(
pipeline_conf.get(APPLICATIONS, {})
)
[docs]
def check_pipeline_conf(self, conf):
"""
Check pipeline configuration
"""
# Validate inputs
pipeline_schema = {
OptionalKey(ADVANCED): dict,
OptionalKey(APPLICATIONS): dict,
}
checker_inputs = Checker(pipeline_schema)
checker_inputs.validate(conf[PIPELINE])
[docs]
def check_output(self, conf):
"""
Check the output given
:param conf: configuration of output
:type conf: dict
:return overloader output
:rtype : dict
"""
conf_output, _ = output_parameters.check_output_parameters(
conf[INPUT], conf[OUTPUT], None
)
return conf_output
[docs]
def check_advanced(self, conf, inputs):
"""
Check all conf for advanced configuration
:return: overridden advanced conf
:rtype: dict
"""
overloaded_conf = copy.deepcopy(conf)
overloaded_conf[adv_cst.SAVE_INTERMEDIATE_DATA] = conf.get(
adv_cst.SAVE_INTERMEDIATE_DATA, False
)
overloaded_conf[adv_cst.RESOLUTIONS] = overloaded_conf.get(
adv_cst.RESOLUTIONS, [16, 4, 1]
)
overloaded_conf[adv_cst.MIN_IMAGE_SIZE] = conf.get(
adv_cst.MIN_IMAGE_SIZE, 80
)
if isinstance(overloaded_conf[adv_cst.RESOLUTIONS], int):
overloaded_conf[adv_cst.RESOLUTIONS] = [
overloaded_conf[adv_cst.RESOLUTIONS]
]
# Check minimal image size
sizes = []
for sensor in inputs[sens_cst.SENSORS]:
try:
image = rio.open(
inputs[sens_cst.SENSORS][sensor][sens_cst.INPUT_IMG][
"bands"
]["b0"]["path"]
)
image_width = min(image.height, image.width)
sizes.append(image_width)
except (KeyError, rio.errors.RasterioIOError):
logging.warning("Sensor {} not found".format(sensor))
min_resolution = 1
if sizes:
min_size = min(sizes)
min_resolution = min(overloaded_conf[adv_cst.RESOLUTIONS])
for res in overloaded_conf[adv_cst.RESOLUTIONS][:]:
if min_size / res < overloaded_conf[adv_cst.MIN_IMAGE_SIZE]:
overloaded_conf[adv_cst.RESOLUTIONS].remove(res)
if len(overloaded_conf[adv_cst.RESOLUTIONS]) == 0:
logging.warning(
"All resolutions are too low for the given images, {} is"
" used as resolution".format(min_resolution)
)
overloaded_conf[adv_cst.RESOLUTIONS] = [min_resolution]
schema = {
adv_cst.SAVE_INTERMEDIATE_DATA: Or(dict, bool),
adv_cst.RESOLUTIONS: [int],
adv_cst.MIN_IMAGE_SIZE: int,
}
checker_advanced_parameters = Checker(schema)
checker_advanced_parameters.validate(overloaded_conf)
return overloaded_conf
[docs]
def check_applications(self, conf):
"""
Check the given configuration for applications
:param conf: configuration of applications
:type conf: dict
"""
needed_applications = ["sensors_subsampling"]
# Check if all specified applications are used
# Application in terrain_application are note used in
# the sensors_to_dense_depth_maps pipeline
for app_key in conf.keys():
if app_key not in needed_applications:
msg = (
f"No {app_key} application used in the "
+ "default Cars pipeline"
)
logging.error(msg)
raise NameError(msg)
used_conf = copy.deepcopy(conf)
self.sensors_subsampling = Application(
"sensors_subsampling",
cfg=used_conf.get("sensors_subsampling", {}),
)
used_conf["sensors_subsampling"] = self.sensors_subsampling.get_conf()
return used_conf
@cars_profile(name="Run_subsampling_pipeline", interval=0.5)
def run(self, args=None, log_dir=None): # noqa C901 # pylint: disable=W0613
"""
Run pipeline
"""
cars_logging.add_progress_message("Starting subsampling pipeline")
inputs = copy.deepcopy(self.used_conf[INPUT])
if log_dir is not None:
self.log_dir = log_dir
else:
self.log_dir = os.path.join(self.subsampling_dir, "logs")
with orchestrator.Orchestrator(
orchestrator_conf=self.used_conf[ORCHESTRATOR],
out_dir=self.subsampling_dir,
log_dir=self.log_dir,
out_yaml_path=os.path.join(
self.subsampling_dir,
out_cst.INFO_FILENAME,
),
) as self.cars_orchestrator:
# link metadata
self.metadata = self.cars_orchestrator.out_yaml
for res in self.resolutions:
if res != 1:
conf_to_save = copy.deepcopy(self.used_conf)
for key, val in inputs[sens_cst.SENSORS].items():
# Define the output directory
out_directory = os.path.join(
self.subsampling_dir, "res_" + str(res)
)
safe_makedirs(out_directory)
_ = self.sensors_subsampling.run(
key,
val,
res,
out_directory,
self.cars_orchestrator,
)
self.formatting(
key, os.path.abspath(out_directory), conf_to_save
)
out_yaml = os.path.abspath(
os.path.join(out_directory, "input.yaml")
)
with open(out_yaml, "w", encoding="utf-8") as f:
yaml.dump(
conf_to_save[INPUT],
f,
default_flow_style=False,
sort_keys=False,
)