Software
The CARS design aims at a modular and customizable framework for multiview 3D reconstruction. This design is organized around the key concepts described in this section.
Warning
This section is under construction and evolves with the CARS design.
The CARS framework can be introduced by the following diagram:
[Figure: CARS framework overview diagram]
This section presents the CARS key concepts one by one, along with their interactions.
CarsDataset: input and output object of an application. It contains calculated, possibly tiled, data.
Application: algorithmic step that takes CarsDatasets and configuration parameters as input and produces CarsDatasets as output.
Orchestrator: it instantiates and interfaces with the cluster, to which it provides the tasks to be processed. It is responsible for writing the data calculated by the cluster on the fly.
Plugin: library or external tool providing specific 3D functions. Under heavy reconstruction!
Pipeline: a chain of applications (3D reconstruction steps) from input to output, with intermediate data (CarsDataset), controlled by the orchestrator.
CarsDataset
Goals
CarsDataset is the CARS internal data structure. The data used within CARS imposes some restrictions that the structure must manage:
can contain arrays (e.g. for images) or lists of points (e.g. for sparse matches);
can contain georeference and geometry information;
can be used for parallel/distributed computation, i.e. tiled data;
can contain overlaps between tiled data.
CarsDataset aims at defining a generic data structure that takes these constraints into account.
Details

Here is an example of one dataset with all needed information.
Attributes
type: CarsDataset can manage xarray.Dataset or pandas.DataFrame
tiles: list of lists of xarray.Dataset or pandas.DataFrame, including overlaps.
  example: the blue + orange zone on the previous drawing.
tiling_grid: coordinate position of each tile.
  example: for the first tile on the previous drawing: [x0, x1, y0, y1]
overlaps: coordinate position of each tile + overlaps.
  example: for the first tile on the previous drawing: [x0, x1+ox, y0, y1+oy]
attributes: all needed extra information
It is important to note that a tile, whether you decided to use xarray.Dataset or pandas.DataFrame, can be a Dask delayed or future object. See the next sections.
Functions
CarsDataset integrates all functions for manipulating the data throughout the framework:
for saving a whole dataset: save_cars_dataset
for loading a dataset written on disk: load_cars_dataset_from_disk
for creating a dataset from another one (same tiling_grid and overlaps): create_empty_copy
for saving a dataset tile by tile with future results: run_save (see the next sections)
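As an illustration, the short sketch below manipulates these concepts. The import path and the exact signatures of the whole-dataset helpers are assumptions based on the names listed above, not verified CARS API.

# Hedged sketch using the names listed above; the import path and the
# helper signatures are assumptions and may differ in the CARS code base.
from cars.data_structures import cars_dataset  # assumed import path

# A CarsDataset holding array tiles (one xarray.Dataset per tile)
epipolar_images = cars_dataset.CarsDataset("arrays")
# tiling_grid: one [x0, x1, y0, y1] entry per tile (see drawing above)
# overlaps: same layout, extended by the overlap sizes ox / oy
# tiles: list of lists of xarray.Dataset (or Dask delayed/future objects)

# Create a new dataset sharing the same tiling_grid and overlaps
epipolar_disparity = cars_dataset.CarsDataset("arrays")
epipolar_disparity.create_empty_copy(epipolar_images)

# Whole-dataset I/O helpers mentioned above (signatures assumed)
cars_dataset.save_cars_dataset(epipolar_disparity, "out_dir")
reloaded = cars_dataset.load_cars_dataset_from_disk("out_dir")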
Application
Overview
An application is a main step of CARS 3D reconstruction framework. It contains algorithm methods. It takes CarsDatasets and configuration parameters as input and returns CarsDatasets.
It is composed of:
an application factory concept that registers all 3D step applications
an application template
some abstract applications (each one defining a main 3D step)
some subclasses associated with each abstract application, containing a specific algorithm
[Figure: application concept diagram]
Example
Let’s take an example of dense_matching application to describe the main steps:
First, we can notice that dense_matching derives from ApplicationTemplate and is registered with the decorator:
@Application.register("dense_matching")
class DenseMatching(ApplicationTemplate, metaclass=ABCMeta):
Then, the algorithm is contained in a subclass of the dense_matching application, registered by its short_name:
class CensusMccnnSgm(
DenseMatching, short_name=["census_sgm"]
)
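For readers less familiar with this registration pattern, the sketch below shows how such a factory can work in plain Python. It only illustrates the mechanism; the available_applications attribute and the constructor signature are assumptions, not the actual CARS implementation.

# Illustrative registration factory (not the actual CARS code): subclasses
# register themselves under a name and are later instantiated from that name.
class Application:
    available_applications = {}

    @classmethod
    def register(cls, name):
        """Class decorator registering an application under `name`."""
        def decorator(app_class):
            cls.available_applications[name] = app_class
            return app_class
        return decorator

    def __new__(cls, name, orchestrator=None, conf=None):
        # Resolve the concrete class from the registered name
        app_class = cls.available_applications[name]
        return object.__new__(app_class)


@Application.register("dense_matching")
class DenseMatching(Application):
    def __init__(self, name, orchestrator=None, conf=None):
        self.orchestrator = orchestrator
        self.conf = conf or {}


# "dense_matching" resolves to the DenseMatching subclass
app = Application("dense_matching", conf={"correlator": "pandora"})
print(type(app).__name__)  # DenseMatching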
1. Init with parameter checking
To instantiate it, we need the orchestrator and a configuration (conf_matching) that contains the algorithm parameters.
def __init__(self, orchestrator, conf_matching):
    """
    Init function of DenseMatching

    :param orchestrator: orchestrator used
    :param conf_matching: configuration for matching
    :return: an application_to_use object
    """
    # orchestrator
    self.orchestrator = orchestrator

    # check conf
    self.corr_config = None
    if "corr_config" in conf_matching:
        self.corr_config = conf_matching["corr_config"]
        # For now, this is a path, transform it to dict
        # later: integrated to input config
        # TODO use loader to check and generate corr_config
        self.corr_config = corr_conf.configure_correlator(self.corr_config)

    self.correlator = None
    if "correlator" in conf_matching:
        self.correlator = conf_matching["correlator"]
    else:
        self.correlator = "pandora"

    # check loader

    # Saving files
    if "save_intermediate_data" in conf_matching:
        self.save_intermediate_data = conf_matching["save_intermediate_data"]
    else:
        self.save_intermediate_data = False
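A schematic instantiation could then look as follows. The configuration keys are those checked in the __init__ above; cars_orchestrator is assumed to come from the orchestrator described later, and in practice the concrete subclass (e.g. census_sgm) is selected through the application factory.

# Schematic example: only keys checked in the __init__ above are used.
conf_matching = {
    "correlator": "pandora",
    "save_intermediate_data": True,
}
dense_matching_app = DenseMatching(cars_orchestrator, conf_matching)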
2. Run: takes CarsDatasets as input and returns new CarsDatasets.
def run(
    self,
    epipolar_images_left,
    epipolar_images_right,
    pair_folder,
):
    """
    Run Matching application.

    Create left and right CarsDataset filled with xarray.Dataset,
    corresponding to epipolar disparities, on the same geometry
    as epipolar_images_left and epipolar_images_right.

    :param epipolar_images_left: tiled left epipolar
    :type epipolar_images_left: CarsDataset
    :param epipolar_images_right: tiled right epipolar
    :type epipolar_images_right: CarsDataset
    :param pair_folder: folder used for current pair
    :type pair_folder: str

    :return: Disparity map
    :rtype: CarsDataset
    """
2.1. Create empty CarsDatasets.
if epipolar_images_left.dataset_type == "arrays":
    # Create CarsDataset
    # Epipolar_disparity
    epipolar_disparity_map = cars_dataset.CarsDataset("arrays")
    epipolar_disparity_map.create_empty_copy(epipolar_images_left)

    # Update attributes to get epipolar info
    epipolar_disparity_map.attributes.update(
        epipolar_images_left.attributes
    )
2.2. Declare to the Orchestrator which products we want to save.
    # Save disparity maps
    if self.save_intermediate_data:
        self.orchestrator.add_to_save_lists(
            os.path.join(pair_folder, "epi_disp.tif"),
            cst_disp.MAP,
            epipolar_disparity_map,
        )
....
2.3. Ask the Orchestrator for an ID for each CarsDataset.
    # Get saving infos in order to save tiles when they are computed
    [
        saving_info
    ] = self.orchestrator.get_saving_infos(
        [epipolar_disparity_map]
    )
2.4. Tile by tile, compute the algorithm step.
Use the create_task function of the cluster through the Orchestrator. The algorithm function is called (see the core function just below).
create_task returns a delayed object that is stored in the previously created CarsDataset.
    # Generate disparity maps
    for col in range(epipolar_disparity_map.shape[1]):
        for row in range(epipolar_disparity_map.shape[0]):
            # Compute disparity
            (
                epipolar_disparity_map[row, col],
            ) = self.orchestrator.cluster.create_task(
                compute_disparity
            )(
                epipolar_images_left[row, col],
                epipolar_images_right[row, col],
                self.corr_config,
                saving_info=saving_info,
            )
else:
    logging.error(
        "DenseMatching application doesn't "
        "support this input data format"
    )

return epipolar_disparity_map
For each tile, the core algorithm function is called.
It takes a single tile as input (not a whole CarsDataset) and returns a tile.
The ID given by the orchestrator is added to this tile.
def compute_disparity(
    left_image_object: xr.Dataset,
    right_image_object: xr.Dataset,
    corr_cfg: dict,
    saving_info=None,
) -> xr.Dataset:
    """
    Compute disparity map from image objects.
    This function will be run as a delayed task.

    User must provide saving infos to save properly created datasets

    :param left_image_object: tiled Left image
        * dataset with :
            - cst.EPI_IMAGE
            - cst.EPI_MSK (if given)
            - cst.EPI_COLOR (for left, if given)
    :type left_image_object: xr.Dataset
    :param right_image_object: tiled Right image
        * dataset with :
            - cst.EPI_IMAGE
            - cst.EPI_MSK (if given)
    :type right_image_object: xr.Dataset
    :param corr_cfg: Correlator configuration
    :type corr_cfg: dict

    :returns: Disparity object
        Returned objects are composed of :
            * dataset with :
                - cst_disp.MAP
                - cst_disp.VALID
                - cst.EPI_COLOR
    """
    # Get disp_min and disp_max
    disp_min = cars_dataset.get_attributes(left_image_object)["disp_min"]
    disp_max = cars_dataset.get_attributes(left_image_object)["disp_max"]

    # Compute disparity
    disp = dense_matching_tools.compute_disparity(
        left_image_object,
        right_image_object,
        corr_cfg,
        disp_min,
        disp_max,
    )

    # Fill with attributes
    left_disp_dataset = disp[cst.STEREO_REF]
    cars_dataset.fill_dataset(
        left_disp_dataset,
        saving_info=saving_info,
        window=cars_dataset.get_window_dataset(left_image_object),
        profile=cars_dataset.get_profile_rasterio(left_image_object),
        attributes=None,
        overlaps=None,  # overlaps are removed
    )

    return left_disp_dataset
At the end of the application, we obtain CarsDatasets filled with delayed objects, one per tile.
Orchestrator
Goals
The orchestrator is the central element of the CARS design. Its role is to ensure the communication between the computing technology, the applications and the CarsDatasets.
Details
The orchestrator is unique and instantiated for each pipeline:
with orchestrator.Orchestrator(distributed_conf=distributed_conf) as cars_orchestrator:
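As an illustration, a possible distributed_conf is sketched below. The out_dir and nb_workers keys are those read by the Orchestrator __init__ shown later in this section; the mode key selecting the cluster plugin is an assumption of this sketch.

# Hedged example: "out_dir" and "nb_workers" are read by the Orchestrator
# __init__ shown below; the "mode" key is an assumption of this sketch.
distributed_conf = {
    "mode": "local_dask",
    "nb_workers": 4,
    "out_dir": "output_cars",
}

with orchestrator.Orchestrator(distributed_conf=distributed_conf) as cars_orchestrator:
    # applications are instantiated and run inside this context
    ...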
It is mainly composed of:
a cluster
a CarsDatasetRegistry
a dictionary, later saved in a json output file, containing information given by applications
Cluster
The cluster is the component managing access to all the threads available to CARS for its calculations.
class AbstractCluster(metaclass=ABCMeta):

    ...

    @abstractmethod
    def create_task(self, func, nout=1):
        """
        Create task

        :param func: function
        :param nout: number of outputs
        """

    @abstractmethod
    def start_tasks(self, task_list):
        """
        Start all tasks

        :param task_list: task list
        """

    @abstractmethod
    def future_iterator(self, future_list):
        """
        Iterator, iterating on computed futures

        :param future_list: future_list list
        """
The main functions are:
create_task to declare a task to the cluster. It returns a delayed object.
start_tasks to compute each task that has been declared.
future_iterator to iterate over the future objects as they are computed.
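Put together, these functions are typically used as sketched below. This is a schematic view based on the snippets in this section; cluster, tiles, process_tile, config and save_tile are placeholder names, not literal CARS code.

# Schematic use of the cluster API (placeholder names).
# 1. Declare tasks: create_task returns delayed objects
delayed_tiles = [
    cluster.create_task(process_tile)(tile, config)
    for tile in tiles
]

# 2. Trigger computation: start_tasks turns delayed objects into futures
future_tiles = cluster.start_tasks(delayed_tiles)

# 3. Consume results as they are computed
for result in cluster.future_iterator(future_tiles):
    if result is not None:
        save_tile(result)  # e.g. done on the fly by the CarsDatasetRegistry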
There are already three plugins, each one representing a mode:
dask:
  local_dask
  pbs_dask
  slurm_dask
mp (for multiprocessing)
sequential (note: in this mode, delayed objects do not exist; tiles are directly of type xarray.Dataset or pandas.DataFrame)
Cluster Multiprocessing
Goals
The multiprocessing (MP) cluster facilitates the distribution of computing for the application and the management of cars_dataset data.
Details
The MP cluster is built upon Python's multiprocessing module using the forkserver mode. In this mode, a pool of worker processes handles the parallel execution of functions. Each worker process is single-threaded, and only essential resources are inherited. By design, CARS uses a disk-based registry for data storage, distributing data across the processes. If specified in the configuration, data distribution can instead be done in memory, with degraded performance.
How it works
The main class is the MP Cluster, which inherits from the AbstractCluster class. It is instantiated within the orchestrator.
Inspired by the Dask cluster approach, the MP cluster builds a list of delayed tasks and factorizes the tasks that can be run sequentially. Task factorization reduces the number of tasks without losing any time: fewer tasks means fewer dumps on disk, which saves time. For each task whose input data is available (intermediate results from the linked previous task), the MP cluster transforms the delayed task into an MpFutureTask.
Upon completion of these jobs, the results are saved on disk, and the reference is passed to the next job. The refresh_task_cache function serves as the primary control function of the MP cluster.
The next sections illustrate the architecture of the MP cluster, while the detailed API functions provide more insight into interactions and operations.
Class diagram
API detailed functions
init
Cluster allocation using a Python thread pool. The worker pool is set up in forkserver mode with a specified number of workers, job timeouts, and wrapper configuration for cluster logging.
create_task_wrapped
Declares the task as an MpDelayed within the cluster. MpDelayed objects are instantiated using the mp_delayed_builder wrapper builder. The wrapper also provides parameters for the job logger.
start_tasks
Factorizes tasks with mp_factorizer.factorize_tasks and adds future tasks to the cluster queue. The cluster processes tasks from the queue, transforming MpDelayed objects into MpJob with rec_start and computing task dependencies for each job.
mp_factorizer.factorize_tasks
Takes as input a list of final MpDelayed and factorizes all the dependent tasks that are factorizable.
A task t of the class MpDelayedTask is factorizable if:
Only one task depends on the task t
The task t depends on only one task
If a task t2 is factorizable and depends on a task t1, then a new MpDelayedTask t_new is created with :
t_new.func = factorized_func
t_new.args = [factorized_object]
The task t2 is replaced by the task t_new.
The object factorized_object is of the class FactorizedObject and contains a list of tasks with all the information needed by factorized_func to run the tasks t1 and t2 sequentially (the functions of tasks t1 and t2 and the arguments of task t1).
Arguments of t2 that are MpDelayed are replaced by PreviousData objects during factorization. Then, during the run of factorized_func, the PreviousData objects are replaced by the output of t1 and t2 is computed. Thus, the computation of t_new = t1 + t2 is treated as a single task by the cluster, as illustrated below.
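The sketch below illustrates the idea of factorization with plain functions: two chained tasks are merged into a single callable, so the intermediate result never has to be dumped to disk. Names are illustrative and do not reproduce the actual MpDelayedTask/FactorizedObject implementation.

# Illustration of task factorization: t2 depends only on t1, so both are
# merged into a single task and the intermediate result stays in memory.
def t1_func(image_tile):
    return image_tile * 2           # first processing step

def t2_func(previous_data, offset):
    return previous_data + offset   # second step, consumes t1 output

def make_factorized_func(first, second, second_extra_args):
    """Return a single callable running `first` then `second` sequentially."""
    def factorized_func(*t1_args):
        intermediate = first(*t1_args)                    # output of t1
        return second(intermediate, *second_extra_args)   # fed into t2
    return factorized_func

# t_new behaves like t1 followed by t2, but is scheduled as one task
t_new = make_factorized_func(t1_func, t2_func, (10,))
print(t_new(3))  # (3 * 2) + 10 = 16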
rec_start
Transforms delayed tasks into MpJob and creates MpFuture objects to retrieve results.
For each task:
The function transforms args and kwargs into actual data.
Determine the result dependency of the job and verify the status of the ready task.
Append the task to the queue with the ready task status, and updated args and kwargs.
Create MpFuture to share result object, and remove future at the task’s completion.
Create additional futures linked to this task.
refresh_task_cache
At each refresh:
Sleep (refresh time).
Populate the cluster pool with nb_workers tasks based on tasks without dependencies. The remaining tasks are added to the wait_list.
Check for ready results in the in_progress_list. Add jobs with ready results to the done_list and map results with their statuses in done_task_results.
Update/remove dependencies for each successfully completed job.
Search for the next tasks with higher priority. If tasks in the wait_list depend on jobs that have succeeded, move these tasks to the next_priority_tasks list (ensuring duplicates are removed).
Remove completed jobs from the in_progress_list.
Obtain the lists of ready_list and failed_list jobs.
Filter tasks from the next_priority_tasks based on their presence in the ready_list, and place them into the priority_list.
Calculate nb_ready_task = nb_workers - size(priority_list) to add only nb_ready_task tasks without dependencies.
If the priority tasks have completed, proceed with the remaining tasks of the ready_list in their initial order.
Remove failed jobs from the wait_list, copy their results to the corresponding futures, and then remove them from the task_cache.
Launch nb_workers tasks from the priority_list.
Replace jobs with actual data. Launch the tasks. Remove launched tasks from the wait_list.
Clean unused future jobs with wrapper done_task_results.
get_ready_failed_tasks
Retrieve the new ready tasks and failed tasks.
get_tasks_without_deps
A static method that selects the tasks that are ready and have no dependencies, excluding those deemed initial tasks. The initial tasks of the graph have no priority. In order to enhance disk usage efficiency, the cluster starts with N initial tasks (where N equals the number of workers), giving priority to the subsequent connected tasks. After finishing a segment of the task graph, the cluster introduces N new initial tasks to continue the process.
future_iterator
Enable the initiation of all tasks from the orchestrator controller.
get_job_ids_from_futures
Obtain a list of job IDs from the future list.
replace_job_by_data
Substitute MpJob instances in lists or dicts with their actual data.
compute_dependencies
Compute job result dependencies from args and kw_args.
MpFutureTask
A multiprocessing version of the Dask distributed.future. This class encapsulates data and references to job cluster threads. It also facilitates the sharing of references between jobs and cleaning cache operations.
log_error_hook
A custom Exception hook to manage cluster thread exceptions.
CarsDatasetRegistry
The CarsDatasetRegistry is a class that manages the list of CarsDatasets that the user wants to save. It is mainly composed of:
a registry CarsDataset list
an id associated to each registered CarsDataset
There are some functions that allow a user to:
Add new CarsDataset to registry
Obtain an ID for a CarsDataset
Find a CarsDataset from an ID
Manage saving tile by tile (i.e. future by future, in Dask terms), using the SingleCarsDatasetSaver that wraps the CarsDataset save functions.
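A minimal sketch of the bookkeeping described above is given below; it only illustrates the id/dataset mapping idea and is not the actual CarsDatasetRegistry implementation.

# Illustrative registry (not the actual CARS class): each registered
# CarsDataset gets an id so that tiles computed later can be routed
# back to the right saver.
class SimpleCarsDatasetRegistry:
    def __init__(self):
        self._datasets = []      # registered CarsDataset list
        self._file_names = {}    # id -> output file name

    def add_file_to_save(self, file_name, cars_ds):
        self._file_names[self.get_dataset_id(cars_ds)] = file_name

    def get_dataset_id(self, cars_ds):
        # Obtain an id for a CarsDataset, registering it if needed
        for dataset_id, registered in enumerate(self._datasets):
            if registered is cars_ds:
                return dataset_id
        self._datasets.append(cars_ds)
        return len(self._datasets) - 1

    def find_dataset(self, dataset_id):
        # Find a CarsDataset from an id
        return self._datasets[dataset_id]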
WIP
How it works
1. Instantiate the orchestrator before every pipeline, with a configuration that defines the cluster mode and the output directory:
with orchestrator.Orchestrator(distributed_conf=distributed_conf) as cars_orchestrator:
2. The Cluster and the CarsDatasetRegistry are created:
def __init__(self, distributed_conf=None):
    """
    Init function of Orchestrator.
    Creates Cluster and Registry for CarsDatasets

    :param distributed_conf: configuration of distribution
    """
    # out_dir
    self.out_dir = None
    if "out_dir" in distributed_conf:
        self.out_dir = distributed_conf["out_dir"]
    else:
        logging.error("No out_dir defined")

    self.nb_workers = 1
    if "nb_workers" in distributed_conf:
        self.nb_workers = distributed_conf["nb_workers"]

    # init cluster
    self.cluster = AbstractCluster(  # pylint: disable=E0110
        distributed_conf
    )

    # init CarsDataset savers registry
    self.cars_ds_savers_registry = CarsDatasetsRegistry()

    # init saving lists
    self.cars_ds_list = []

    # outjson
    self.out_json = {}
3. The orchestrator is used in every application, which can add CarsDatasets to save (the orchestrator interacts with the CarsDatasetRegistry):
def add_to_save_lists(
    self, file_name, tag, cars_ds, dtype="float32", nodata=0
):
    """
    Save file to list in order to be saved later

    :param file_name: file name
    :param tag: tag
    :param cars_ds: cars dataset to register
    """
    self.cars_ds_savers_registry.add_file_to_save(
        file_name, cars_ds, tag=tag, dtype=dtype, nodata=nodata
    )

    if cars_ds not in self.cars_ds_list:
        self.cars_ds_list.append(cars_ds)
The orchestrator can also be used to obtain a CarsDataset ID (see the application section):
def get_saving_infos(self, cars_ds_list):
    """
    Get saving infos of given cars datasets

    :param cars_ds_list: list of cars datasets
    :type cars_ds_list: list[CarsDataset]

    :return: list of saving infos
    :rtype: list[dict]
    """
    saving_infos = []

    for cars_ds in cars_ds_list:
        saving_infos.append(
            self.cars_ds_savers_registry.get_saving_infos(cars_ds)
        )

    return saving_infos
4. At the end of the pipeline, the __exit__ function is called automatically. It computes all the delayed objects needed to save the CarsDatasets, using the cluster.start_tasks function that returns future objects. The save function of the CarsDatasetRegistry is then called to save tiles by iterating over the future objects:
def __exit__(self, exc_type, exc_value, traceback):
    """
    Function run on exit.

    Compute cluster tasks, save futures to be saved, and cleanup cluster
    and files
    """
    # run compute and save files
    logging.info("Compute delayed ...")
    # Flatten to list
    delayed_objects = flatten_object(
        self.cars_ds_savers_registry.get_cars_datasets_list()
    )

    # Compute delayed
    future_objects = self.cluster.start_tasks(delayed_objects)

    # Save objects when they are computed
    logging.info("Wait for futures results ...")
    pbar = tqdm(total=len(future_objects), desc="Processing Futures ...")
    for future_obj in self.cluster.future_iterator(future_objects):
        # get corresponding CarsDataset and save tile
        if future_obj is not None:
            self.cars_ds_savers_registry.save(future_obj)
        else:
            logging.debug("None tile : not saved")
        pbar.update()

    # close files
    logging.info("Close files ...")
    self.cars_ds_savers_registry.cleanup()

    # close cluster
    logging.info("Close cluster ...")
    self.cluster.cleanup()
Plugin
Geometry plugins aim to enable the use of different geometry libraries, typically libGEO or Shareloc, to perform the CARS geometric operations that require interpreting the geometric models of the pairs to process.
Those operations are:
The epipolar grids computation
The direct localization operation
The lines of sight triangulation
SharelocGeometry is an internal geometry plugin used in the baseline installations of CARS. In the current state, Shareloc has to be installed when using CARS as this class is imported in the step module.
Geometry abstract class
The CARS abstract geometry class, named AbstractGeometry, is defined in the core geometry module (cars/core/geometry/__init__.py). Considering the geometry operations, CARS has its own internal implementation of the CARS abstract class using Shareloc. External plugins can be used if they are registered correctly:
from cars.core.geometry.abstract_geometry import AbstractGeometry
@AbstractGeometry.register_subclass("GeometryPluginName")
class GeometryPluginName(AbstractGeometry):
...
In order to make an external plugin work with CARS, it is mandatory to use the entry point cars.plugins in the setup of the package to register the AbstractGeometry object. For example, if the AbstractGeometry object is defined in the file cars_geometry_plugin_name.cars_geometry_plugin_name, this code has to be present in the file cars_geometry_plugin_name.setup.py:
setup(
    entry_points={
        "cars.plugins": [
            "plugin=cars_geometry_plugin_name.cars_geometry_plugin_name:GeometryPluginName"
        ]
    },
)
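For reference, entry points declared this way can be discovered at runtime with the standard library, as sketched below. This only illustrates the general mechanism (Python 3.10+ importlib.metadata API); the way CARS actually loads its plugins may differ.

# Generic entry-point discovery (Python >= 3.10); not necessarily how CARS
# loads its plugins internally.
from importlib.metadata import entry_points

for entry_point in entry_points(group="cars.plugins"):
    plugin_class = entry_point.load()  # imports e.g. GeometryPluginName
    print(entry_point.name, plugin_class)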
Mandatory methods
Currently, the AbstractGeometry class requires the implementation of the following mandatory methods and properties:
conf_schema, which specifies the user input json schema required by the geometric library.
def conf_schema(self):
    """
    Returns the input configuration fields required by the geometry plugin
    as a json checker schema. The available fields are defined in the
    cars/conf/input_parameters.py file

    :return: the geo configuration schema
    """
check_products_consistency, which checks whether the geometrical model filled by the user is readable by the geometric library.
def check_products_consistency(cars_conf) -> bool:
    """
    Test if the product is readable by the geometry plugin

    :param cars_conf: cars input configuration
    :return: True if the products are readable, False otherwise
    """
triangulate, which is a method performing the triangulation from a disparity map or a set of matching points (mode parameter).
def triangulate(
    sensor1,
    sensor2,
    geomodel1,
    geomodel2,
    mode: str,
    matches: Union[xr.Dataset, np.ndarray],
    grid1: str,
    grid2: str,
    roi_key: Union[None, str] = None,
) -> np.ndarray:
    """
    Performs triangulation from cars disparity or matches dataset

    :param sensor1: path to left sensor image
    :param sensor2: path to right sensor image
    :param geomodel1: path and attributes for left geomodel
    :param geomodel2: path and attributes for right geomodel
    :param mode: triangulation mode
        (constants.DISP_MODE or constants.MATCHES)
    :param matches: cars disparity dataset or matches as numpy array
    :param grid1: path to epipolar grid of img1
    :param grid2: path to epipolar grid of image 2
    :param roi_key: dataset roi to use
        (can be cst.ROI or cst.ROI_WITH_MARGINS)

    :return: the long/lat/height numpy array in output of the triangulation
    """
generate_epipolar_grids, which generates the left and right epipolar grids from the images of the pair and their geometrical models.
def generate_epipolar_grids(
    self,
    sensor1,
    sensor2,
    geomodel1,
    geomodel2,
    epipolar_step: int = 30,
) -> Tuple[
    np.ndarray, np.ndarray, List[float], List[float], List[int], float
]:
    """
    Computes the left and right epipolar grids

    :param sensor1: path to left sensor image
    :param sensor2: path to right sensor image
    :param geomodel1: path to left geomodel
    :param geomodel2: path to right geomodel
    :param epipolar_step: step to use to construct the epipolar grids

    :return: Tuple composed of :
        - the left epipolar grid as a numpy array
        - the right epipolar grid as a numpy array
        - the left grid origin as a list of float
        - the left grid spacing as a list of float
        - the epipolar image size as a list of int
          (x-axis size is given with the index 0, y-axis size with index 1)
        - the disparity to altitude ratio as a float
    """
direct_loc, which performs direct localization operations.
def direct_loc(
    self,
    sensor,
    geomodel,
    x_coord: list,
    y_coord: list,
    z_coord: list = None,
) -> np.ndarray:
    """
    For a given image point list, compute the latitudes, longitudes, altitudes

    Advice: to be sure, use x, y, z list inputs only

    :param sensor: path to sensor image
    :param geomodel: path and attributes for geomodel
    :param x_coord: X coordinates list in input image sensor
    :param y_coord: Y coordinates list in input image sensor
    :param z_coord: Z altitude coordinates list to take the image
    :return: Latitude, Longitude, Altitude coordinates list as a numpy array
    """
Here, constants corresponds to the cars/core/constants.py module.
Available methods
Some methods are available in the AbstractGeometry class that might be useful for any geometry plugin which would only perform the triangulation using sensor coordinates. CARS’ API only provides as inputs of the geometry plugin triangulation method the epipolar coordinates for each image of the pair. Thus the matches_to_sensor_coords method enables any plugin to convert those coordinates into the corresponding sensor ones.
AbstractGeometry implements the method image_envelope. It computes the ground footprint of an image in sensor geometry by projecting its four corners using the direct localization method. This method can be overloaded by any geometry plugin if necessary.
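As an illustration of what image_envelope does, the sketch below projects the four image corners with direct_loc. It is a simplified, hypothetical helper: the ncols/nrows arguments and the return convention are assumptions of this sketch, not the real method signature.

import numpy as np

def image_envelope_sketch(geometry_plugin, sensor, geomodel, ncols, nrows):
    """
    Simplified illustration of image_envelope: project the four image
    corners to the ground using the plugin direct localization.
    """
    corners_x = [0, ncols, ncols, 0]
    corners_y = [0, 0, nrows, nrows]
    # direct_loc returns latitude/longitude/altitude for the given points
    lat_lon_alt = geometry_plugin.direct_loc(
        sensor, geomodel, corners_x, corners_y
    )
    return np.asarray(lat_lon_alt)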
Detailed interaction between concepts
Now that all the concepts have been presented in detail, we can draw a more technical diagram:
[Figure: detailed interaction between concepts]