Orchestrator
Goals
The orchestrator is the central element of the CARS concepts. Its role is to handle communication between the computing technology (the cluster), the applications, and the CarsDatasets.
Details
The orchestrator is unique and instantiated for each pipeline:
with orchestrator.Orchestrator(distributed_conf=distributed_conf) as cars_orchestrator:
It is mainly composed of:
a cluster
a CarsDatasetRegistry
a dictionary, later saved in a json output file, containing information given by applications
Cluster
The cluster is the component managing access to all the threads available to CARS for its calculations.
from abc import ABCMeta, abstractmethod


class AbstractCluster(metaclass=ABCMeta):
    ...

    @abstractmethod
    def create_task(self, func, nout=1):
        """
        Create task

        :param func: function
        :param nout: number of outputs
        """

    @abstractmethod
    def start_tasks(self, task_list):
        """
        Start all tasks

        :param task_list: task list
        """

    @abstractmethod
    def future_iterator(self, future_list):
        """
        Iterator, iterating on computed futures

        :param future_list: list of futures
        """
The three main functions are:
create_task: declares a task to the cluster. It returns a delayed object.
start_tasks: computes each task that has been declared.
future_iterator: iterates over the future objects.
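As an illustration of how these three calls fit together, here is a minimal sketch of an in-process cluster. ToyDelayed and ToyCluster are hypothetical names, not part of CARS. The sketch also assumes that create_task returns a builder that is then called with the task arguments, and it treats a "future" as the computed result itself.

```python
from abc import ABCMeta, abstractmethod


class AbstractCluster(metaclass=ABCMeta):
    """Reduced copy of the interface shown above."""

    @abstractmethod
    def create_task(self, func, nout=1):
        """Declare a task."""

    @abstractmethod
    def start_tasks(self, task_list):
        """Compute every declared task."""

    @abstractmethod
    def future_iterator(self, future_list):
        """Iterate over computed futures."""


class ToyDelayed:
    """Hypothetical delayed object: a function call that has not run yet."""

    def __init__(self, func, args, kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs


class ToyCluster(AbstractCluster):
    """Hypothetical in-process cluster: tasks run one by one on start."""

    def create_task(self, func, nout=1):
        # nout is accepted for interface compatibility but unused here.
        def builder(*args, **kwargs):
            return ToyDelayed(func, args, kwargs)

        return builder

    def start_tasks(self, task_list):
        # In this toy, a "future" is simply the computed result.
        return [task.func(*task.args, **task.kwargs) for task in task_list]

    def future_iterator(self, future_list):
        yield from future_list


def add(a, b):
    return a + b


cluster = ToyCluster()
# Declare three tasks; nothing is computed yet.
delayed = [cluster.create_task(add)(i, 10) for i in range(3)]
# Compute them, then consume the results as they are yielded.
futures = cluster.start_tasks(delayed)
results = list(cluster.future_iterator(futures))
```

Nothing is computed until start_tasks is called, which is the property the orchestrator relies on when it defers all computation to the end of the pipeline.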
Three plugins are already available, each one representing a mode:
dask
    local_dask
    pbs_dask
    slurm_dask
mp (for multiprocessing)
sequential (note: in this mode, delayed objects do not exist. Objects are instead directly of type xarray.Dataset or pandas.DataFrame)
Cluster Multiprocessing
Goals
The multiprocessing (MP) cluster facilitates the distribution of computing for the application and the management of cars_dataset data.
Details
The MP cluster is built upon Python’s multiprocessing module using the forkserver mode. In this mode, a pool of worker processes handles the parallel execution of functions. Each worker process is single-threaded, and only essential resources are inherited. By design, CARS uses a disk-based registry for data storage, distributing data across the processes. If specified in the configuration, data distribution can be done in memory instead, with degraded performance.
How it works
The main class is the MP Cluster, which inherits from the AbstractCluster class. It is instantiated within the orchestrator.
Inspired by the Dask cluster approach, the MP cluster builds a list of delayed tasks and factorizes the tasks that can be run sequentially. Factorizing tasks reduces the number of tasks without losing any time; fewer tasks means fewer dumps to disk, which saves time. For each task whose data is available (intermediate results from the linked previous task), the MP cluster transforms the delayed task into an MpFutureTask.
Upon completion of these jobs, the results are saved on disk, and the reference is passed to the next job. The refresh_task_cache function serves as the primary control function of the MP cluster.
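The idea of saving a job's result on disk and passing only a reference to the next job can be sketched as follows. This is an illustration under assumptions, not the CARS implementation: the helper names run_and_dump and load_result are hypothetical, and pickle is used only for simplicity (the on-disk format actually used by CARS is not described here).

```python
import os
import pickle
import tempfile


def run_and_dump(func, args, dump_dir, job_id):
    """Run func(*args), pickle the result to disk and return its path."""
    result = func(*args)
    path = os.path.join(dump_dir, f"job_{job_id}.pkl")
    with open(path, "wb") as handle:
        pickle.dump(result, handle)
    return path


def load_result(path):
    """Load a previously dumped result from its path."""
    with open(path, "rb") as handle:
        return pickle.load(handle)


def double(values):
    return [2 * v for v in values]


def total(values):
    return sum(values)


with tempfile.TemporaryDirectory() as dump_dir:
    # First job: only the path of its result leaves the job.
    ref = run_and_dump(double, ([1, 2, 3],), dump_dir, job_id=0)
    # Second job: the reference is resolved to data just before running.
    final_ref = run_and_dump(total, (load_result(ref),), dump_dir, job_id=1)
    final_value = load_result(final_ref)
```

Each dump costs a write and a read, which is why reducing the number of tasks through factorization also reduces time spent on disk.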
The next sections illustrate the architecture of the MP cluster, while the API section describes the functions in detail and offers more insight into interactions and operations.
Class diagram
API detailed functions
init
Cluster allocation using a Python thread pool. The worker pool is set up in forkserver mode with a specified number of workers, job timeouts, and wrapper configuration for cluster logging.
create_task_wrapped
Declare a task as MpDelayed within the cluster. MpDelayed objects are instantiated using the mp_delayed_builder wrapper builder. The wrapper also provides parameters for the job logger.
start_tasks
Factorize tasks with mp_factorizer.factorize_tasks and add future tasks to the cluster queue. The cluster processes tasks from the queue. Transform MpDelayed objects into MpJob objects with rec_start, and calculate task dependencies for each job.
mp_factorizer.factorize_tasks
Take as input a list of final MpDelayed and factorize all the dependent tasks that are factorizable.
A task t of the class MpDelayedTask is factorizable if:
Only one task depends on the task t
The task t depends on only one task
If a task t2 is factorizable and depends on a task t1, then a new MpDelayedTask t_new is created with:
t_new.func = factorized_func
t_new.args = [factorized_object]
The task t2 is replaced by the task t_new.
The object factorized_object is of the class FactorizedObject and contains a list of tasks with all the information needed by factorized_func to run the tasks t1 and t2 sequentially (functions of tasks t1 and t2 and arguments of task t1).
Arguments of t2 that are MpDelayed are replaced by PreviousData objects during factorization. Then, during the run of factorized_func, PreviousData objects are replaced by the output of t1, and t2 is computed. Thus, the computation of t_new = t1 + t2 is treated as a single task by the cluster.
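The factorization of a two-task chain can be sketched as follows. The names FactorizedObject, PreviousData and factorized_func come from the description above, but their bodies here are simplified assumptions written for illustration. ToyTask and factorize are hypothetical stand-ins for MpDelayedTask and the factorizer.

```python
class PreviousData:
    """Placeholder for the output of the previous task in the chain."""


class ToyTask:
    """Hypothetical stand-in for a delayed task: a function and its args."""

    def __init__(self, func, args):
        self.func = func
        self.args = list(args)


class FactorizedObject:
    """Holds the tasks of a chain, in execution order."""

    def __init__(self, tasks):
        self.tasks = tasks


def factorized_func(factorized_object):
    """Run every task of the chain sequentially, feeding outputs forward."""
    previous = None
    for task in factorized_object.tasks:
        args = [
            previous if isinstance(arg, PreviousData) else arg
            for arg in task.args
        ]
        previous = task.func(*args)
    return previous


def factorize(t1, t2):
    """Merge t2 and the task t1 it depends on into a single task."""
    # Arguments of t2 that point to t1 become PreviousData placeholders.
    t2_args = [PreviousData() if arg is t1 else arg for arg in t2.args]
    chain = FactorizedObject([t1, ToyTask(t2.func, t2_args)])
    return ToyTask(factorized_func, [chain])


def scale(x, factor):
    return x * factor


def shift(x, offset):
    return x + offset


t1 = ToyTask(scale, [3, 2])   # computes 3 * 2
t2 = ToyTask(shift, [t1, 1])  # computes (output of t1) + 1
t_new = factorize(t1, t2)
result = t_new.func(*t_new.args)
```

Because the output of t1 is handed to t2 inside the same call, it never needs to be dumped to disk between the two steps.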
rec_start
Transform delayed tasks to MpJob and create MpFuture objects to retrieve results.
For each task:
Transform args and kwargs into actual data.
Determine the result dependency of the job and verify the status of the ready task.
Append the task to the queue with the ready task status, and updated args and kwargs.
Create MpFuture to share result object, and remove future at the task’s completion.
Create additional futures linked to this task.
refresh_task_cache
At each refresh:
Sleep (refresh time).
Populate the cluster pool with nb_workers tasks based on tasks without dependencies. The remaining tasks are added to the wait_list.
Check for ready results in in_progress_list. Add jobs with ready results to done_list and map results to statuses in done_task_results.
Update/remove dependency for each successfully completed job.
Search for next task with higher priority. If jobs that have succeeded depend on tasks in the wait_list, move these jobs to the next_priority_tasks list (ensuring duplicates are removed).
Remove completed jobs from the in_progress_list.
Obtain the lists of ready_list and failed_list jobs.
Filter tasks from the next_priority_tasks based on their presence in the ready_list, and place them into the priority_list.
Calculate nb_ready_task = nb_workers - size(priority_list) to add only nb_ready_task tasks without dependency.
If the priority tasks have completed, proceed with the remaining tasks of the ready_list in their initial order.
Remove failed jobs from the wait_list, copy their results to the corresponding futures, and then remove them from task_cache.
Launch nb_workers tasks from priority_list.
Replace jobs with actual data. Launch task. Eliminate launched tasks from the wait_list.
Clean unused future jobs with wrapper done_task_results.
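The selection of tasks to launch at each refresh can be sketched as follows. This is a simplified reading of the steps above, not the CARS code: the function select_tasks_to_launch and its signature are hypothetical, and tasks are represented by plain identifiers.

```python
def select_tasks_to_launch(
    ready_list, next_priority_tasks, tasks_without_deps, nb_workers
):
    """Return at most nb_workers tasks, priority tasks first."""
    ready = set(ready_list)
    # Keep only the prioritized tasks that are actually ready.
    priority_list = [task for task in next_priority_tasks if task in ready]
    # Remaining worker slots are filled with tasks without dependencies.
    nb_ready_task = max(nb_workers - len(priority_list), 0)
    fillers = [
        task for task in tasks_without_deps if task not in priority_list
    ][:nb_ready_task]
    return (priority_list + fillers)[:nb_workers]


# Two successors of completed jobs are ready, so only one slot is left
# for a new task without dependencies.
to_launch = select_tasks_to_launch(
    ready_list=["b2", "c2"],
    next_priority_tasks=["b2", "z9", "c2"],
    tasks_without_deps=["d1", "e1", "f1"],
    nb_workers=3,
)
```

Giving priority to successors of completed jobs lets intermediate results be consumed soon after they are produced.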
get_ready_failed_tasks
Retrieve the new ready tasks and failed tasks.
get_tasks_without_deps
A static method that evaluates the list of tasks that are ready and have no dependencies, excluding those deemed initial tasks. The initial tasks of the graph have no priority. To make disk usage more efficient, the cluster starts with N initial tasks (where N equals the number of workers) and gives priority to the subsequent connected tasks. After finishing a segment of the task graph, the cluster introduces N new initial tasks to continue the process.
future_iterator
Enable the initiation of all tasks from the orchestrator controller.
get_job_ids_from_futures
Obtain a list of job IDs from the future list.
replace_job_by_data
Substitute MpJob instances in lists or dicts with their actual data.
compute_dependencies
Compute job result dependencies from args and kwargs.
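The two helpers replace_job_by_data and compute_dependencies both walk nested arguments. A minimal sketch of that traversal is given below. ToyJob is a hypothetical stand-in for MpJob, and the signatures are assumptions chosen for the example (results are looked up in a plain dict keyed by job ID).

```python
class ToyJob:
    """Hypothetical stand-in for MpJob: a reference to a job result."""

    def __init__(self, job_id):
        self.job_id = job_id


def compute_dependencies(args, kwargs):
    """Collect the IDs of all jobs referenced in args and kwargs."""
    deps = []

    def visit(obj):
        if isinstance(obj, ToyJob):
            if obj.job_id not in deps:
                deps.append(obj.job_id)
        elif isinstance(obj, (list, tuple)):
            for item in obj:
                visit(item)
        elif isinstance(obj, dict):
            for value in obj.values():
                visit(value)

    visit(args)
    visit(kwargs)
    return deps


def replace_job_by_data(obj, results):
    """Return a copy of obj where each ToyJob is replaced by its result."""
    if isinstance(obj, ToyJob):
        return results[obj.job_id]
    if isinstance(obj, list):
        return [replace_job_by_data(item, results) for item in obj]
    if isinstance(obj, dict):
        return {
            key: replace_job_by_data(value, results)
            for key, value in obj.items()
        }
    return obj


args = [ToyJob(0), [ToyJob(1), 3]]
kwargs = {"scale": ToyJob(0), "label": "tile"}

# A job can only start once every ID listed here has a result.
deps = compute_dependencies(args, kwargs)

# Once results exist, references are swapped for the actual data.
results = {0: 10, 1: 20}
real_args = replace_job_by_data(args, results)
real_kwargs = replace_job_by_data(kwargs, results)
```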
MpFutureTask
A multiprocessing version of the Dask distributed.Future. This class encapsulates data and references to job cluster threads. It also facilitates the sharing of references between jobs and cache cleaning operations.
log_error_hook
A custom Exception hook to manage cluster thread exceptions.
CarsDatasetRegistry
The CarsDatasetRegistry is a class that manages the list of CarsDatasets that the user wants to save. It is mainly composed of:
a list of registered CarsDatasets
an ID associated with each registered CarsDataset
Its functions allow a user to:
Add a new CarsDataset to the registry
Obtain an ID for a CarsDataset
Find a CarsDataset from an ID
Manage saving tile by tile (i.e. future by future, in Dask terms), using the SingleCarsDatasetSaver, which wraps the CarsDataset save functions.
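The first three operations can be sketched with a very small registry. ToyDatasetRegistry and its method names are hypothetical and only illustrate the ID bookkeeping; the sketch assumes the ID is simply the position of the dataset in the list and does not cover tile-by-tile saving.

```python
class ToyDatasetRegistry:
    """Hypothetical registry mapping datasets to integer IDs."""

    def __init__(self):
        # Registered datasets; the index of a dataset is its ID.
        self._datasets = []

    def add(self, dataset):
        """Register dataset if needed and return its ID."""
        existing_id = self.get_id(dataset)
        if existing_id is not None:
            return existing_id
        self._datasets.append(dataset)
        return len(self._datasets) - 1

    def get_id(self, dataset):
        """Return the ID of dataset, or None if it is not registered."""
        for ds_id, registered in enumerate(self._datasets):
            if registered is dataset:
                return ds_id
        return None

    def get_dataset(self, ds_id):
        """Return the dataset registered under ds_id, or None."""
        if 0 <= ds_id < len(self._datasets):
            return self._datasets[ds_id]
        return None


registry = ToyDatasetRegistry()
dsm = {"name": "dsm"}      # placeholder objects standing in for CarsDatasets
color = {"name": "color"}

dsm_id = registry.add(dsm)
color_id = registry.add(color)
```

Identity comparison (`is`) is used so that two distinct datasets with equal content still get distinct IDs.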
WIP
How it works
Instantiate the orchestrator before every pipeline, with a configuration file that defines the cluster mode and the output directory:
with orchestrator.Orchestrator(distributed_conf=distributed_conf) as cars_orchestrator:
The cluster and the CarsDatasetRegistry are created:
def __init__(self, distributed_conf=None):
    """
    Init function of Orchestrator.
    Creates Cluster and Registry for CarsDatasets

    :param distributed_conf: configuration of distribution
    """
    # out_dir
    self.out_dir = None
    if "out_dir" in distributed_conf:
        self.out_dir = distributed_conf["out_dir"]
    else:
        logging.error("No out_dir defined")

    self.nb_workers = 1
    if "nb_workers" in distributed_conf:
        self.nb_workers = distributed_conf["nb_workers"]

    # init cluster
    self.cluster = AbstractCluster(  # pylint: disable=E0110
        distributed_conf
    )

    # init CarsDataset savers registry
    self.cars_ds_savers_registry = CarsDatasetsRegistry()

    # init saving lists
    self.cars_ds_list = []

    # outjson
    self.out_json = {}
The orchestrator is used in every application, each of which can add CarsDatasets to save (the orchestrator interacts with the CarsDatasetRegistry):
def add_to_save_lists(
    self, file_name, tag, cars_ds, dtype="float32", nodata=0
):
    """
    Save file to list in order to be saved later

    :param file_name: file name
    :param tag: tag
    :param cars_ds: cars dataset to register
    """
    self.cars_ds_savers_registry.add_file_to_save(
        file_name, cars_ds, tag=tag, dtype=dtype, nodata=nodata
    )

    if cars_ds not in self.cars_ds_list:
        self.cars_ds_list.append(cars_ds)
The orchestrator can be used to obtain the ID of a CarsDataset (see application):
def get_saving_infos(self, cars_ds_list):
    """
    Get saving infos of given cars datasets

    :param cars_ds_list: list of cars datasets
    :type cars_ds_list: list[CarsDataset]

    :return : list of saving infos
    :rtype: list[dict]
    """
    saving_infos = []
    for cars_ds in cars_ds_list:
        saving_infos.append(
            self.cars_ds_savers_registry.get_saving_infos(cars_ds)
        )

    return saving_infos
At the end of the pipeline, the __exit__ function is called automatically. It computes all the delayed objects needed for saving the CarsDatasets using the cluster.start_tasks function, which returns future objects. The save function of the CarsDatasetRegistry is then called while iterating over the future objects.
def __exit__(self, exc_type, exc_value, traceback):
    """
    Function run on exit.

    Compute cluster tasks, save futures to be saved, and cleanup cluster
    and files
    """
    # run compute and save files
    logging.info("Compute delayed ...")
    # Flatten to list
    delayed_objects = flatten_object(
        self.cars_ds_savers_registry.get_cars_datasets_list()
    )

    # Compute delayed
    future_objects = self.cluster.start_tasks(delayed_objects)

    # Save objects when they are computed
    logging.info("Wait for futures results ...")
    pbar = tqdm(total=len(future_objects), desc="Processing Futures ...")
    for future_obj in tqdm(self.cluster.future_iterator(future_objects)):
        # get corresponding CarsDataset and save tile
        if future_obj is not None:
            self.cars_ds_savers_registry.save(future_obj)
        else:
            logging.debug("None tile : not saved")
        pbar.update()

    # close files
    logging.info("Close files ...")
    self.cars_ds_savers_registry.cleanup()

    # close cluster
    logging.info("Close cluster ...")
    self.cluster.cleanup()
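The overall pattern (register work inside the with block, compute and save on exit) can be sketched with a toy context manager. ToyOrchestrator is hypothetical and much simpler than the real class: delayed objects are plain callables, "saving" stores the result in a dict, and, as a choice made for this sketch only, nothing is computed if the block raised an exception.

```python
class ToyOrchestrator:
    """Hypothetical orchestrator: defers all computation to __exit__."""

    def __init__(self):
        self.to_save = []  # (file_name, delayed callable) pairs
        self.saved = {}    # file_name -> computed result

    def __enter__(self):
        return self

    def add_to_save_lists(self, file_name, delayed):
        """Register a delayed computation whose result must be saved."""
        self.to_save.append((file_name, delayed))

    def __exit__(self, exc_type, exc_value, traceback):
        # Compute and save only if the pipeline body succeeded.
        if exc_type is None:
            for file_name, delayed in self.to_save:
                self.saved[file_name] = delayed()
        # Returning False lets any exception propagate to the caller.
        return False


with ToyOrchestrator() as toy_orchestrator:
    toy_orchestrator.add_to_save_lists("dsm.tif", lambda: 6 * 7)
    # Nothing has been computed yet at this point.
    computed_inside_block = dict(toy_orchestrator.saved)

computed_after_block = dict(toy_orchestrator.saved)
```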