The MP cluster is built upon Python’s multiprocessing module using the forkserver mode. In this mode, a pool of worker processes handles the parallel execution of functions. Each worker process is single-threaded, and only essential resources are inherited. By design, CARS uses a disk-based registry for data storage, distributing data across the processes. If specified in the configuration, data can instead be distributed in memory, at the cost of degraded performance.
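As a rough illustration of the forkserver mode described above, the sketch below requests a forkserver context from the standard multiprocessing module and builds a pool from it. The helper names get_forkserver_context and create_worker_pool are hypothetical and are not taken from the CARS code base; the forkserver start method is only available on POSIX platforms.

```python
import multiprocessing as mp


def get_forkserver_context():
    """Return a multiprocessing context that uses the forkserver start method."""
    # Assumption: a POSIX platform, where "forkserver" is available.
    return mp.get_context("forkserver")


def create_worker_pool(nb_workers):
    """Create a pool of worker processes (hypothetical helper, not the CARS API)."""
    ctx = get_forkserver_context()
    # Workers are started by the fork server and do not inherit the full
    # state of the parent process.
    return ctx.Pool(processes=nb_workers)
```

When a pool is created this way from a script, the call should sit under an `if __name__ == "__main__":` guard so that worker processes can import the main module safely.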
How it works
The main class is the MP Cluster, which inherits from the AbstractCluster class. It is instantiated within the orchestrator.
Inspired by the Dask cluster approach, the MP cluster builds a list of delayed tasks and factorizes the tasks that can be run sequentially. Factorizing tasks reduces the number of tasks without losing any time, and fewer tasks means fewer dumps to disk, which saves time. For each task whose input data is available (the intermediate results from the linked previous task), the MP cluster transforms the delayed task into an MpFutureTask.
Upon completion of these jobs, the results are saved on disk, and the reference is passed to the next job. The refresh_task_cache function serves as the primary control function of the MP cluster.
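The idea of saving a result on disk and handing only a reference to the next job can be sketched as follows. This is a minimal illustration, assuming pickle files in a temporary directory; the actual storage format and the helper names run_and_dump and load_result are assumptions, not the CARS implementation.

```python
import os
import pickle
import tempfile


def run_and_dump(func, args, out_dir, job_id):
    """Run a job, dump its result to disk and return the file path as reference."""
    result = func(*args)
    path = os.path.join(out_dir, f"job_{job_id}.pkl")
    with open(path, "wb") as handle:
        pickle.dump(result, handle)
    return path


def load_result(path):
    """Load a previously dumped result from its reference."""
    with open(path, "rb") as handle:
        return pickle.load(handle)


with tempfile.TemporaryDirectory() as out_dir:
    # First job: its result stays on disk, only the path is passed on.
    reference = run_and_dump(sum, ([1, 2, 3],), out_dir, job_id=0)
    # Next job: loads the input from the reference before computing.
    next_result = load_result(reference) * 2
```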
The next sections illustrate the architecture of the MP cluster, while the API provides detailed functions that offer more insight into interactions and operations.
API detailed functions
Cluster allocation using a Python thread pool. The worker pool is set up in forkserver mode with a specified number of workers, job timeouts, and wrapper configuration for cluster logging.
Declare a task as MpDelayed within the cluster. MpDelayed objects are instantiated using the mp_delayed_builder wrapper builder. The wrapper also provides parameters for the job logger.
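The delayed-task pattern can be illustrated with a small wrapper builder. DelayedTask and delayed_builder below are simplified, hypothetical stand-ins for MpDelayed and mp_delayed_builder; the real classes carry more information (for example the logger parameters mentioned above).

```python
class DelayedTask:
    """Hypothetical stand-in for MpDelayed: records a call instead of running it."""

    def __init__(self, func, args, kwargs):
        self.func = func
        self.args = list(args)
        self.kwargs = dict(kwargs)


def delayed_builder(func):
    """Wrap func so that calling the wrapper returns a DelayedTask."""

    def builder(*args, **kwargs):
        # No computation happens here, the call is only recorded.
        return DelayedTask(func, args, kwargs)

    return builder


def add(a, b):
    return a + b


delayed_add = delayed_builder(add)
task = delayed_add(1, 2)
# The recorded call is executed later, when the cluster decides to run it.
result = task.func(*task.args, **task.kwargs)
```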
Factorize tasks with mp_factorizer.factorize_tasks and add future tasks to the cluster queue. The cluster processes tasks from the queue. Transform MpDelayed objects into MpJob objects with rec_start, and calculate task dependencies for each job.
Take as input a list of final MpDelayed and factorize all the dependent tasks that are factorizable.
A task t of the class MpDelayedTask is factorizable if:
Only one task depends on the task t
The task t depends on only one task
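The two conditions above can be checked on a small dependency graph. The sketch below assumes the graph is given as two plain dictionaries (depends_on and dependents); this representation and the function name is_factorizable are illustrative only.

```python
def is_factorizable(task, depends_on, dependents):
    """Return True if exactly one task depends on `task`
    and `task` depends on exactly one task."""
    only_one_dependent = len(dependents.get(task, [])) == 1
    only_one_dependency = len(depends_on.get(task, [])) == 1
    return only_one_dependent and only_one_dependency


# Hypothetical graph: t1 -> t2 -> t3, and t1 -> t4
depends_on = {"t1": [], "t2": ["t1"], "t3": ["t2"], "t4": ["t1"]}
dependents = {"t1": ["t2", "t4"], "t2": ["t3"], "t3": [], "t4": []}

factorizable = [t for t in depends_on if is_factorizable(t, depends_on, dependents)]
```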
If a task t2 is factorizable and depends on a task t1, then a new MpDelayedTask t_new is created with:
t_new.func = factorized_func
t_new.args = [factorized_object]
The task t2 is replaced by the task t_new.
The object factorized_object is of the class FactorizedObject and contains a list of tasks with all the information needed by factorized_func to run the tasks t1 and t2 sequentially (functions of tasks t1 and t2 and arguments of task t1).
Arguments of t2 that are MpDelayed are replaced by PreviousData objects during factorization. Then, during the run of factorized_func, the PreviousData objects are replaced by the output of t1, and t2 is computed. Thus, the computation of t_new = t1 + t2 is treated as a single task by the cluster.
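The mechanism can be sketched with simplified stand-ins for FactorizedObject, PreviousData and factorized_func. The class layouts below (a list of (func, args) pairs, positional arguments only) are assumptions made for illustration and do not reproduce the actual CARS classes.

```python
class PreviousData:
    """Placeholder for an argument to be filled with the previous task's output."""


class FactorizedObject:
    """Simplified stand-in: tasks to run sequentially, as (func, args) pairs."""

    def __init__(self, tasks):
        self.tasks = tasks


def factorized_func(factorized_object):
    """Run the stored tasks in order, feeding each output into the next task."""
    previous_output = None
    for func, args in factorized_object.tasks:
        # Replace every PreviousData placeholder by the previous output.
        resolved = [
            previous_output if isinstance(arg, PreviousData) else arg
            for arg in args
        ]
        previous_output = func(*resolved)
    return previous_output


def double(x):
    return 2 * x


def add(x, y):
    return x + y


# t1 = double(5), t2 = add(<output of t1>, 1), run as one task t_new
factorized_object = FactorizedObject([(double, [5]), (add, [PreviousData(), 1])])
result = factorized_func(factorized_object)
```

Because t1 and t2 run inside the same call, the output of t1 never has to be dumped to disk before t2 starts.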
Transform delayed tasks to MpJob and create MpFuture objects to retrieve results.
For each task:
The function transforms args and kwargs into actual data.
Determine the result dependency of the job and verify the status of the ready task.
Append the task to the queue with the ready task status, and updated args and kwargs.
Create an MpFuture to share the result object, and remove the future when the task completes.
Create additional futures linked to this task.
At each refresh:
Sleep (refresh time).
Populate the cluster pool with nb_workers tasks based on tasks without dependencies. The remaining tasks are added to the wait_list.
Check for ready results in in_progress_list. Add jobs with ready results to done_list and map results with statuses in done_task_results.
Update/remove dependency for each successfully completed job.
Search for the next tasks with higher priority. If tasks in the wait_list depend on jobs that have succeeded, move these tasks to the next_priority_tasks list (ensuring duplicates are removed).
Remove completed jobs from the in_progress_list.
Obtain the lists of ready_list and failed_list jobs.
Filter tasks from the next_priority_tasks based on their presence in the ready_list, and place them into the priority_list.
Calculate nb_ready_task = nb_workers - size(priority_list) to add only nb_ready_task tasks without dependency.
If the priority tasks have completed, proceed with the remaining tasks of the ready_list in their initial order.
Remove failed jobs from the wait_list, copy their results to the corresponding futures, and then remove them from task_cache.
Launch nb_workers tasks from priority_list.
Replace jobs with actual data. Launch task. Eliminate launched tasks from the wait_list.
Clean unused future jobs with wrapper done_task_results.
Retrieve the new ready tasks and failed tasks.
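The selection step of the refresh loop (priority tasks first, then nb_ready_task tasks without dependency, in their initial order) can be sketched as below. This is a simplified illustration under stated assumptions: tasks are plain identifiers, jobs already in progress are ignored, and the function name select_tasks_to_launch is hypothetical.

```python
def select_tasks_to_launch(ready_list, next_priority_tasks, nb_workers):
    """Pick at most nb_workers tasks, serving priority tasks first."""
    # Priority tasks are the ready tasks flagged in next_priority_tasks.
    priority_list = [t for t in ready_list if t in next_priority_tasks]
    selected = priority_list[:nb_workers]
    # Remaining worker slots are filled with non-priority ready tasks.
    nb_ready_task = max(0, nb_workers - len(priority_list))
    non_priority = [t for t in ready_list if t not in next_priority_tasks]
    return selected + non_priority[:nb_ready_task]


ready_list = ["a", "b", "c", "d", "e"]
to_launch = select_tasks_to_launch(ready_list, {"d", "b"}, nb_workers=3)
```

Serving the priority tasks first lets a started branch of the graph finish before new initial tasks are introduced, which limits the number of intermediate results kept on disk at the same time.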
A static method evaluates a list of tasks that are ready and lack dependencies, excluding those deemed initial tasks. The initial tasks of the graph have no priority. To make disk usage more efficient, the cluster starts with N initial tasks (where N equals the number of workers) and gives priority to the tasks connected to them. After finishing a segment of the task graph, the cluster introduces N new initial tasks to continue the process.
Enable the initiation of all tasks from the orchestrator controller.
Obtain a list of job IDs from the future list.
Substitute MpJob instances in lists or dict with their actual data.
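A recursive substitution of this kind can be sketched as follows. The Job class is a hypothetical stand-in for MpJob, and the results dictionary stands for whatever store holds the completed results; both are assumptions for illustration.

```python
class Job:
    """Hypothetical stand-in for MpJob: a reference to the result of a task."""

    def __init__(self, task_id):
        self.task_id = task_id


def replace_jobs_by_data(obj, results):
    """Return a copy of obj where every Job is replaced by its result."""
    if isinstance(obj, Job):
        return results[obj.task_id]
    if isinstance(obj, list):
        return [replace_jobs_by_data(item, results) for item in obj]
    if isinstance(obj, dict):
        return {key: replace_jobs_by_data(value, results) for key, value in obj.items()}
    # Any other value is actual data already and is kept unchanged.
    return obj


results = {1: "tile_a", 2: "tile_b"}
args = [Job(1), {"right": Job(2), "scale": 0.5}, [Job(1), 3]]
resolved = replace_jobs_by_data(args, results)
```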
Compute job result dependencies from args and kw_args.
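Dependency computation can be illustrated by walking the arguments and collecting the identifiers of every job reference found. As before, Job is a hypothetical stand-in for MpJob, and compute_dependencies is an illustrative name rather than the CARS function.

```python
class Job:
    """Hypothetical stand-in for MpJob: a reference to the result of a task."""

    def __init__(self, task_id):
        self.task_id = task_id


def compute_dependencies(args, kwargs):
    """Return the task ids referenced in args and kwargs, without duplicates."""
    found = []

    def visit(obj):
        if isinstance(obj, Job):
            if obj.task_id not in found:
                found.append(obj.task_id)
        elif isinstance(obj, (list, tuple)):
            for item in obj:
                visit(item)
        elif isinstance(obj, dict):
            for value in obj.values():
                visit(value)

    visit(list(args))
    visit(kwargs)
    return found


dependencies = compute_dependencies(
    [Job(3), [Job(1), 7]],
    {"mask": Job(3), "other": {"x": Job(2)}},
)
```

A task whose dependency list is empty can be launched immediately; the others wait until all referenced jobs are done.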
A multiprocessing version of the Dask distributed.Future. This class encapsulates data and references to job cluster threads. It also facilitates the sharing of references between jobs and cleaning cache operations.
A custom Exception hook to manage cluster thread exceptions.