#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=too-many-lines
# attribute-defined-outside-init is disabled so that we can create and use
# attributes however we need, to stick to the "everything is attribute" logic
# introduced in issue#895
# pylint: disable=attribute-defined-outside-init
# pylint: disable=too-many-nested-blocks
# pylint: disable=C0302
"""
CARS surface modeling pipeline class file
"""
# Standard imports
from __future__ import print_function
import copy
import logging
import os
import warnings
from collections import OrderedDict
import numpy as np
import rasterio
from json_checker import Checker, OptionalKey
from rasterio.errors import NodataShadowWarning
import cars.applications.sparse_matching.sparse_matching_constants as sm_cst
from cars import __version__
# CARS imports
from cars.applications import application_constants
from cars.applications.application import Application
from cars.applications.dem_generation import (
dem_generation_wrappers as dem_wrappers,
)
from cars.core import preprocessing, projection, roi_tools
from cars.core.geometry.abstract_geometry import AbstractGeometry
from cars.core.inputs import get_descriptions_bands
from cars.core.utils import safe_makedirs
from cars.data_structures import cars_dataset
from cars.orchestrator import orchestrator
from cars.orchestrator.cluster.log_wrapper import cars_profile
from cars.pipelines.parameters import advanced_parameters
from cars.pipelines.parameters import advanced_parameters_constants as adv_cst
from cars.pipelines.parameters import application_parameters
from cars.pipelines.parameters import output_constants as out_cst
from cars.pipelines.parameters import output_parameters, sensor_inputs
from cars.pipelines.parameters import sensor_inputs_constants as sens_cst
from cars.pipelines.parameters.advanced_parameters_constants import (
USE_ENDOGENOUS_DEM,
)
from cars.pipelines.parameters.output_constants import AUXILIARY
from cars.pipelines.pipeline import Pipeline
from cars.pipelines.pipeline_constants import (
ADVANCED,
APPLICATIONS,
INPUT,
ORCHESTRATOR,
OUTPUT,
TIE_POINTS,
)
from cars.pipelines.pipeline_template import PipelineTemplate
from cars.pipelines.tie_points.tie_points import TiePointsPipeline
PIPELINE = "surface_modeling"
[docs]
@Pipeline.register(PIPELINE)
class SurfaceModelingPipeline(PipelineTemplate):
"""
SurfaceModelingPipeline
"""
# pylint: disable=too-many-instance-attributes
def __init__(
self,
conf,
config_dir=None,
): # noqa: C901
"""
Creates pipeline
Directly creates class attributes:
used_conf
generate_terrain_products
debug_with_roi
save_output_dsm
save_output_depth_map
save_output_point_clouds
geom_plugin_without_dem_and_geoid
geom_plugin_with_dem_and_geoid
:param pipeline_name: name of the pipeline.
:type pipeline_name: str
:param cfg: configuration {'matching_cost_method': value}
:type cfg: dictionary
:param config_dir: path to dir containing json/yaml
:type config_dir: str
"""
# Used conf
self.used_conf = {}
# refined conf
self.refined_conf = {}
# metadata
self.metadata = None
# Transform relative path to absolute path
if config_dir is not None:
config_dir = os.path.abspath(config_dir)
self.config_dir = config_dir
# Check global conf
self.check_global_schema(conf)
if PIPELINE in conf:
self.check_pipeline_conf(conf)
self.out_dir = conf[OUTPUT][out_cst.OUT_DIRECTORY]
# Check conf orchestrator
self.used_conf[ORCHESTRATOR] = self.check_orchestrator(
conf.get(ORCHESTRATOR, None)
)
# Check conf inputs
inputs = self.check_inputs(conf[INPUT], config_dir=config_dir)
self.used_conf[INPUT] = inputs
self.refined_conf[INPUT] = copy.deepcopy(inputs)
initial_elevation = (
inputs[sens_cst.INITIAL_ELEVATION]["dem"] is not None
)
self.elevation_delta_lower_bound = -500 if initial_elevation else -1000
self.elevation_delta_upper_bound = 1000 if initial_elevation else 9000
self.dem_scaling_coeff = None
if inputs[sens_cst.LOW_RES_DSM] is not None:
low_res_dsm = rasterio.open(inputs[sens_cst.LOW_RES_DSM])
self.dem_scaling_coeff = np.mean(low_res_dsm.res) * 2
crs = low_res_dsm.crs
if crs.is_geographic:
xmin = low_res_dsm.bounds.left
ymin = low_res_dsm.bounds.bottom
utm_epsg = preprocessing.get_utm_zone_as_epsg_code(xmin, ymin)
conversion_factor = preprocessing.get_conversion_factor(
low_res_dsm.bounds, utm_epsg, crs.to_epsg()
)
self.dem_scaling_coeff = (
self.dem_scaling_coeff * conversion_factor
)
# Init tie points pipelines
self.tie_point_save = False
if TIE_POINTS in conf and conf[TIE_POINTS] is None:
self.tie_points_pipelines = None
else:
self.tie_points_pipelines = {}
for key1, key2 in inputs[sens_cst.PAIRING]:
pair_key = key1 + "_" + key2
tie_points_config = {}
tie_points_input = {}
tie_points_input[sens_cst.SENSORS] = {
key1: inputs[sens_cst.SENSORS][key1],
key2: inputs[sens_cst.SENSORS][key2],
}
tie_points_input[sens_cst.PAIRING] = [[key1, key2]]
tie_points_input[sens_cst.LOADERS] = inputs[sens_cst.LOADERS]
tie_points_input[sens_cst.INITIAL_ELEVATION] = inputs[
sens_cst.INITIAL_ELEVATION
]
tie_points_config[INPUT] = tie_points_input
tie_points_output = os.path.join(
self.out_dir, TIE_POINTS, pair_key
)
tie_points_config["output"] = {"directory": tie_points_output}
if TIE_POINTS in conf:
tie_points_config[TIE_POINTS] = conf[TIE_POINTS]
self.tie_points_pipelines[pair_key] = TiePointsPipeline(
tie_points_config,
config_dir=self.config_dir,
)
self.used_conf[TIE_POINTS] = {}
self.used_conf[TIE_POINTS][APPLICATIONS] = (
self.tie_points_pipelines[pair_key].used_conf[TIE_POINTS][
APPLICATIONS
]
)
self.used_conf[TIE_POINTS][ADVANCED] = (
self.tie_points_pipelines[pair_key].used_conf[TIE_POINTS][
ADVANCED
]
)
if self.used_conf[TIE_POINTS][ADVANCED][
adv_cst.SAVE_INTERMEDIATE_DATA
]:
self.tie_point_save = True
# Check advanced parameters
# TODO static method in the base class
output_dem_dir = os.path.join(
self.out_dir, "dump_dir", "initial_elevation"
)
pipeline_conf = conf.get(PIPELINE, {})
self.used_conf[PIPELINE] = {}
safe_makedirs(output_dem_dir)
(
inputs,
advanced,
self.geometry_plugin,
self.geom_plugin_without_dem_and_geoid,
self.geom_plugin_with_dem_and_geoid,
self.scaling_coeff,
self.land_cover_map,
self.classification_to_config_mapping,
bounds,
) = advanced_parameters.check_advanced_parameters(
inputs,
pipeline_conf.get(ADVANCED, {}),
output_dem_dir=output_dem_dir,
)
self.used_conf[PIPELINE][ADVANCED] = advanced
self.refined_conf[PIPELINE] = copy.deepcopy(self.used_conf[PIPELINE])
self.refined_conf[PIPELINE][ADVANCED] = copy.deepcopy(advanced)
# Refined conf: resolutions 1
self.refined_conf[PIPELINE][ADVANCED][adv_cst.RESOLUTIONS] = [1]
self.refined_conf["pipeline"] = "surface_modeling"
# Get ROI
(
self.input_roi_poly,
self.input_roi_epsg,
) = roi_tools.generate_roi_poly_from_inputs(
self.used_conf[INPUT][sens_cst.ROI]
)
self.debug_with_roi = self.used_conf[PIPELINE][ADVANCED][
adv_cst.DEBUG_WITH_ROI
]
# Check conf output
(
output,
self.scaling_coeff,
) = self.check_output(inputs, conf[OUTPUT], self.scaling_coeff, bounds)
self.used_conf[OUTPUT] = output
self.out_dir = self.used_conf[OUTPUT][out_cst.OUT_DIRECTORY]
self.dump_dir = os.path.join(self.out_dir, "dump_dir")
self.refined_conf[OUTPUT] = copy.deepcopy(output)
prod_level = output[out_cst.PRODUCT_LEVEL]
self.save_output_dsm = "dsm" in prod_level
self.save_output_depth_map = "depth_map" in prod_level
self.save_output_point_cloud = "point_cloud" in prod_level
self.output_level_none = not (
self.save_output_dsm
or self.save_output_depth_map
or self.save_output_point_cloud
)
# Used classification values, for filling -> will be masked
self.used_classif_values_for_filling = self.get_classif_values_filling(
self.used_conf[INPUT]
)
self.phasing = self.used_conf[PIPELINE][ADVANCED][adv_cst.PHASING]
self.compute_depth_map = not self.output_level_none
if self.output_level_none:
self.infer_conditions_from_applications(conf[PIPELINE])
self.save_all_intermediate_data = self.used_conf[PIPELINE][ADVANCED][
adv_cst.SAVE_INTERMEDIATE_DATA
]
self.save_all_point_clouds_by_pair = self.used_conf[OUTPUT].get(
out_cst.SAVE_BY_PAIR, False
)
# Check conf application
application_conf = self.check_applications(
pipeline_conf.get(APPLICATIONS, {})
)
# Check conf application vs inputs application
application_conf = self.check_applications_with_inputs(
self.used_conf[INPUT], application_conf
)
self.used_conf[PIPELINE][APPLICATIONS] = application_conf
[docs]
def check_pipeline_conf(self, conf):
"""
Check pipeline configuration
"""
# Validate inputs
pipeline_schema = {
OptionalKey(ADVANCED): dict,
OptionalKey(APPLICATIONS): dict,
}
checker_inputs = Checker(pipeline_schema)
checker_inputs.validate(conf[PIPELINE])
[docs]
def quit_on_app(self, app_name):
"""
Returns whether the pipeline should end after
the application was called.
Only works if the output_level is empty, so that
the control is instead given to
"""
if not self.output_level_none:
# custom quit step was not set, never quit early
return False
return self.app_values[app_name] >= self.last_application_to_run
[docs]
def infer_conditions_from_applications(self, conf):
"""
Fills the condition booleans used later in the pipeline by going
through the applications and infering which application we should
end the pipeline on.
"""
self.last_application_to_run = 0
sensor_to_depth_apps = {
"dem_generation": 1,
"grid_generation": 2,
"grid_correction": 3,
"resampling": 4,
"ground_truth_reprojection": 7,
"dense_matching": 9,
"dense_match_filling": 10,
"triangulation": 12,
"point_cloud_outlier_removal.1": 13,
"point_cloud_outlier_removal.2": 14,
}
depth_to_dsm_apps = {
"point_cloud_rasterization": 16,
"dsm_filling.1": 18,
"dsm_filling.2": 19,
"dsm_filling.3": 20,
"auxiliary_filling": 21,
}
self.app_values = {}
self.app_values.update(sensor_to_depth_apps)
self.app_values.update(depth_to_dsm_apps)
app_conf = conf.get(APPLICATIONS, {})
for key in app_conf:
if adv_cst.SAVE_INTERMEDIATE_DATA not in app_conf[key]:
continue
if not app_conf[key][adv_cst.SAVE_INTERMEDIATE_DATA]:
continue
if key in sensor_to_depth_apps:
self.compute_depth_map = True
self.last_application_to_run = max(
self.last_application_to_run, self.app_values[key]
)
elif key in depth_to_dsm_apps:
self.compute_depth_map = True
# enabled to start the depth map to dsm process
self.save_output_dsm = True
self.last_application_to_run = max(
self.last_application_to_run, self.app_values[key]
)
else:
warn_msg = (
"The application {} was not recognized. Its configuration"
"will be ignored."
).format(key)
logging.warning(warn_msg)
if not (self.compute_depth_map or self.save_output_dsm):
log_msg = (
"No product level was given. CARS has not detected any "
"data you wish to save. No computation will be done."
)
logging.info(log_msg)
else:
log_msg = (
"No product level was given. CARS has detected that you "
+ "wish to run up to the {} application.".format(
next(
k
for k, v in self.app_values.items()
if v == self.last_application_to_run
)
)
)
logging.warning(log_msg)
[docs]
def save_configurations(self):
"""
Save used_conf and refined_conf configurations
"""
cars_dataset.save_dict(
self.used_conf,
os.path.join(self.out_dir, "current_res_used_conf.yaml"),
)
cars_dataset.save_dict(
self.refined_conf,
os.path.join(self.out_dir, "refined_conf.yaml"),
)
[docs]
def check_output(self, inputs, conf, scaling_coeff, bounds):
"""
Check the output given
:param conf: configuration of output
:type conf: dict
:param scaling_coeff: scaling factor for resolution
:type scaling_coeff: float
:return: overloader output
:rtype: dict
"""
return output_parameters.check_output_parameters(
inputs, conf, scaling_coeff, bounds
)
[docs]
def check_applications( # noqa: C901 : too complex
self,
conf,
):
"""
Check the given configuration for applications,
and generates needed applications for pipeline.
:param conf: configuration of applications
:type conf: dict
"""
scaling_coeff = self.scaling_coeff
needed_applications = application_parameters.get_needed_apps(
True,
self.save_output_dsm,
self.save_output_point_cloud,
conf,
)
# Check if all specified applications are used
# Application in terrain_application are note used in
# the sensors_to_dense_depth_maps pipeline
for app_key in conf.keys():
if app_key not in needed_applications:
msg = (
f"No {app_key} application used in the "
+ "default Cars pipeline"
)
logging.error(msg)
raise NameError(msg)
# Initialize used config
used_conf = {}
for app_key in needed_applications:
used_conf[app_key] = conf.get(app_key, {})
if used_conf[app_key] is None:
continue
used_conf[app_key]["save_intermediate_data"] = (
self.save_all_intermediate_data
or used_conf[app_key].get("save_intermediate_data", False)
)
self.epipolar_grid_generation_application = None
self.resampling_application = None
self.ground_truth_reprojection = None
self.dense_match_filling = None
self.sparse_mtch_app = None
self.dense_matching_app = None
self.triangulation_application = None
self.dem_generation_application = None
self.pc_outlier_removal_apps = {}
self.rasterization_application = None
self.pc_fusion_application = None
self.grid_correction_app = None
# Epipolar grid generation
self.epipolar_grid_generation_application = Application(
"grid_generation",
cfg=used_conf.get("grid_generation", {}),
scaling_coeff=scaling_coeff,
)
used_conf["grid_generation"] = (
self.epipolar_grid_generation_application.get_conf()
)
# Epipolar grid correction
self.grid_correction_app = Application(
"grid_correction",
cfg=used_conf.get("grid_correction", {}),
)
used_conf["grid_correction"] = self.grid_correction_app.get_conf()
# image resampling
self.resampling_application = Application(
"resampling",
cfg=used_conf.get("resampling", {}),
scaling_coeff=scaling_coeff,
)
used_conf["resampling"] = self.resampling_application.get_conf()
# ground truth disparity map computation
if self.used_conf[PIPELINE][ADVANCED][adv_cst.GROUND_TRUTH_DSM]:
used_conf["ground_truth_reprojection"][
"save_intermediate_data"
] = True
if isinstance(
self.used_conf[PIPELINE][ADVANCED][adv_cst.GROUND_TRUTH_DSM],
str,
):
self.used_conf[PIPELINE][ADVANCED][adv_cst.GROUND_TRUTH_DSM] = {
"dsm": self.used_conf[PIPELINE][ADVANCED][
adv_cst.GROUND_TRUTH_DSM
]
}
self.ground_truth_reprojection = Application(
"ground_truth_reprojection",
cfg=used_conf.get("ground_truth_reprojection", {}),
scaling_coeff=scaling_coeff,
)
# disparity filling
self.dense_match_filling = Application(
"dense_match_filling",
cfg=used_conf.get(
"dense_match_filling",
{"method": "zero_padding"},
),
scaling_coeff=scaling_coeff,
)
used_conf["dense_match_filling"] = self.dense_match_filling.get_conf()
# Matching
generate_performance_map = bool(
self.used_conf[OUTPUT][out_cst.AUXILIARY][
out_cst.AUX_PERFORMANCE_MAP
]
)
generate_ambiguity = (
self.used_conf[OUTPUT]
.get(out_cst.AUXILIARY, {})
.get(out_cst.AUX_AMBIGUITY, False)
)
dense_matching_config = used_conf.get("dense_matching", {})
if generate_ambiguity is True:
dense_matching_config["generate_ambiguity"] = True
if (
generate_performance_map is True
and dense_matching_config.get("performance_map_method", None)
is None
):
dense_matching_config["performance_map_method"] = "risk"
# particular case for some epipolar resolutions
if not dense_matching_config:
used_conf["dense_matching"]["performance_map_method"] = [
"risk",
"intervals",
]
self.dense_matching_app = Application(
"dense_matching",
cfg=dense_matching_config,
scaling_coeff=scaling_coeff,
)
used_conf["dense_matching"] = self.dense_matching_app.get_conf()
# Triangulation
self.triangulation_application = Application(
"triangulation",
cfg=used_conf.get("triangulation", {}),
scaling_coeff=scaling_coeff,
)
used_conf["triangulation"] = self.triangulation_application.get_conf()
# MNT generation
self.dem_generation_application = Application(
"dem_generation",
cfg=used_conf.get("dem_generation", {}),
scaling_coeff=(
self.dem_scaling_coeff
if self.dem_scaling_coeff is not None
else scaling_coeff
),
)
used_conf["dem_generation"] = self.dem_generation_application.get_conf()
for app_key, app_conf in used_conf.items():
if not app_key.startswith("point_cloud_outlier_removal"):
continue
if app_conf is None:
self.pc_outlier_removal_apps = {}
# keep over multiple runs
used_conf["point_cloud_outlier_removal"] = None
break
if app_key in self.pc_outlier_removal_apps:
msg = (
f"The key {app_key} is defined twice in the input "
"configuration."
)
logging.error(msg)
raise NameError(msg)
if app_key[27:] == ".1":
app_conf.setdefault("method", "small_components")
if app_key[27:] == ".2":
app_conf.setdefault("method", "statistical")
self.pc_outlier_removal_apps[app_key] = Application(
"point_cloud_outlier_removal",
cfg=app_conf,
scaling_coeff=scaling_coeff,
)
used_conf[app_key] = self.pc_outlier_removal_apps[
app_key
].get_conf()
methods_str = "\n".join(
f" - {k}={a.used_method}"
for k, a in self.pc_outlier_removal_apps.items()
)
logging.info(
"{} point cloud outlier removal apps registered:\n{}".format(
len(self.pc_outlier_removal_apps), methods_str
)
)
if self.save_output_dsm:
# Rasterization
self.rasterization_application = Application(
"point_cloud_rasterization",
cfg=used_conf.get("point_cloud_rasterization", {}),
scaling_coeff=scaling_coeff,
)
used_conf["point_cloud_rasterization"] = (
self.rasterization_application.get_conf()
)
return used_conf
[docs]
def get_classif_values_filling(self, inputs):
"""
Get values in classif, used for filling
:param inputs: inputs
:type inputs: dict
:return: list of values
:rtype: list
"""
filling_classif_values = []
if sens_cst.FILLING not in inputs or inputs[sens_cst.FILLING] is None:
logging.info("No filling in input configuration")
return None
filling_classif_values = []
for _, classif_values in inputs[sens_cst.FILLING].items():
# Add new value to filling bands
if classif_values is not None:
if isinstance(classif_values, str):
classif_values = [classif_values]
filling_classif_values += classif_values
simplified_list = list(OrderedDict.fromkeys(filling_classif_values))
res_as_string_list = [str(value) for value in simplified_list]
return res_as_string_list
[docs]
def sensor_to_depth_maps(self): # noqa: C901
"""
Creates the depth map from the sensor images given in the input,
by following the CARS pipeline's steps.
"""
# pylint:disable=too-many-return-statements
inputs = self.used_conf[INPUT]
output = self.used_conf[OUTPUT]
# Initialize epsg for terrain tiles
self.phasing = self.used_conf[PIPELINE][ADVANCED][adv_cst.PHASING]
if self.phasing is not None:
self.epsg = self.phasing["epsg"]
else:
self.epsg = output[out_cst.EPSG]
if self.epsg is not None:
# Compute roi polygon, in output EPSG
self.roi_poly = preprocessing.compute_roi_poly(
self.input_roi_poly, self.input_roi_epsg, self.epsg
)
self.resolution = output[out_cst.RESOLUTION]
# List of terrain roi corresponding to each epipolar pair
# Used to generate final terrain roi
self.list_terrain_roi = []
# Polygons representing the intersection of each pair of images
# Used to fill the final DSM only inside of those Polygons
self.list_intersection_poly = []
# initialize lists of points
self.list_epipolar_point_clouds = []
sensor_inputs.load_geomodels(
inputs, self.geom_plugin_without_dem_and_geoid
)
self.list_sensor_pairs = sensor_inputs.generate_pairs(
self.used_conf[INPUT]
)
logging.info(
"Received {} stereo pairs configurations".format(
len(self.list_sensor_pairs)
)
)
_ = output_parameters.intialize_product_index(
self.cars_orchestrator,
output["product_level"],
[sensor_pair[0] for sensor_pair in self.list_sensor_pairs],
)
# pairs is a dict used to store the CarsDataset of
# all pairs, easily retrievable with pair keys
self.pairs = {}
# triangulated_matches_list is used to store triangulated matches
# used in dem generation
self.triangulated_matches_list = []
save_corrected_grid = (
self.epipolar_grid_generation_application.get_save_grids()
)
# Compute dems
dems = {}
if self.used_conf[INPUT][sens_cst.LOW_RES_DSM] is not None:
# Create dsm directory
dsm_dir = os.path.join(
self.used_conf[OUTPUT][out_cst.OUT_DIRECTORY],
"dsm",
)
safe_makedirs(dsm_dir)
# DSM file name
dsm_file_name = self.used_conf[INPUT][sens_cst.LOW_RES_DSM]
dem_generation_output_dir = os.path.join(
self.dump_dir, "dem_generation"
)
safe_makedirs(dem_generation_output_dir)
# Use initial elevation if provided, and generate dems
_, paths, _ = self.dem_generation_application.run(
dsm_file_name,
dem_generation_output_dir,
input_geoid=self.used_conf[INPUT][sens_cst.INITIAL_ELEVATION][
sens_cst.GEOID
],
output_geoid=self.used_conf[OUTPUT][out_cst.OUT_GEOID],
initial_elevation=(
self.used_conf[INPUT][sens_cst.INITIAL_ELEVATION][
sens_cst.DEM_PATH
]
),
default_alt=self.geom_plugin_with_dem_and_geoid.default_alt,
cars_orchestrator=self.cars_orchestrator,
)
if self.quit_on_app("dem_generation"):
return True
dem_median = paths["dem_median"]
dem_min = paths["dem_min"]
dem_max = paths["dem_max"]
dems["dem_median"] = dem_median
dems["dem_min"] = dem_min
dems["dem_max"] = dem_max
# Override initial elevation
if (
inputs[sens_cst.INITIAL_ELEVATION][sens_cst.DEM_PATH] is None
or "dem_median"
in inputs[sens_cst.INITIAL_ELEVATION][sens_cst.DEM_PATH]
):
inputs[sens_cst.INITIAL_ELEVATION][
sens_cst.DEM_PATH
] = dem_median
elif (
inputs[sens_cst.INITIAL_ELEVATION][sens_cst.DEM_PATH]
is not None
and self.used_conf[PIPELINE][ADVANCED][USE_ENDOGENOUS_DEM]
):
inputs[sens_cst.INITIAL_ELEVATION][
sens_cst.DEM_PATH
] = dem_median
# Check advanced parameters with new initial elevation
output_dem_dir = os.path.join(
self.used_conf[OUTPUT][out_cst.OUT_DIRECTORY],
"dump_dir",
"initial_elevation",
)
safe_makedirs(output_dem_dir)
(
inputs,
_,
self.geometry_plugin,
self.geom_plugin_without_dem_and_geoid,
self.geom_plugin_with_dem_and_geoid,
_,
_,
_,
_,
) = advanced_parameters.check_advanced_parameters(
inputs,
self.used_conf.get(PIPELINE, {}).get(ADVANCED, {}),
output_dem_dir=output_dem_dir,
)
for (
pair_key,
sensor_image_left,
sensor_image_right,
) in self.list_sensor_pairs:
# initialize pairs for current pair
self.pairs[pair_key] = {}
self.pairs[pair_key]["sensor_image_left"] = sensor_image_left
self.pairs[pair_key]["sensor_image_right"] = sensor_image_right
# Run applications
# Run grid generation
# We generate grids with dem if it is provided.
# If not provided, grid are generated without dem and a dem
# will be generated, to use later for a new grid generation**
if inputs[sens_cst.INITIAL_ELEVATION][sens_cst.DEM_PATH] is None:
geom_plugin = self.geom_plugin_without_dem_and_geoid
else:
geom_plugin = self.geom_plugin_with_dem_and_geoid
# Generate rectification grids
(
self.pairs[pair_key]["grid_left"],
self.pairs[pair_key]["grid_right"],
) = self.epipolar_grid_generation_application.run(
self.pairs[pair_key]["sensor_image_left"],
self.pairs[pair_key]["sensor_image_right"],
geom_plugin,
orchestrator=self.cars_orchestrator,
pair_folder=os.path.join(
self.dump_dir,
"epipolar_grid_generation",
"initial",
pair_key,
),
pair_key=pair_key,
resolution=self.working_res,
)
if self.quit_on_app("grid_generation"):
continue # keep iterating over pairs, but don't go further
# Prepare tie point pipeline
# Update tie points pipeline with rectification grids
if self.tie_points_pipelines is not None:
tie_points_config = self.tie_points_pipelines[
pair_key
].used_conf
image_keys = list(tie_points_config[INPUT][sens_cst.SENSORS])
tie_points_config[INPUT][sens_cst.RECTIFICATION_GRIDS] = {
image_keys[0]: self.pairs[pair_key]["grid_left"],
image_keys[1]: self.pairs[pair_key]["grid_right"],
}
tie_points_pipeline = TiePointsPipeline(
tie_points_config,
config_dir=self.config_dir,
)
sparse_mtch_app = tie_points_pipeline.sparse_matching_app
tie_points_output = tie_points_config[OUTPUT][
out_cst.OUT_DIRECTORY
]
# If dem are provided, launch disparity grids generation
disp_range_grid_dir = os.path.join(
tie_points_output, "disparity_grids"
)
disp_range_grid = None
if dems:
disp_range_grid = (
self.dense_matching_app.generate_disparity_grids(
self.pairs[pair_key]["sensor_image_right"],
self.pairs[pair_key]["grid_right"],
self.geom_plugin_with_dem_and_geoid,
dem_min=dem_min,
dem_max=dem_max,
pair_folder=disp_range_grid_dir,
orchestrator=self.cars_orchestrator,
)
)
# Launch tie points pipeline
tie_points_pipeline.run(
disp_range_grid=disp_range_grid,
log_dir=self.log_dir,
cars_orchestrator=self.cars_orchestrator,
)
self.pairs[pair_key]["matches_array"] = np.load(
os.path.join(
tie_points_output, pair_key, "filtered_matches.npy"
)
)
minimum_nb_matches = (
self.grid_correction_app.get_minimum_nb_matches()
)
nb_matches = self.pairs[pair_key]["matches_array"].shape[0]
save_matches = sparse_mtch_app.save_intermediate_data
if nb_matches > minimum_nb_matches:
# Compute grid correction
(self.pairs[pair_key]["corrected_grid_right"], _, _, _) = (
self.grid_correction_app.run(
self.pairs[pair_key]["matches_array"],
self.pairs[pair_key]["grid_right"],
save_matches=save_matches,
pair_folder=os.path.join(
self.dump_dir,
"grid_correction",
"initial",
pair_key,
),
pair_key=pair_key,
orchestrator=self.cars_orchestrator,
save_corrected_grid=save_corrected_grid,
)
)
else:
logging.warning(
"Grid correction is not applied because number of "
"matches found ({}) is less than minimum number of "
"matches required for grid correction ({})".format(
nb_matches,
minimum_nb_matches,
)
)
self.pairs[pair_key]["corrected_grid_right"] = self.pairs[
pair_key
]["grid_right"]
else:
self.pairs[pair_key]["corrected_grid_right"] = self.pairs[
pair_key
]["grid_right"]
self.pairs[pair_key]["corrected_grid_left"] = self.pairs[pair_key][
"grid_left"
]
# Shrink disparity intervals according to SIFT disparities
disp_to_alt_ratio = self.pairs[pair_key]["grid_left"][
"disp_to_alt_ratio"
]
if self.tie_points_pipelines is not None:
disp_bounds_params = sparse_mtch_app.disparity_bounds_estimation
matches = self.pairs[pair_key]["matches_array"]
if disp_bounds_params["activated"] and matches.shape[0] > 0:
sift_disp = matches[:, 2] - matches[:, 0]
disp_min = np.percentile(
sift_disp, disp_bounds_params["percentile"]
)
disp_max = np.percentile(
sift_disp, 100 - disp_bounds_params["percentile"]
)
logging.info(
"Global disparity interval without margin : "
f"[{disp_min:.2f} pix, {disp_max:.2f} pix]"
)
disp_min -= (
disp_bounds_params["upper_margin"] / disp_to_alt_ratio
)
disp_max += (
disp_bounds_params["lower_margin"] / disp_to_alt_ratio
)
logging.info(
"Global disparity interval with margin : "
f"[{disp_min:.2f} pix, {disp_max:.2f} pix]"
)
else:
disp_min = (
-self.elevation_delta_upper_bound / disp_to_alt_ratio
)
disp_max = (
-self.elevation_delta_lower_bound / disp_to_alt_ratio
)
logging.info(
"Global disparity interval : "
f"[{disp_min:.2f} pix, {disp_max:.2f} pix]"
)
else:
disp_min = -self.elevation_delta_upper_bound / disp_to_alt_ratio
disp_max = -self.elevation_delta_lower_bound / disp_to_alt_ratio
logging.info(
"Global disparity interval : "
f"[{disp_min:.2f} pix, {disp_max:.2f} pix]"
)
if self.epsg is None:
# compute epsg
# Epsg uses global disparity min and max
self.epsg = preprocessing.compute_epsg(
self.pairs[pair_key]["sensor_image_left"],
self.pairs[pair_key]["sensor_image_right"],
self.pairs[pair_key]["corrected_grid_left"],
self.pairs[pair_key]["corrected_grid_right"],
self.geom_plugin_with_dem_and_geoid,
disp_min=0,
disp_max=0,
)
# Compute roi polygon, in input EPSG
self.roi_poly = preprocessing.compute_roi_poly(
self.input_roi_poly, self.input_roi_epsg, self.epsg
)
# Clean grids at the end of processing if required. Note that this will
# also clean refined grids
if not save_corrected_grid:
self.cars_orchestrator.add_to_clean(
os.path.join(self.dump_dir, "grid_correction")
)
# grids file are already cleaned in the application, but the tree
# structure should also be cleaned
if not save_corrected_grid:
self.cars_orchestrator.add_to_clean(
os.path.join(self.dump_dir, "epipolar_grid_generation")
)
# quit if any app in the loop over the pairs was the last one
if self.quit_on_app("grid_generation") or self.quit_on_app(
"resampling"
):
return True
# Define param
use_global_disp_range = self.dense_matching_app.use_global_disp_range
self.pairs_names = [
pair_name for pair_name, _, _ in self.list_sensor_pairs
]
for _, (pair_key, _, _) in enumerate(self.list_sensor_pairs):
# Geometry plugin with dem will be used for the grid generation
geom_plugin = self.geom_plugin_with_dem_and_geoid
# saved used configuration
self.save_configurations()
# Generate min and max disp grids
# Global disparity min and max will be computed from
# these grids
dense_matching_pair_folder = os.path.join(
self.dump_dir, "dense_matching", pair_key
)
if self.which_resolution in ("first", "single") and dems in (
None,
{},
):
dmin = disp_min
dmax = disp_max
# generate_disparity_grids runs orchestrator.breakpoint()
self.pairs[pair_key]["disp_range_grid"] = (
self.dense_matching_app.generate_disparity_grids(
self.pairs[pair_key]["sensor_image_right"],
self.pairs[pair_key]["corrected_grid_right"],
self.geom_plugin_with_dem_and_geoid,
dmin=dmin,
dmax=dmax,
pair_folder=dense_matching_pair_folder,
orchestrator=self.cars_orchestrator,
)
)
updating_infos = {
application_constants.APPLICATION_TAG: {
sm_cst.DISPARITY_RANGE_COMPUTATION_TAG: {
pair_key: {
sm_cst.MINIMUM_DISPARITY_TAG: dmin,
sm_cst.MAXIMUM_DISPARITY_TAG: dmax,
}
}
}
}
self.cars_orchestrator.update_out_info(updating_infos)
else:
# Generate min and max disp grids from dems
# generate_disparity_grids runs orchestrator.breakpoint()
self.pairs[pair_key]["disp_range_grid"] = (
self.dense_matching_app.generate_disparity_grids(
self.pairs[pair_key]["sensor_image_right"],
self.pairs[pair_key]["corrected_grid_right"],
self.geom_plugin_with_dem_and_geoid,
dem_min=dem_min,
dem_max=dem_max,
pair_folder=dense_matching_pair_folder,
orchestrator=self.cars_orchestrator,
)
)
if use_global_disp_range:
# Generate min and max disp grids from constants
# sensor image is not used here
# TODO remove when only local diparity range will be used
dmin = self.pairs[pair_key]["disp_range_grid"]["global_min"]
dmax = self.pairs[pair_key]["disp_range_grid"]["global_max"]
# update orchestrator_out_json
updating_infos = {
application_constants.APPLICATION_TAG: {
sm_cst.DISPARITY_RANGE_COMPUTATION_TAG: {
pair_key: {
sm_cst.MINIMUM_DISPARITY_TAG: dmin,
sm_cst.MAXIMUM_DISPARITY_TAG: dmax,
}
}
}
}
self.cars_orchestrator.update_out_info(updating_infos)
# generate_disparity_grids runs orchestrator.breakpoint()
self.pairs[pair_key]["disp_range_grid"] = (
self.dense_matching_app.generate_disparity_grids(
self.pairs[pair_key]["sensor_image_right"],
self.pairs[pair_key]["corrected_grid_right"],
self.geom_plugin_with_dem_and_geoid,
dmin=dmin,
dmax=dmax,
pair_folder=dense_matching_pair_folder,
orchestrator=self.cars_orchestrator,
)
)
# saved used configuration
self.save_configurations()
# end of for loop, to finish computing disparity range grids
for cloud_id, (pair_key, _, _) in enumerate(self.list_sensor_pairs):
# Generate roi
epipolar_roi = preprocessing.compute_epipolar_roi(
self.input_roi_poly,
self.input_roi_epsg,
self.geom_plugin_with_dem_and_geoid,
self.pairs[pair_key]["sensor_image_left"],
self.pairs[pair_key]["sensor_image_right"],
self.pairs[pair_key]["corrected_grid_left"],
self.pairs[pair_key]["corrected_grid_right"],
os.path.join(self.dump_dir, "compute_epipolar_roi", pair_key),
disp_min=self.pairs[pair_key]["disp_range_grid"]["global_min"],
disp_max=self.pairs[pair_key]["disp_range_grid"]["global_max"],
)
# Generate new epipolar images
# Generated with corrected grids
# Optimal size is computed for the worst case scenario
# found with epipolar disparity range grids
(
optimum_tile_size,
local_tile_optimal_size_fun,
) = self.dense_matching_app.get_optimal_tile_size(
self.pairs[pair_key]["disp_range_grid"],
self.cars_orchestrator.cluster.checked_conf_cluster[
"max_ram_per_worker"
],
)
# Get required bands of third resampling
required_bands = self.dense_matching_app.get_required_bands()
# Add left required bands for texture
required_bands["left"] = sorted(
set(required_bands["left"]).union(set(self.texture_bands))
)
# Find index of texture band in left_dataset
texture_bands_indices = [
required_bands["left"].index(band)
for band in self.texture_bands
]
# Get margins used in dense matching,
dense_matching_margins_fun = (
self.dense_matching_app.get_margins_fun(
self.pairs[pair_key]["corrected_grid_left"],
self.pairs[pair_key]["disp_range_grid"],
)
)
# Run third epipolar resampling
(
new_epipolar_image_left,
new_epipolar_image_right,
) = self.resampling_application.run(
self.pairs[pair_key]["sensor_image_left"],
self.pairs[pair_key]["sensor_image_right"],
self.pairs[pair_key]["corrected_grid_left"],
self.pairs[pair_key]["corrected_grid_right"],
geom_plugin,
orchestrator=self.cars_orchestrator,
pair_folder=os.path.join(
self.dump_dir, "resampling", "corrected_grid", pair_key
),
pair_key=pair_key,
margins_fun=dense_matching_margins_fun,
tile_width=optimum_tile_size,
tile_height=optimum_tile_size,
add_classif=True,
epipolar_roi=epipolar_roi,
required_bands=required_bands,
texture_bands=self.texture_bands,
)
# Run ground truth dsm computation
if self.used_conf[PIPELINE][ADVANCED][adv_cst.GROUND_TRUTH_DSM]:
self.used_conf[PIPELINE][APPLICATIONS][
"ground_truth_reprojection"
]["save_intermediate_data"] = True
new_geomplugin_dsm = AbstractGeometry( # pylint: disable=E0110
self.geometry_plugin,
dem=self.used_conf[PIPELINE][ADVANCED][
adv_cst.GROUND_TRUTH_DSM
][adv_cst.INPUT_GROUND_TRUTH_DSM],
geoid=self.used_conf[PIPELINE][ADVANCED][
adv_cst.GROUND_TRUTH_DSM
][adv_cst.INPUT_GEOID],
scaling_coeff=self.scaling_coeff,
)
self.ground_truth_reprojection.run(
self.pairs[pair_key]["sensor_image_left"],
self.pairs[pair_key]["sensor_image_right"],
self.pairs[pair_key]["corrected_grid_left"],
self.pairs[pair_key]["corrected_grid_right"],
new_geomplugin_dsm,
self.geom_plugin_with_dem_and_geoid,
self.pairs[pair_key]["corrected_grid_left"][
"disp_to_alt_ratio"
],
self.used_conf[PIPELINE][ADVANCED][
adv_cst.GROUND_TRUTH_DSM
][adv_cst.INPUT_AUX_PATH],
self.used_conf[PIPELINE][ADVANCED][
adv_cst.GROUND_TRUTH_DSM
][adv_cst.INPUT_AUX_INTERP],
orchestrator=self.cars_orchestrator,
pair_folder=os.path.join(
self.dump_dir, "ground_truth_reprojection", pair_key
),
)
if self.epsg is None:
# compute epsg
# Epsg uses global disparity min and max
self.epsg = preprocessing.compute_epsg(
self.pairs[pair_key]["sensor_image_left"],
self.pairs[pair_key]["sensor_image_right"],
self.pairs[pair_key]["corrected_grid_left"],
self.pairs[pair_key]["corrected_grid_right"],
self.geom_plugin_with_dem_and_geoid,
disp_min=self.pairs[pair_key]["disp_range_grid"][
"global_min"
],
disp_max=self.pairs[pair_key]["disp_range_grid"][
"global_max"
],
)
# Compute roi polygon, in input EPSG
self.roi_poly = preprocessing.compute_roi_poly(
self.input_roi_poly, self.input_roi_epsg, self.epsg
)
self.vertical_crs = projection.get_output_crs(self.epsg, output)
if (
self.save_output_dsm
or self.save_output_point_cloud
or self.dense_matching_app.get_method() == "pandora_auto"
):
# Compute terrain bounding box /roi related to
# current images
(current_terrain_roi_bbox, intersection_poly) = (
preprocessing.compute_terrain_bbox(
self.pairs[pair_key]["sensor_image_left"],
self.pairs[pair_key]["sensor_image_right"],
new_epipolar_image_left,
self.pairs[pair_key]["corrected_grid_left"],
self.pairs[pair_key]["corrected_grid_right"],
self.epsg,
self.geom_plugin_with_dem_and_geoid,
resolution=self.resolution,
disp_min=self.pairs[pair_key]["disp_range_grid"][
"global_min"
],
disp_max=self.pairs[pair_key]["disp_range_grid"][
"global_max"
],
roi_poly=(
None if self.debug_with_roi else self.roi_poly
),
orchestrator=self.cars_orchestrator,
pair_key=pair_key,
pair_folder=os.path.join(
self.dump_dir, "terrain_bbox", pair_key
),
check_inputs=False,
)
)
self.list_terrain_roi.append(current_terrain_roi_bbox)
self.list_intersection_poly.append(intersection_poly)
# compute terrain bounds for later use
(
self.terrain_bounds,
self.optimal_terrain_tile_width,
) = preprocessing.compute_terrain_bounds(
self.list_terrain_roi,
roi_poly=(None if self.debug_with_roi else self.roi_poly),
resolution=self.resolution,
)
if self.which_resolution not in ("final", "single"):
self.terrain_bounds = dem_wrappers.modify_terrain_bounds(
self.terrain_bounds,
self.dem_generation_application.margin[0],
self.dem_generation_application.margin[1],
self.epsg,
)
if self.dense_matching_app.get_method() == "pandora_auto":
# Copy the initial corr_config in order to keep
# the inputs that have already been checked
method = self.dense_matching_app.dense_matching_method
corr_cfg = method.corr_config.copy()
# Find the conf that correspond to the land cover map
conf = self.dense_matching_app.loader.find_auto_conf(
intersection_poly,
self.land_cover_map,
self.classification_to_config_mapping,
self.epsg,
)
# Update the used_conf if order to reinitialize
# the dense matching app
# Because we kept the information regarding the ambiguity,
# performance_map calculus..
self.used_conf[PIPELINE][APPLICATIONS]["dense_matching"][
"loader_conf"
] = conf
self.used_conf[PIPELINE][APPLICATIONS]["dense_matching"][
"method"
] = "pandora_custom"
# Re initialization of the dense matching application
self.dense_matching_app = Application(
"dense_matching",
cfg=self.used_conf[PIPELINE][APPLICATIONS][
"dense_matching"
],
)
# Update the corr_config with the inputs that have
# already been checked
self.dense_matching_app.dense_matching_method.corr_config[
"input"
] = corr_cfg["input"]
# Run epipolar matching application
epipolar_disparity_map = self.dense_matching_app.run(
new_epipolar_image_left,
new_epipolar_image_right,
local_tile_optimal_size_fun,
orchestrator=self.cars_orchestrator,
pair_folder=os.path.join(
self.dump_dir, "dense_matching", pair_key
),
pair_key=pair_key,
disp_range_grid=self.pairs[pair_key]["disp_range_grid"],
compute_disparity_masks=False,
margins_to_keep=sum(
app.get_epipolar_margin()
for _, app in self.pc_outlier_removal_apps.items()
),
texture_bands=texture_bands_indices,
classif_bands_to_mask=self.used_classif_values_for_filling,
)
if self.quit_on_app("dense_matching"):
continue # keep iterating over pairs, but don't go further
# Fill with zeros
(filled_epipolar_disparity_map) = self.dense_match_filling.run(
epipolar_disparity_map,
orchestrator=self.cars_orchestrator,
pair_folder=os.path.join(
self.dump_dir, "dense_match_filling", pair_key
),
pair_key=pair_key,
)
if self.quit_on_app("dense_match_filling"):
continue # keep iterating over pairs, but don't go further
if isinstance(output[sens_cst.GEOID], str):
output_geoid_path = output[sens_cst.GEOID]
elif (
isinstance(output[sens_cst.GEOID], bool)
and output[sens_cst.GEOID]
):
package_path = os.path.dirname(__file__)
output_geoid_path = os.path.join(
package_path,
"..",
"..",
"conf",
sensor_inputs.CARS_GEOID_PATH,
)
else:
# default case : stay on the ellipsoid
output_geoid_path = None
depth_map_dir = None
if self.save_output_depth_map:
depth_map_dir = os.path.join(
self.out_dir, "depth_map", pair_key
)
safe_makedirs(depth_map_dir)
point_cloud_dir = None
if self.save_output_point_cloud:
point_cloud_dir = os.path.join(
self.out_dir, "point_cloud", pair_key
)
safe_makedirs(point_cloud_dir)
triangulation_point_cloud_dir = (
point_cloud_dir
if (point_cloud_dir and len(self.pc_outlier_removal_apps) == 0)
else None
)
# Run epipolar triangulation application
epipolar_point_cloud = self.triangulation_application.run(
self.pairs[pair_key]["sensor_image_left"],
self.pairs[pair_key]["sensor_image_right"],
self.pairs[pair_key]["corrected_grid_left"],
self.pairs[pair_key]["corrected_grid_right"],
filled_epipolar_disparity_map,
self.geom_plugin_without_dem_and_geoid,
new_epipolar_image_left,
epsg=self.epsg,
denoising_overload_fun=None,
source_pc_names=self.pairs_names,
orchestrator=self.cars_orchestrator,
pair_dump_dir=os.path.join(
self.dump_dir, "triangulation", pair_key
),
pair_key=pair_key,
uncorrected_grid_right=self.pairs[pair_key]["grid_right"],
geoid_path=output_geoid_path,
cloud_id=cloud_id,
performance_maps_param=(
self.dense_matching_app.get_performance_map_parameters()
),
depth_map_dir=depth_map_dir,
point_cloud_dir=triangulation_point_cloud_dir,
save_output_coordinates=(len(self.pc_outlier_removal_apps) == 0)
and (
self.save_output_depth_map or self.save_output_point_cloud
),
save_output_color=bool(depth_map_dir)
and self.auxiliary[out_cst.AUX_IMAGE],
save_output_classification=bool(depth_map_dir)
and self.auxiliary[out_cst.AUX_CLASSIFICATION],
save_output_filling=bool(depth_map_dir)
and self.auxiliary[out_cst.AUX_FILLING],
save_output_performance_map=bool(depth_map_dir)
and self.auxiliary[out_cst.AUX_PERFORMANCE_MAP],
save_output_ambiguity=bool(depth_map_dir)
and self.auxiliary[out_cst.AUX_AMBIGUITY],
save_output_edges=bool(depth_map_dir)
and self.auxiliary[out_cst.AUX_EDGES],
)
if self.quit_on_app("triangulation"):
continue # keep iterating over pairs, but don't go further
filtered_epipolar_point_cloud = epipolar_point_cloud
for app_key, app in self.pc_outlier_removal_apps.items():
app_key_is_last = (
app_key == list(self.pc_outlier_removal_apps)[-1]
)
filtering_depth_map_dir = (
depth_map_dir if app_key_is_last else None
)
filtering_point_cloud_dir = (
point_cloud_dir if app_key_is_last else None
)
filtered_epipolar_point_cloud = app.run(
filtered_epipolar_point_cloud,
depth_map_dir=filtering_depth_map_dir,
point_cloud_dir=filtering_point_cloud_dir,
dump_dir=os.path.join(
self.dump_dir,
( # pylint: disable=inconsistent-quotes
f"pc_outlier_removal"
f"{str(app_key[27:]).replace('.', '_')}"
),
pair_key,
),
epsg=self.epsg,
orchestrator=self.cars_orchestrator,
)
if self.quit_on_app("point_cloud_outlier_removal"):
continue # keep iterating over pairs, but don't go further
self.list_epipolar_point_clouds.append(
filtered_epipolar_point_cloud
)
# quit if any app in the loop over the pairs was the last one
# pylint:disable=too-many-boolean-expressions
if (
self.quit_on_app("dense_matching")
or self.quit_on_app("dense_match_filling")
or self.quit_on_app("triangulation")
or self.quit_on_app("point_cloud_outlier_removal.1")
or self.quit_on_app("point_cloud_outlier_removal.2")
):
return True
return False
[docs]
def rasterize_point_cloud(self):
"""
Final step of the pipeline: rasterize the point
cloud created in the prior steps.
"""
self.rasterization_dump_dir = os.path.join(
self.dump_dir, "rasterization"
)
dsm_file_name = (
os.path.join(
self.out_dir,
out_cst.DSM_DIRECTORY,
"dsm.tif",
)
if self.save_output_dsm
else None
)
weights_file_name = (
os.path.join(
self.out_dir,
out_cst.DSM_DIRECTORY,
"weights.tif",
)
if self.save_output_dsm
and self.used_conf[OUTPUT][out_cst.AUXILIARY][out_cst.AUX_WEIGHTS]
else None
)
color_file_name = (
os.path.join(
self.out_dir,
out_cst.DSM_DIRECTORY,
"image.tif",
)
if self.save_output_dsm
and self.used_conf[OUTPUT][out_cst.AUXILIARY][out_cst.AUX_IMAGE]
else None
)
performance_map_file_name = (
os.path.join(
self.out_dir,
out_cst.DSM_DIRECTORY,
"performance_map.tif",
)
if self.save_output_dsm
and self.used_conf[OUTPUT][out_cst.AUXILIARY][
out_cst.AUX_PERFORMANCE_MAP
]
else None
)
ambiguity_file_name = (
os.path.join(
self.out_dir,
out_cst.DSM_DIRECTORY,
"ambiguity.tif",
)
if self.save_output_dsm
and self.used_conf[OUTPUT][out_cst.AUXILIARY][out_cst.AUX_AMBIGUITY]
else None
)
classif_file_name = (
os.path.join(
self.out_dir,
out_cst.DSM_DIRECTORY,
"classification.tif",
)
if self.save_output_dsm
and self.used_conf[OUTPUT][out_cst.AUXILIARY][
out_cst.AUX_CLASSIFICATION
]
else None
)
contributing_pair_file_name = (
os.path.join(
self.out_dir,
out_cst.DSM_DIRECTORY,
"contributing_pair.tif",
)
if self.save_output_dsm
and self.used_conf[OUTPUT][out_cst.AUXILIARY][
out_cst.AUX_CONTRIBUTING_PAIR
]
else None
)
filling_file_name = (
os.path.join(
self.out_dir,
out_cst.DSM_DIRECTORY,
"filling.tif",
)
if self.save_output_dsm
and self.used_conf[OUTPUT][out_cst.AUXILIARY][out_cst.AUX_FILLING]
else None
)
# rasterize point cloud
_ = self.rasterization_application.run(
self.point_cloud_to_rasterize,
self.epsg,
self.vertical_crs,
resolution=self.resolution,
orchestrator=self.cars_orchestrator,
dsm_file_name=dsm_file_name,
weights_file_name=weights_file_name,
color_file_name=color_file_name,
classif_file_name=classif_file_name,
performance_map_file_name=performance_map_file_name,
ambiguity_file_name=ambiguity_file_name,
contributing_pair_file_name=contributing_pair_file_name,
filling_file_name=filling_file_name,
color_dtype=self.color_type,
dump_dir=self.rasterization_dump_dir,
performance_map_classes=self.used_conf[OUTPUT][AUXILIARY][
out_cst.AUX_PERFORMANCE_MAP
],
phasing=self.phasing,
)
# Cleaning: don't keep terrain bbox if save_intermediate_data
# is not activated
if not self.used_conf[PIPELINE][ADVANCED][
adv_cst.SAVE_INTERMEDIATE_DATA
]:
self.cars_orchestrator.add_to_clean(
os.path.join(self.dump_dir, "terrain_bbox")
)
if self.quit_on_app("point_cloud_rasterization"):
return True
# dsm needs to be saved before filling
self.cars_orchestrator.breakpoint()
# saved used configuration
self.save_configurations()
if (
classif_file_name is not None
and self.used_conf[OUTPUT][out_cst.AUXILIARY][
out_cst.AUX_CLASSIFICATION
]
):
self.merge_classif_bands(
classif_file_name,
self.used_conf[OUTPUT][out_cst.AUXILIARY][
out_cst.AUX_CLASSIFICATION
],
dsm_file_name,
)
if (
filling_file_name is not None
and self.used_conf[OUTPUT][out_cst.AUXILIARY][out_cst.AUX_FILLING]
):
self.merge_filling_bands(
filling_file_name,
self.used_conf[OUTPUT][out_cst.AUXILIARY][out_cst.AUX_FILLING],
dsm_file_name,
)
return False
@cars_profile(name="merge filling bands", interval=0.5)
def merge_filling_bands(self, filling_path, aux_filling, dsm_file):
"""
Merge filling bands to get mono band in output
"""
with rasterio.open(dsm_file) as in_dsm:
dsm_msk = in_dsm.read_masks(1)
with rasterio.open(filling_path) as src:
nb_bands = src.count
if nb_bands == 1:
return False
filling_multi_bands = src.read()
filling_mono_bands = np.zeros(filling_multi_bands.shape[1:3])
descriptions = src.descriptions
dict_temp = {name: i for i, name in enumerate(descriptions)}
profile = src.profile
with warnings.catch_warnings():
warnings.simplefilter("ignore", NodataShadowWarning)
filling_mask = src.read_masks(1)
filling_mono_bands[filling_mask == 0] = 0
filling_bands_list = {
"fill_with_geoid": ["filling_exogenous"],
"interpolate_from_borders": [
"bulldozer",
"border_interpolation",
],
"fill_with_endogenous_dem": [
"filling_exogenous",
"bulldozer",
],
"fill_with_exogenous_dem": ["bulldozer"],
"no_edition": [],
}
# To get the right footprint
filling_mono_bands = np.logical_or(dsm_msk, filling_mask).astype(
np.uint8
)
# to keep the previous classif convention
filling_mono_bands[filling_mono_bands == 0] = src.nodata
filling_mono_bands[filling_mono_bands == 1] = 0
no_match = False
for key, value in aux_filling.items():
if isinstance(value, str):
value = [value]
if isinstance(value, list):
for elem in value:
if elem not in ("other", "interpolation"):
filling_method = filling_bands_list[elem]
if all(
method in descriptions
for method in filling_method
):
indices_true = [
dict_temp[m] for m in filling_method
]
mask_true = np.all(
filling_multi_bands[indices_true, :, :]
== 1,
axis=0,
)
indices_false = [
i
for i in range(filling_multi_bands.shape[0])
if i not in indices_true
]
mask_false = np.all(
filling_multi_bands[indices_false, :, :]
== 0,
axis=0,
)
mask = mask_true & mask_false
filling_mono_bands[mask] = key
else:
no_match = True
if no_match:
mask_1 = np.all(
filling_multi_bands[1:, :, :] == 1,
axis=0,
)
mask_2 = np.all(
filling_mono_bands == 0,
axis=0,
)
filling_mono_bands[mask_1 & mask_2] = (
aux_filling["other"] if "other" in aux_filling else 50
)
profile.update(count=1, dtype=filling_mono_bands.dtype)
with rasterio.open(filling_path, "w", **profile) as src:
src.write(filling_mono_bands, 1)
return True
@cars_profile(name="merge classif bands", interval=0.5)
def merge_classif_bands(self, classif_path, aux_classif, dsm_file):
"""
Merge classif bands to get mono band in output
"""
with rasterio.open(dsm_file) as in_dsm:
dsm_msk = in_dsm.read_masks(1)
with rasterio.open(classif_path) as src:
nb_bands = src.count
if nb_bands == 1:
return False
classif_multi_bands = src.read()
classif_mono_band = np.zeros(classif_multi_bands.shape[1:3])
descriptions = src.descriptions
profile = src.profile
classif_mask = src.read_masks(1)
classif_mono_band[classif_mask == 0] = 0
# To get the right footprint
classif_mono_band = np.logical_or(dsm_msk, classif_mask).astype(
np.uint8
)
# to keep the previous classif convention
classif_mono_band[classif_mono_band == 0] = src.nodata
classif_mono_band[classif_mono_band == 1] = 0
for key, value in aux_classif.items():
if isinstance(value, int):
num_band = descriptions.index(str(value))
mask_1 = classif_mono_band == 0
mask_2 = classif_multi_bands[num_band, :, :] == 1
classif_mono_band[mask_1 & mask_2] = key
elif isinstance(value, list):
for elem in value:
num_band = descriptions.index(str(elem))
mask_1 = classif_mono_band == 0
mask_2 = classif_multi_bands[num_band, :, :] == 1
classif_mono_band[mask_1 & mask_2] = key
profile.update(count=1, dtype=classif_mono_band.dtype)
with rasterio.open(classif_path, "w", **profile) as src:
src.write(classif_mono_band, 1)
return True
@cars_profile(name="Preprocess depth maps", interval=0.5)
def preprocess_depth_maps(self):
"""
Adds multiple processing steps to the depth maps :
Merging.
Creates the point cloud that will be rasterized in
the last step of the pipeline.
"""
self.point_cloud_to_rasterize = (
self.list_epipolar_point_clouds,
self.terrain_bounds,
)
self.color_type = self.point_cloud_to_rasterize[0][0].attributes.get(
"color_type", None
)
@cars_profile(name="Final cleanup", interval=0.5)
def final_cleanup(self):
"""
Clean temporary files and directory at the end of cars processing
"""
if (
not self.used_conf[PIPELINE][ADVANCED][
adv_cst.SAVE_INTERMEDIATE_DATA
]
and not self.tie_point_save
):
# delete everything in tile_processing if save_intermediate_data is
# not activated
self.cars_orchestrator.add_to_clean(
os.path.join(self.dump_dir, "tile_processing")
)
self.cars_orchestrator.add_to_clean(
os.path.join(self.out_dir, "tie_points")
)
# Remove dump_dir if no intermediate data should be written
if not any(
app.get("save_intermediate_data", False) is True
for app in self.used_conf[PIPELINE][APPLICATIONS].values()
if app is not None
):
self.cars_orchestrator.add_to_clean(self.dump_dir)
@cars_profile(name="run_surface_modeling_pipeline", interval=0.5)
def run(
self,
args=None, # pylint: disable=W0613
which_resolution="single",
working_res=1,
log_dir=None,
): # noqa C901
"""
Run pipeline
"""
if log_dir is not None:
self.log_dir = log_dir
else:
self.log_dir = os.path.join(self.out_dir, "logs")
self.texture_bands = self.used_conf[OUTPUT][AUXILIARY][
out_cst.AUX_IMAGE
]
self.auxiliary = self.used_conf[OUTPUT][out_cst.AUXILIARY]
self.which_resolution = which_resolution
self.working_res = working_res
# saved used configuration
self.save_configurations()
# start cars orchestrator
with orchestrator.Orchestrator(
orchestrator_conf=self.used_conf[ORCHESTRATOR],
out_dir=self.out_dir,
log_dir=self.log_dir,
out_yaml_path=os.path.join(
self.out_dir,
out_cst.INFO_FILENAME,
),
) as self.cars_orchestrator:
# link metadata
self.metadata = self.cars_orchestrator.out_yaml
# initialize out_json
self.cars_orchestrator.update_out_info({"version": __version__})
if self.compute_depth_map:
self.sensor_to_depth_maps()
if self.save_output_dsm or self.save_output_point_cloud:
self.preprocess_depth_maps()
if self.save_output_dsm:
self.rasterize_point_cloud()
self.final_cleanup()