#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2026 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
CARS analysis pipeline class file
"""
import copy
import json
import logging
import math
import os
import numpy as np
import rasterio
from rasterio.warp import transform_bounds
from shapely.geometry import box, mapping
from cars import extractroi
from cars.core import projection
from cars.core.geometry.abstract_geometry import AbstractGeometry
from cars.core.utils import safe_makedirs
from cars.pipelines.analysis.report_tools import (
generate_report_cars_output,
generate_satelite_position_report,
merge_reports,
)
from cars.pipelines.default.default_pipeline import DefaultPipeline
from cars.pipelines.parameters import advanced_parameters_constants as adv_cst
from cars.pipelines.parameters import output_constants as out_cst
from cars.pipelines.parameters import sensor_inputs_constants as sens_cst
from cars.pipelines.pipeline import Pipeline
from cars.pipelines.pipeline_constants import (
ADVANCED,
INPUT,
OUTPUT,
SUBSAMPLING,
TIE_POINTS,
)
from cars.pipelines.pipeline_template import PipelineTemplate
from cars.pipelines.subsampling.subsampling import SubsamplingPipeline
from cars.pipelines.surface_modeling.surface_modeling import (
SurfaceModelingPipeline,
)
PIPELINE = "analysis"
[docs]
@Pipeline.register(
PIPELINE,
)
class AnalysisPipeline(PipelineTemplate):
"""
Analysis pipeline
"""
def __init__(self, conf, config_dir=None):
"""
Creates pipeline
Directly creates class attributes:
used_conf
:param pipeline_name: name of the pipeline.
:type pipeline_name: str
:param cfg: configuration {'matching_cost_method': value}
:type cfg: dictionary
:param config_dir: path to dir containing json/yaml
:type config_dir: str
"""
self.used_conf = {"input": conf["input"], "output": conf["output"]}
self.config_dir = config_dir
[docs]
@staticmethod
def check_output(conf):
"""
Check the output given
:param conf: configuration of output
:type conf: dict
:return: overloader output
:rtype: dict
"""
return conf
[docs]
def run(self, args=None, log_dir=None): # pylint: disable=W0613
"""
Run pipeline
"""
if not self.used_conf[INPUT][sens_cst.SENSORS]:
raise ValueError("No sensor provided in input")
pipeline_out_dir = self.used_conf[OUTPUT][out_cst.OUT_DIRECTORY]
intermediate_data_dir = os.path.join(
pipeline_out_dir, "intermediate_data"
)
# Generate overview of input images
# Generate terain envelope of input images
# Run satelite position report
satelite_positions, sat_infos = generate_satelite_position(
copy.deepcopy(self.used_conf), self.config_dir
)
sat_data_dir = os.path.join(intermediate_data_dir, "sat_data")
sat_report = generate_satelite_position_report(
satelite_positions, sat_infos, sat_data_dir
)
# Run Low res pipeline : dsm
low_res_dir = os.path.join(intermediate_data_dir, "low_res")
low_res_report, low_res_error, used_resolution = (
launch_low_res_pipeline(
self.used_conf, low_res_dir, self.config_dir
)
)
# Run full res pipeline on crop : tie point analysis, and dsm
full_res_dir = os.path.join(intermediate_data_dir, "full_res")
full_res_report, full_res_error = launch_full_res_pipeline(
copy.deepcopy(self.used_conf),
full_res_dir,
low_res_dir,
self.config_dir,
used_resolution,
)
# Generate report
reports_to_merge = [sat_report, low_res_report, full_res_report]
merged_report_pdf = os.path.join(pipeline_out_dir, "report.pdf")
merged_report_html = os.path.join(pipeline_out_dir, "report.html")
merge_reports(reports_to_merge, merged_report_html, merged_report_pdf)
if low_res_error or full_res_error:
raise RuntimeError(
"CARS pipeline did not run successfully, "
f"check report for more details in {merged_report_html}"
)
[docs]
def launch_low_res_pipeline(conf, low_res_dir, config_dir):
"""
Launch full res pipeline on crop : tie point analysis, and dsm
"""
safe_makedirs(low_res_dir)
report_file = os.path.join(low_res_dir, "report.html")
# launch cars
log_error = "No error during low res pipeline"
error = False
try:
# launch cars on low res images
low_res_conf = copy.deepcopy(conf)
if sens_cst.ROI in low_res_conf[INPUT][sens_cst.SENSORS]:
del low_res_conf[INPUT][sens_cst.SENSORS][sens_cst.ROI]
if sens_cst.FILLING in low_res_conf[INPUT]:
del low_res_conf[INPUT][sens_cst.FILLING]
# compute resolution to use
subsampling_dir = os.path.join(low_res_dir, "tmp_subsampling")
safe_makedirs(low_res_dir)
subsampling_conf = {
INPUT: conf[INPUT],
OUTPUT: {
out_cst.OUT_DIRECTORY: subsampling_dir,
},
}
subsampling_pipeline = SubsamplingPipeline(
subsampling_conf, config_dir=config_dir
)
subsamp_advanced = subsampling_pipeline.check_advanced(
{
"resolutions": [1024, 512, 256, 128, 64, 32, 16, 8, 4, 2, 1],
"min_image_size": 300,
},
subsampling_conf[INPUT],
)
# Get resolution: higher one
resolution = subsamp_advanced[adv_cst.RESOLUTIONS][0]
cars_conf = {
INPUT: low_res_conf[INPUT],
OUTPUT: {
out_cst.OUT_DIRECTORY: low_res_dir,
out_cst.AUXILIARY: {
out_cst.AUX_IMAGE: True,
out_cst.AUX_AMBIGUITY: True,
},
},
SUBSAMPLING: {"advanced": {"resolutions": [resolution]}},
TIE_POINTS: {ADVANCED: {"all": {"save_intermediate_data": True}}},
}
saved_cars_conf = copy.deepcopy(cars_conf)
default_pipeline = DefaultPipeline(cars_conf, config_dir=config_dir)
default_pipeline.run()
except Exception as exc:
# Generate report with error message
log_error = "Error during low res pipeline: {}".format(exc)
logging.error(log_error)
error = True
# Generate report
generate_report_cars_output(
report_file, low_res_dir, log_error, saved_cars_conf
)
return report_file, error, resolution
[docs]
def launch_full_res_pipeline(
conf, full_res_dir, low_res_dir, config_dir, used_resolution
):
"""
Launch full res pipeline on crop : tie point analysis, and dsm
"""
if not os.path.exists(full_res_dir):
os.makedirs(full_res_dir)
# retrieve best roi in 4326
roi = get_best_roi(
low_res_dir, tile_size=int(math.ceil(2000 / used_resolution))
)
# Generate pivot format
if "pipeline" in conf:
del conf["pipeline"]
if "loaders" not in conf["input"]:
conf["input"]["loaders"] = {}
# fix bug: image is in pivot format
conf["input"]["loaders"]["image"] = "pivot"
default_pipeline_dry = DefaultPipeline(conf, config_dir=config_dir)
input_conf = default_pipeline_dry.used_conf[0]["input"]
# crop images on best roi
cropped_images_dir = os.path.join(full_res_dir, "cropped_images")
if not os.path.exists(cropped_images_dir):
os.makedirs(cropped_images_dir)
new_conf_input = generate_cropped_images(
input_conf, cropped_images_dir, roi
)
# launch cars on crop
log_error = "No error during full res pipeline"
error = False
try:
cars_conf = {
INPUT: new_conf_input,
OUTPUT: {
out_cst.OUT_DIRECTORY: full_res_dir,
out_cst.AUXILIARY: {
out_cst.AUX_IMAGE: True,
out_cst.AUX_AMBIGUITY: True,
},
},
TIE_POINTS: {ADVANCED: {"all": {"save_intermediate_data": True}}},
}
saved_cars_conf = copy.deepcopy(cars_conf)
default_pipeline = DefaultPipeline(cars_conf, config_dir=config_dir)
default_pipeline.run()
except Exception as exc:
# Generate report with error message
log_error = "Error during full res pipeline: {}".format(exc)
logging.error(log_error)
error = True
# Generate report on crop results
report_file = os.path.join(full_res_dir, "report.html")
# Generate report
generate_report_cars_output(
report_file, full_res_dir, log_error, saved_cars_conf
)
return report_file, error
[docs]
def generate_cropped_images( # noqa: C901
input_conf, cropped_images_dir, roi_bbox
):
"""
Generate cropped image and associated conf
:param input_conf:
:param cropped_images_dir:
:param roi_bbox:
:return:
"""
# Get pivot format conf
new_input_conf = copy.deepcopy(input_conf)
# Update dem
rpc_options = {}
dem = new_input_conf["initial_elevation"]["dem"]
if dem is not None:
rpc_options["rpc_dem"] = dem
# new_input_conf in in pivot format
for sensor_key, sensor in new_input_conf[sens_cst.SENSORS].items():
# Get geomodel
if sensor["geomodel"]["model_type"] != "RPC":
raise ValueError("Only RPC geomodel is supported for now")
geomodel = sensor["geomodel"]["path"]
# get all images
images = []
for _, band_dict in sensor["image"]["bands"].items():
if band_dict["path"] not in images:
images.append(band_dict["path"])
mask = sensor.get("mask", None)
classification = sensor.get("classification", None)
if classification is not None:
classification = classification["path"]
if mask is not None:
images.append(mask)
if classification is not None:
images.append(classification)
# crop images
for img in images:
# crop
cropped_image = os.path.join(
cropped_images_dir, sensor_key + "_" + os.path.basename(img)
)
extractroi.process_image_file(
roi_bbox,
img,
cropped_image,
None,
rpc_options,
external_rpc_file=geomodel,
)
# replace in conf
for _, band_dict in sensor["image"]["bands"].items():
if band_dict["path"] == img:
band_dict["path"] = cropped_image
if mask is not None:
if sensor["mask"] == img:
sensor["mask"] = cropped_image
if classification is not None:
if sensor["classification"]["path"] == img:
sensor["classification"]["path"] = cropped_image
return new_input_conf
[docs]
def get_best_roi(low_res_dir, tile_size=2000):
"""
Get best roi from low res results
"""
dsm_file = os.path.join(low_res_dir, "dsm", "dsm.tif")
ambiguity_file = os.path.join(low_res_dir, "dsm", "ambiguity.tif")
with (
rasterio.open(dsm_file) as dsm_src,
rasterio.open(ambiguity_file) as ambi_src,
):
nodata = dsm_src.nodata
crs = dsm_src.crs
best_score = -1
# best_tile_coords : (minx, miny, maxx, maxy)
best_tile_coords = None
# Generate tiling grid
for col in range(0, dsm_src.width, tile_size):
for row in range(0, dsm_src.height, tile_size):
window = rasterio.windows.Window(
col,
row,
min(tile_size, dsm_src.width - col),
min(tile_size, dsm_src.height - row),
)
window_bounds = rasterio.windows.bounds(
window, dsm_src.transform
)
# Read data for this tile
dsm_data = dsm_src.read(1, window=window)
ambi_data = ambi_src.read(1, window=window)
# Calculate metrics: Valid pixels ratio and mean confidence
valid_mask = (
(dsm_data != nodata)
if nodata is not None
else np.isfinite(dsm_data)
)
valid_count = np.count_nonzero(valid_mask)
if valid_count > 0:
mean_ambi = np.mean(ambi_data[valid_mask])
# Score: priority to coverage, then quality
score = valid_count * (1 - mean_ambi)
if score > best_score:
best_score = score
best_tile_coords = window_bounds
if not best_tile_coords:
return None
# Reproject to EPSG:4326
# (west, south, east, north)
bbox_roi = transform_bounds(crs, "EPSG:4326", *best_tile_coords)
geom = box(*bbox_roi)
# Create GeoJSON structure
geojson_feature = {
"type": "Feature",
"properties": {"name": "ROI Selection", "buffer_margin": "10%"},
"geometry": mapping(geom),
}
geosjon = json.dumps(geojson_feature, indent=2)
logging.info(f"Best ROI GeoJSON:\n{geosjon}")
return bbox_roi
[docs]
def generate_satelite_position(cars_conf, config_dir):
"""
Generate satellite positions
:param cars_conf:
:param config_dir:
:return:
"""
cars_conf = {
"input": cars_conf["input"],
"output": cars_conf["output"],
}
surface_modeling_pipeline = SurfaceModelingPipeline(
cars_conf, config_dir=config_dir
)
full_cars_conf = surface_modeling_pipeline.used_conf
geom_plugin = AbstractGeometry("SharelocGeometry") # pylint: disable=E0110
sensors = full_cars_conf["input"]["sensors"]
skeys = list(sensors.keys())
sat_positions = []
infos = {"convergence_angles": {}}
for idx, skey in enumerate(skeys[1:]):
angles = projection.get_ground_angles(
sensors[skeys[0]]["image"]["bands"]["b0"]["path"],
sensors[skey]["image"]["bands"]["b0"]["path"],
sensors[skeys[0]]["geomodel"],
sensors[skey]["geomodel"],
geom_plugin,
)
azl, ell, azr, elr, conv = angles
if len(sat_positions) == 0:
sat_positions.append(["0", azl, ell])
sat_positions.append([str(idx + 1), azr, elr])
conv_key = f"0 <-> {idx + 1} , {skeys[0]} <-> {skey}"
infos["convergence_angles"][conv_key] = conv
return sat_positions, infos