Source code for cars.pipelines.subsampling.subsampling

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=too-many-lines
# attribute-defined-outside-init is disabled so that we can create and use
# attributes however we need, to stick to the "everything is attribute" logic
# introduced in issue#895
# pylint: disable=attribute-defined-outside-init
# pylint: disable=too-many-nested-blocks
"""
CARS subsampling pipeline class file
"""
# Standard imports
from __future__ import print_function

import copy
import logging
import os
from pathlib import Path

import rasterio as rio
import yaml
from json_checker import Checker, OptionalKey, Or

from cars.applications.application import Application
from cars.core import cars_logging
from cars.core.utils import safe_makedirs
from cars.orchestrator import orchestrator
from cars.orchestrator.cluster.log_wrapper import cars_profile
from cars.pipelines.parameters import advanced_parameters_constants as adv_cst
from cars.pipelines.parameters import output_constants as out_cst
from cars.pipelines.parameters import output_parameters, sensor_inputs
from cars.pipelines.parameters import sensor_inputs_constants as sens_cst
from cars.pipelines.pipeline import Pipeline
from cars.pipelines.pipeline_constants import (
    ADVANCED,
    APPLICATIONS,
    INPUT,
    ORCHESTRATOR,
    OUTPUT,
)
from cars.pipelines.pipeline_template import PipelineTemplate

PIPELINE = "subsampling"


@Pipeline.register(
    PIPELINE,
)
class SubsamplingPipeline(PipelineTemplate):
    """
    SubsamplingPipeline

    For every configured resolution different from 1, runs the
    ``sensors_subsampling`` application on each sensor and writes an
    ``input.yaml`` file describing the subsampled inputs in the matching
    ``res_<resolution>`` directory.
    """

    # pylint: disable=too-many-instance-attributes

    def __init__(self, conf, config_dir=None):  # noqa: C901
        """
        Creates pipeline

        The given configuration is checked section by section and the
        checked sections are stored in ``self.used_conf``.
        Note that ``conf`` is modified in place: its pipeline, input and
        output sections are overwritten with their checked versions.

        :param conf: full configuration of the pipeline
        :type conf: dict
        :param config_dir: path to dir containing json or yaml file
        :type config_dir: str
        """

        # The attribute keeps the path exactly as given by the caller
        self.config_dir = config_dir
        # Transform relative path to absolute path
        # (local variable only, used to check the inputs)
        if config_dir is not None:
            config_dir = os.path.abspath(config_dir)

        # metadata, linked to the orchestrator output in run()
        self.metadata = None

        # Check global conf
        # NOTE(review): check_global_schema is not defined in this class,
        # presumably inherited from PipelineTemplate
        self.check_global_schema(conf)

        if PIPELINE in conf:
            self.check_pipeline_conf(conf)
        else:
            conf[PIPELINE] = {}

        self.out_dir = conf[OUTPUT][out_cst.OUT_DIRECTORY]
        self.subsampling_dir = os.path.join(self.out_dir, "subsampling")

        pipeline_conf = conf.get(PIPELINE, {})

        # Check input
        conf[INPUT] = self.check_inputs(
            conf[INPUT], config_json_dir=config_dir
        )

        # check advanced
        conf[PIPELINE][ADVANCED] = self.check_advanced(
            pipeline_conf.get(ADVANCED, {}),
            conf[INPUT],
        )

        # Get epipolar resolutions to use
        self.resolutions = conf[PIPELINE][ADVANCED][adv_cst.RESOLUTIONS]

        # check output
        conf[OUTPUT] = self.check_output(conf)

        self.used_conf = {}

        # Check conf orchestrator
        # NOTE(review): check_orchestrator is not defined in this class,
        # presumably inherited from PipelineTemplate
        self.used_conf[ORCHESTRATOR] = self.check_orchestrator(
            conf.get(ORCHESTRATOR, None)
        )
        self.used_conf[INPUT] = conf[INPUT]
        self.used_conf[OUTPUT] = conf[OUTPUT]
        self.used_conf[ADVANCED] = conf[PIPELINE][ADVANCED]

        self.used_conf[APPLICATIONS] = self.check_applications(
            pipeline_conf.get(APPLICATIONS, {})
        )

    def check_pipeline_conf(self, conf):
        """
        Check pipeline configuration

        Only the optional advanced and applications sections, both
        dictionaries, are described by the schema.

        :param conf: full configuration, containing the pipeline section
        :type conf: dict
        """

        # Validate the pipeline section
        pipeline_schema = {
            OptionalKey(ADVANCED): dict,
            OptionalKey(APPLICATIONS): dict,
        }

        checker_pipeline = Checker(pipeline_schema)
        checker_pipeline.validate(conf[PIPELINE])

    def check_inputs(self, conf, config_json_dir=None):
        """
        Check the inputs given

        :param conf: configuration of inputs
        :type conf: dict
        :param config_json_dir: directory of used json, if
            user filled paths with relative paths
        :type config_json_dir: str

        :return: overloaded inputs
        :rtype: dict
        """

        return sensor_inputs.sensors_check_inputs(
            conf, config_dir=config_json_dir
        )

    def check_output(self, conf):
        """
        Check the output given

        :param conf: full configuration, its input and output sections
            are both used
        :type conf: dict

        :return: overloaded output
        :rtype: dict
        """

        # Only the first returned element is kept
        conf_output, _ = output_parameters.check_output_parameters(
            conf[INPUT], conf[OUTPUT], None
        )
        return conf_output

    def check_advanced(self, conf, inputs):
        """
        Check all conf for advanced configuration

        Defaults are filled in, a single integer resolution is wrapped
        in a list, and the resolutions leading to images smaller than
        the minimal image size are removed.

        :param conf: advanced configuration
        :type conf: dict
        :param inputs: checked configuration of inputs
        :type inputs: dict

        :return: overridden advanced conf
        :rtype: dict
        """

        overloaded_conf = copy.deepcopy(conf)

        overloaded_conf[adv_cst.SAVE_INTERMEDIATE_DATA] = conf.get(
            adv_cst.SAVE_INTERMEDIATE_DATA, False
        )

        overloaded_conf[adv_cst.RESOLUTIONS] = overloaded_conf.get(
            adv_cst.RESOLUTIONS, [16, 4, 1]
        )

        overloaded_conf[adv_cst.MIN_IMAGE_SIZE] = conf.get(
            adv_cst.MIN_IMAGE_SIZE, 80
        )

        # A single resolution can be given as an integer
        if isinstance(overloaded_conf[adv_cst.RESOLUTIONS], int):
            overloaded_conf[adv_cst.RESOLUTIONS] = [
                overloaded_conf[adv_cst.RESOLUTIONS]
            ]

        # Check minimal image size: gather the smallest dimension
        # of the b0 band image of every sensor
        sizes = []
        for sensor in inputs[sens_cst.SENSORS]:
            try:
                image_path = inputs[sens_cst.SENSORS][sensor][
                    sens_cst.INPUT_IMG
                ]["bands"]["b0"]["path"]
                # Context manager so that the dataset is always closed
                with rio.open(image_path) as image:
                    sizes.append(min(image.height, image.width))
            except (KeyError, rio.errors.RasterioIOError):
                logging.warning("Sensor %s not found", sensor)

        min_resolution = 1
        if sizes:
            min_size = min(sizes)
            min_image_size = overloaded_conf[adv_cst.MIN_IMAGE_SIZE]
            # default=1: an empty list is handled like the case where
            # no image could be read, instead of raising a ValueError
            min_resolution = min(
                overloaded_conf[adv_cst.RESOLUTIONS], default=1
            )
            # Keep only resolutions giving images that are big enough
            overloaded_conf[adv_cst.RESOLUTIONS] = [
                res
                for res in overloaded_conf[adv_cst.RESOLUTIONS]
                if min_size / res >= min_image_size
            ]

        if len(overloaded_conf[adv_cst.RESOLUTIONS]) == 0:
            logging.warning(
                "All resolutions are too low for the given images, %s is"
                " used as resolution",
                min_resolution,
            )
            overloaded_conf[adv_cst.RESOLUTIONS] = [min_resolution]

        schema = {
            adv_cst.SAVE_INTERMEDIATE_DATA: Or(dict, bool),
            adv_cst.RESOLUTIONS: [int],
            adv_cst.MIN_IMAGE_SIZE: int,
        }

        checker_advanced_parameters = Checker(schema)
        checker_advanced_parameters.validate(overloaded_conf)

        return overloaded_conf

    def check_applications(self, conf):
        """
        Check the given configuration for applications

        Instantiates the ``sensors_subsampling`` application and stores
        it in ``self.sensors_subsampling``.

        :param conf: configuration of applications
        :type conf: dict

        :return: configuration of applications, with the configuration
            returned by the sensors_subsampling application
        :rtype: dict

        :raises NameError: if an application other than
            sensors_subsampling is configured
        """

        needed_applications = ["sensors_subsampling"]

        # Only the applications needed by this pipeline can be configured
        for app_key in conf:
            if app_key not in needed_applications:
                msg = f"No {app_key} application used in the default Cars pipeline"
                logging.error(msg)
                raise NameError(msg)

        used_conf = copy.deepcopy(conf)

        self.sensors_subsampling = Application(
            "sensors_subsampling",
            cfg=used_conf.get("sensors_subsampling", {}),
        )
        used_conf["sensors_subsampling"] = self.sensors_subsampling.get_conf()

        return used_conf

    def formatting(self, key, out_dir, conf_to_save):
        """
        Format the input.yaml with new inputs

        The image bands, classification and mask paths of the sensor are
        replaced in place in ``conf_to_save`` by
        ``<out_dir>/<key>/<original file stem>.tif``.

        :param key: name of the sensor to update
        :type key: str
        :param out_dir: output directory of the current resolution
        :type out_dir: str
        :param conf_to_save: configuration, modified in place
        :type conf_to_save: dict
        """

        sensor = conf_to_save[INPUT][sens_cst.SENSORS][key]

        def replace_path(path):
            """
            Replace format by TIF

            :param path: image path
            :return: path of the TIF file in the sensor output directory
            """
            return os.path.join(out_dir, key, Path(path).stem + ".tif")

        # Every band is handled the same way, b0 included
        for band_info in sensor[sens_cst.INPUT_IMG]["bands"].values():
            band_info["path"] = replace_path(band_info["path"])

        # The classification is a dictionary holding its path
        if sensor.get(sens_cst.INPUT_CLASSIFICATION) is not None:
            classification = sensor[sens_cst.INPUT_CLASSIFICATION]
            classification["path"] = replace_path(classification["path"])

        # The mask is directly a path
        if sensor.get(sens_cst.INPUT_MSK) is not None:
            sensor[sens_cst.INPUT_MSK] = replace_path(
                sensor[sens_cst.INPUT_MSK]
            )

    @cars_profile(name="Run_subsampling_pipeline", interval=0.5)
    def run(self, args=None, log_dir=None):  # noqa C901 # pylint: disable=W0613
        """
        Run pipeline

        :param args: not used
        :param log_dir: directory of the logs, ``logs`` in the
            subsampling directory if None
        :type log_dir: str
        """

        cars_logging.add_progress_message("Starting subsampling pipeline")

        inputs = copy.deepcopy(self.used_conf[INPUT])

        if log_dir is not None:
            self.log_dir = log_dir
        else:
            self.log_dir = os.path.join(self.subsampling_dir, "logs")

        with orchestrator.Orchestrator(
            orchestrator_conf=self.used_conf[ORCHESTRATOR],
            out_dir=self.subsampling_dir,
            log_dir=self.log_dir,
            out_yaml_path=os.path.join(
                self.subsampling_dir,
                out_cst.INFO_FILENAME,
            ),
        ) as self.cars_orchestrator:
            # link metadata
            self.metadata = self.cars_orchestrator.out_yaml

            for res in self.resolutions:
                # Nothing is generated for resolution 1
                if res == 1:
                    continue

                conf_to_save = copy.deepcopy(self.used_conf)

                # Define the output directory once per resolution, it
                # does not depend on the sensor and is needed for the
                # input.yaml file even if there is no sensor
                out_directory = os.path.join(
                    self.subsampling_dir, "res_" + str(res)
                )
                safe_makedirs(out_directory)

                for key, val in inputs[sens_cst.SENSORS].items():
                    _ = self.sensors_subsampling.run(
                        key,
                        val,
                        res,
                        out_directory,
                        self.cars_orchestrator,
                    )

                    self.formatting(
                        key, os.path.abspath(out_directory), conf_to_save
                    )

                # Save the inputs pointing to the subsampled images
                out_yaml = os.path.abspath(
                    os.path.join(out_directory, "input.yaml")
                )
                with open(out_yaml, "w", encoding="utf-8") as f:
                    yaml.dump(
                        conf_to_save[INPUT],
                        f,
                        default_flow_style=False,
                        sort_keys=False,
                    )