cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster
Contains abstract functions for the multiprocessing cluster
Attributes
Classes
MultiprocessingCluster | MultiprocessingCluster
MpFutureTask | multiprocessing version of distributed.future
Functions
get_job_ids_from_futures | Get list of job ids in future list
replace_job_by_data | Replace MpJob in list or dict by their real data
compute_dependencies | Compute dependencies from args and kw_args
log_error_hook | Exception hook for cluster thread
update_job_id_priority | Update job to launch list with new priority list and ready list
| Compute configuration to use in auto mode
| Get slurm data
Module Contents
- cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.SYS_PLATFORM
- cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.IS_WIN
- cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.RUN = 0
- cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.TERMINATE = 1
- cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.REFRESH_TIME = 0.05
- cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.job_counter
- class cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.MultiprocessingCluster(conf_cluster, out_dir, log_dir, launch_worker=True, data_to_propagate=None)[source]
Bases: cars.orchestrator.cluster.abstract_cluster.AbstractCluster
MultiprocessingCluster
- out_dir
- log_dir
- nb_workers
- mp_mode
- task_timeout
- max_tasks_per_worker
- dump_to_disk
- per_job_timeout
- factorize_tasks
- launch_worker = True
- tmp_dir = None
- check_conf(conf)[source]
Check configuration
- Parameters:
conf (dict) – configuration to check
- Returns:
overloaded configuration
- Return type:
dict
- cleanup(keep_shared_dir=False)[source]
Cleanup cluster
- Parameters:
keep_shared_dir – do not clean directory of shared objects
- create_task_wrapped(func, nout=1)[source]
Create task
- Parameters:
func – function
nout – number of outputs
- rec_start(delayed_object, memorize)[source]
Record task
- Parameters:
delayed_object (MpDelayed) – delayed object to record
memorize – list of MpDelayed already recorded
- static refresh_task_cache(pool, task_cache, in_queue, per_job_timeout, cl_future_list, nb_workers, wrapper_obj)[source]
Refresh task cache
- Parameters:
task_cache – task cache list
in_queue – queue
per_job_timeout – per job timeout
cl_future_list – current future list used in iterator
nb_workers – number of workers
- static get_ready_failed_tasks(wait_list, dependencies_list, done_task_results)[source]
Return the new ready tasks without constraint and failed tasks
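The split between ready and failed tasks can be illustrated with a minimal sketch. This is not the CARS implementation: the function name `split_ready_and_failed` and the data shapes (a list of waiting job ids, a dict of dependencies, a dict of `(success, value)` results) are assumptions chosen for illustration only.

```python
def split_ready_and_failed(wait_list, dependencies, done_results):
    """Return (ready_ids, failed_ids) for the waiting jobs.

    Assumed shapes (hypothetical, not taken from CARS):
      wait_list    : iterable of waiting job ids
      dependencies : dict job_id -> list of job ids it depends on
      done_results : dict job_id -> (success, value) for finished jobs
    """
    ready, failed = [], []
    for job_id in wait_list:
        deps = dependencies.get(job_id, [])
        # A job fails as soon as one finished dependency has failed.
        if any(dep in done_results and not done_results[dep][0] for dep in deps):
            failed.append(job_id)
        # A job is ready once every dependency has finished successfully.
        elif all(dep in done_results for dep in deps):
            ready.append(job_id)
        # Otherwise the job keeps waiting.
    return ready, failed


done = {1: (True, "a"), 2: (False, None)}
deps = {3: [1], 4: [2], 5: [1, 6], 7: []}
ready_ids, failed_ids = split_ready_and_failed([3, 4, 5, 7], deps, done)
```

In this sketch, job 5 stays in the waiting state because dependency 6 has not finished yet.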
- cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.get_job_ids_from_futures(future_list)[source]
Get the list of job ids in the future list
- Parameters:
future_list (MpFuture) – list of futures
- Returns:
list of job ids
- Return type:
list(int)
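As a rough illustration of the idea, the sketch below collects job ids from a list of future-like objects. `FakeFuture` and `job_ids_from_futures` are hypothetical stand-ins; the only detail taken from this page is that a future carries a `job_id` attribute.

```python
from dataclasses import dataclass


@dataclass
class FakeFuture:
    """Hypothetical stand-in exposing only a job_id attribute."""
    job_id: int


def job_ids_from_futures(future_list):
    """Collect the job id of each future, preserving input order."""
    return [future.job_id for future in future_list]


ids = job_ids_from_futures([FakeFuture(4), FakeFuture(7)])
```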
- cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.replace_job_by_data(args_or_kawargs, done_task_results)[source]
Replace MpJob in list or dict by their real data
- Parameters:
args_or_kawargs – list or dict of data
done_task_results – dict of done tasks
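The substitution step can be sketched as follows. This is an assumption-laden illustration, not the CARS code: `JobRef` is a hypothetical placeholder (the real MpJob may look different), and `done_results` is assumed to map a job id to a tuple of outputs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JobRef:
    """Hypothetical placeholder for one output of a pending job."""
    job_id: int
    out_index: int = 0


def replace_refs(args_or_kwargs, done_results):
    """Return a copy of a list or dict with JobRef entries replaced by data."""
    def resolve(value):
        if isinstance(value, JobRef):
            # Look up the finished job, then pick the requested output.
            return done_results[value.job_id][value.out_index]
        if isinstance(value, (list, tuple)):
            return type(value)(resolve(item) for item in value)
        if isinstance(value, dict):
            return {key: resolve(item) for key, item in value.items()}
        return value

    return resolve(args_or_kwargs)


done = {10: ("tile_a", "tile_b")}
new_args = replace_refs([JobRef(10, 1), 3, [JobRef(10)]], done)
new_kwargs = replace_refs({"left": JobRef(10), "scale": 2}, done)
```

Plain values such as `3` or `2` pass through unchanged; only placeholders are swapped for the stored results.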
- cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.compute_dependencies(args, kw_args)[source]
Compute dependencies from args and kw_args
- Parameters:
args (list) – arguments
kw_args (dict) – keyword arguments
- Returns:
dependencies
- Return type:
list
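A minimal sketch of dependency discovery, under the assumption that dependencies are the ids of placeholder objects found anywhere in the positional and keyword arguments. `JobRef` and `collect_dependencies` are hypothetical names, not part of CARS.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JobRef:
    """Hypothetical placeholder for the output of another job."""
    job_id: int


def collect_dependencies(args, kw_args):
    """Return the unique job ids referenced in args and kw_args, in first-seen order."""
    deps = []

    def visit(value):
        if isinstance(value, JobRef):
            if value.job_id not in deps:
                deps.append(value.job_id)
        elif isinstance(value, (list, tuple)):
            for item in value:
                visit(item)
        elif isinstance(value, dict):
            for item in value.values():
                visit(item)

    visit(list(args))
    visit(dict(kw_args))
    return deps


deps = collect_dependencies(
    [JobRef(1), 5, [JobRef(2)]],
    {"mask": JobRef(1), "other": JobRef(3)},
)
```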
- class cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.MpFutureTask(cluster)[source]
multiprocessing version of distributed.future
- _cluster
- result = None
- _success = None
- event
- job_id
- task_cache
- associated_futures = []
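To show how a future built around an event might behave, here is a minimal sketch using `threading.Event` from the standard library. The attribute names `result`, `_success`, `event` and `job_id` mirror the list above, but the class `SketchFuture` and its `set` and `wait` methods are hypothetical and may not match the actual MpFutureTask interface.

```python
import threading


class SketchFuture:
    """Hypothetical future: a result slot guarded by a threading.Event."""

    def __init__(self, job_id):
        self.job_id = job_id
        self.result = None
        self._success = None
        self.event = threading.Event()

    def set(self, success, result):
        """Store the outcome and wake up any waiter."""
        self._success = success
        self.result = result
        self.event.set()

    def wait(self, timeout=None):
        """Block until the outcome is available, then return or raise."""
        if not self.event.wait(timeout):
            raise TimeoutError(f"job {self.job_id} did not finish in time")
        if not self._success:
            raise RuntimeError(f"job {self.job_id} failed: {self.result}")
        return self.result


future = SketchFuture(job_id=1)
worker = threading.Thread(target=future.set, args=(True, 42))
worker.start()
value = future.wait(timeout=5)
worker.join()
```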
- cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.log_error_hook(args)[source]
Exception hook for cluster thread
- cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.update_job_id_priority(job_ids_to_launch_prioritized, priority_list, ready_list)[source]
Update the list of jobs to launch using the new priority list and the ready list
- Returns:
updated list
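One possible merging policy is sketched below. The actual ordering rules used by CARS are not described on this page, so the policy here (prioritized jobs first, then previously scheduled jobs, then newly ready jobs, without duplicates) and the name `merge_priorities` are assumptions for illustration.

```python
def merge_priorities(to_launch, priority_list, ready_list):
    """Return job ids to launch, prioritized jobs first, without duplicates."""
    # Only jobs that are scheduled or ready may be launched.
    candidates = set(to_launch) | set(ready_list)
    prioritized = [job_id for job_id in priority_list if job_id in candidates]

    ordered, seen = [], set()
    for job_id in [*prioritized, *to_launch, *ready_list]:
        if job_id not in seen:
            seen.add(job_id)
            ordered.append(job_id)
    return ordered


updated = merge_priorities([5, 6], priority_list=[8, 6, 9], ready_list=[7, 8])
```

In this sketch, job 9 is ignored because it is neither scheduled nor ready, even though it appears in the priority list.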