cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster

Contains abstract functions for the multiprocessing cluster

Attributes

SYS_PLATFORM

IS_WIN

RUN

TERMINATE

REFRESH_TIME

job_counter

Classes

MultiprocessingCluster

MultiprocessingCluster

MpFutureTask

Multiprocessing version of distributed.future

Functions

get_job_ids_from_futures(future_list)

Get the list of job ids in a future list

replace_job_by_data(args_or_kawargs, done_task_results)

Replace MpJob objects in a list or dict with their actual data

compute_dependencies(args, kw_args)

Compute dependencies from args and kw_args

log_error_hook(args)

Exception hook for the cluster thread

update_job_id_priority(job_ids_to_launch_prioritized, ...)

Update the list of jobs to launch with the new priority list and ready list

compute_conf_auto_mode(is_windows, max_ram_per_worker)

Compute the configuration to use in auto mode

get_slurm_data()

Get Slurm data

Module Contents

cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.SYS_PLATFORM
cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.IS_WIN
cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.RUN = 0
cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.TERMINATE = 1
cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.REFRESH_TIME = 0.05
cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.job_counter
class cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.MultiprocessingCluster(conf_cluster, out_dir, log_dir, launch_worker=True, data_to_propagate=None)

Bases: cars.orchestrator.cluster.abstract_cluster.AbstractCluster

MultiprocessingCluster

out_dir
log_dir
nb_workers
mp_mode
task_timeout
max_tasks_per_worker
dump_to_disk
per_job_timeout
factorize_tasks
launch_worker = True
tmp_dir = None
check_conf(conf)

Check configuration

Parameters:

conf (dict) – configuration to check

Returns:

configuration overloaded with default values

Return type:

dict
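Judging by its name, the "overloaded" configuration returned by check_conf is the input configuration completed with default values for missing keys. The sketch below only illustrates that pattern: the key names echo the attributes listed above, while the default values, the validation rules, and the check_conf_sketch name are hypothetical and are not taken from CARS.

```python
# Hypothetical defaults for illustration only; not the values CARS uses.
HYPOTHETICAL_DEFAULTS = {
    "nb_workers": 2,
    "task_timeout": 600,
    "max_tasks_per_worker": 10,
    "dump_to_disk": True,
    "per_job_timeout": 600,
    "factorize_tasks": True,
}


def check_conf_sketch(conf):
    """Return a copy of conf where missing keys are filled from the defaults."""
    overloaded_conf = conf.copy()
    for key, default in HYPOTHETICAL_DEFAULTS.items():
        overloaded_conf[key] = conf.get(key, default)

    # Hypothetical validation: reject unknown keys and non-positive worker counts.
    unknown = set(overloaded_conf) - set(HYPOTHETICAL_DEFAULTS) - {"mode"}
    if unknown:
        raise KeyError(f"Unknown configuration keys: {sorted(unknown)}")
    if overloaded_conf["nb_workers"] < 1:
        raise ValueError("nb_workers must be at least 1")
    return overloaded_conf
```

Working on a copy keeps the caller's dictionary untouched, so the user configuration and the overloaded one can both be logged or compared afterwards.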

get_delayed_type()

Get delayed type

cleanup(keep_shared_dir=False)

Clean up the cluster

Parameters:

keep_shared_dir – if True, do not clean the directory of shared objects

scatter(data)

Distribute data to the workers

Parameters:

data – data to dump

create_task_wrapped(func, nout=1)

Create task

Parameters:
  • func – function

  • nout – number of outputs

start_tasks(task_list)

Start all tasks

Parameters:

task_list – task list

rec_start(delayed_object, memorize)

Record task

Parameters:
  • delayed_object (MpDelayed) – delayed object to record

  • memorize – list of MpDelayed already recorded

static refresh_task_cache(pool, task_cache, in_queue, per_job_timeout, cl_future_list, nb_workers, wrapper_obj)

Refresh task cache

Parameters:
  • task_cache – task cache list

  • in_queue – queue

  • per_job_timeout – per job timeout

  • cl_future_list – current future list used in iterator

  • nb_workers – number of workers

static get_ready_failed_tasks(wait_list, dependencies_list, done_task_results)

Return the newly ready tasks, which have no remaining constraints, and the failed tasks
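Splitting waiting tasks into "ready" and "failed" is a standard dependency-resolution step. The sketch below shows one way to do it under stated assumptions: dependencies_list is taken to map a job id to the list of job ids it depends on, and done_task_results to map a finished job id to a (success, data) tuple. These layouts and the get_ready_failed_tasks_sketch name are hypothetical; the CARS data structures may differ.

```python
def get_ready_failed_tasks_sketch(wait_list, dependencies_list, done_task_results):
    """Split waiting jobs into ready jobs and failed jobs.

    Assumed layouts (hypothetical):
      dependencies_list: {job_id: [dependency job ids]}
      done_task_results: {job_id: (success, data)}
    A job is ready when all its dependencies finished successfully, and
    failed as soon as one finished dependency is unsuccessful. Jobs whose
    dependencies are still running appear in neither list.
    """
    ready_list = []
    failed_list = []
    for job_id in wait_list:
        deps = dependencies_list.get(job_id, [])
        finished = [dep for dep in deps if dep in done_task_results]
        if any(not done_task_results[dep][0] for dep in finished):
            failed_list.append(job_id)
        elif len(finished) == len(deps):
            ready_list.append(job_id)
    return ready_list, failed_list
```

Marking a job as failed as soon as one dependency fails, rather than waiting for all of them, lets the scheduler abandon doomed branches early.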

static get_tasks_without_deps(dependencies_list, ready_list, nb_ready_task)

Return the list of ready tasks that have no dependencies and are not considered initial tasks (dependency = -1)

future_iterator(future_list, timeout=None)

Start all tasks

Parameters:

future_list – list of futures

cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.get_job_ids_from_futures(future_list)

Get the list of job ids in a future list

Parameters:

future_list (list(MpFuture)) – list of futures

Returns:

list of job ids

Return type:

list(int)
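MpFutureTask exposes a job_id attribute, so extracting ids from a list of futures can be as simple as a list comprehension. The sketch assumes every future carries such a job_id; FakeFuture and get_job_ids_from_futures_sketch are hypothetical stand-ins, not CARS classes.

```python
from dataclasses import dataclass


@dataclass
class FakeFuture:
    """Hypothetical stand-in for MpFuture exposing only a job_id."""

    job_id: int


def get_job_ids_from_futures_sketch(future_list):
    """Return the job id of every future, preserving the list order."""
    return [future.job_id for future in future_list]
```

For example, get_job_ids_from_futures_sketch([FakeFuture(3), FakeFuture(7)]) returns [3, 7].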

cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.replace_job_by_data(args_or_kawargs, done_task_results)

Replace MpJob objects in a list or dict with their actual data

Parameters:
  • args_or_kawargs – list or dict of data

  • done_task_results – dict of done tasks
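Before a task runs, its placeholder arguments must be swapped for the results of the jobs they stand for. The sketch below shows that substitution for a flat list or dict. It assumes each placeholder is identified by a job_id and that done_task_results maps that id directly to the data; FakeJob and replace_job_by_data_sketch are hypothetical, and unlike the real function (which may update in place) the sketch returns a new container.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeJob:
    """Hypothetical placeholder standing in for an MpJob."""

    job_id: int


def replace_job_by_data_sketch(args_or_kwargs, done_task_results):
    """Return a copy of a list or dict where placeholders become their results."""

    def resolve(value):
        # Only placeholders are replaced; plain values pass through unchanged.
        if isinstance(value, FakeJob):
            return done_task_results[value.job_id]
        return value

    if isinstance(args_or_kwargs, dict):
        return {key: resolve(value) for key, value in args_or_kwargs.items()}
    return [resolve(value) for value in args_or_kwargs]
```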

cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.compute_dependencies(args, kw_args)

Compute dependencies from args and kw_args

Parameters:
  • args (list) – arguments

  • kw_args (dict) – keyword arguments

Returns:

dependencies

Return type:

list
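Dependencies of a task can be found by scanning its positional and keyword arguments for job placeholders. The sketch below collects their ids without duplicates; FakeJob, the job_id attribute, and the deduplication are assumptions for illustration, not a description of CARS internals.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeJob:
    """Hypothetical placeholder standing in for an MpJob."""

    job_id: int


def compute_dependencies_sketch(args, kw_args):
    """Collect the job ids of placeholders found in args and kw_args values."""
    dependencies = []
    for value in list(args) + list(kw_args.values()):
        # Keep the first occurrence only, so each dependency appears once.
        if isinstance(value, FakeJob) and value.job_id not in dependencies:
            dependencies.append(value.job_id)
    return dependencies
```

For example, compute_dependencies_sketch([FakeJob(1), 3, FakeJob(2)], {"a": FakeJob(1), "b": FakeJob(4)}) returns [1, 2, 4].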

class cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.MpFutureTask(cluster)

Multiprocessing version of distributed.future

_cluster
result = None
_success = None
event
job_id
task_cache
associated_futures = []
set(obj)

Set the result on the associated delayed object and clean the cache

Parameters:

obj (tuple(bool, Union(dataset, dataframe))) – result object
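The result, _success and event attributes suggest a future built on threading.Event: set() stores the (success, data) tuple and wakes any thread waiting for it. The sketch below shows that mechanism only; EventFutureSketch and its get() method are hypothetical, and the cache cleaning performed by the real set() is omitted.

```python
import threading


class EventFutureSketch:
    """Hypothetical minimal future: set() stores a result and wakes waiters."""

    def __init__(self):
        self.result = None
        self._success = None
        self.event = threading.Event()

    def set(self, obj):
        """Store a (success, data) tuple, then signal waiting threads."""
        self._success, self.result = obj
        self.event.set()

    def get(self, timeout=None):
        """Block until set() is called, then return the data or raise."""
        if not self.event.wait(timeout):
            raise TimeoutError("result not available yet")
        if not self._success:
            raise RuntimeError(f"task failed: {self.result}")
        return self.result
```

Because the event is set only after both fields are assigned, a waiting thread never observes a half-written result.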

cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.log_error_hook(args)

Exception hook for the cluster thread

cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.update_job_id_priority(job_ids_to_launch_prioritized, priority_list, ready_list)

Update the list of jobs to launch with the new priority list and ready list

Returns:

updated list
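One plausible reading of this update is: queue every newly ready job that is not already queued, then move the jobs named in the priority list to the front. The sketch below implements exactly that reading; the semantics and the update_job_id_priority_sketch name are assumptions, not a description of the CARS algorithm.

```python
def update_job_id_priority_sketch(job_ids_to_launch_prioritized, priority_list, ready_list):
    """Merge newly ready jobs into the launch list, priority jobs first.

    Assumed semantics (hypothetical): ready jobs not yet queued are appended,
    then queued jobs present in priority_list move to the front in the
    order given by priority_list. Other jobs keep their relative order.
    """
    queued = list(job_ids_to_launch_prioritized)
    for job_id in ready_list:
        if job_id not in queued:
            queued.append(job_id)
    front = [job_id for job_id in priority_list if job_id in queued]
    rest = [job_id for job_id in queued if job_id not in front]
    return front + rest
```

For example, update_job_id_priority_sketch([5, 6], [7, 6], [6, 7, 8]) returns [7, 6, 5, 8].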

cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.compute_conf_auto_mode(is_windows, max_ram_per_worker)

Compute the configuration to use in auto mode

Parameters:
  • is_windows (bool) – True if running on Windows

  • max_ram_per_worker (int) – maximum RAM per worker, in MB
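An auto mode typically sizes the worker pool from both the CPU count and the memory budget per worker. The sketch below takes the smaller of the two bounds and picks a start method: Windows only supports "spawn", while "forkserver" is available on Unix. The available_ram_mb and nb_cpus parameters, the returned keys, and the sizing rule are hypothetical additions that keep the sketch testable; they are not the CARS formula.

```python
import os


def compute_conf_auto_mode_sketch(is_windows, max_ram_per_worker, available_ram_mb, nb_cpus=None):
    """Derive a worker count and start method from CPUs and memory (in MB).

    available_ram_mb and nb_cpus are hypothetical parameters; a real
    implementation would query the system instead.
    """
    if nb_cpus is None:
        nb_cpus = os.cpu_count() or 1
    # Never start more workers than the memory budget allows, but at least one.
    ram_bound = max(1, int(available_ram_mb // max_ram_per_worker))
    nb_workers = min(nb_cpus, ram_bound)
    # Windows only supports the "spawn" multiprocessing start method.
    mp_mode = "spawn" if is_windows else "forkserver"
    return {
        "nb_workers": nb_workers,
        "max_ram_per_worker": max_ram_per_worker,
        "mp_mode": mp_mode,
    }
```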

cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.get_slurm_data()

Get Slurm data
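Inside a Slurm allocation, job limits are exposed through environment variables such as SLURM_JOB_ID, SLURM_CPUS_PER_TASK (set when --cpus-per-task is given) and SLURM_MEM_PER_NODE (set from --mem, in MB). The sketch below reads them; which variables CARS actually consults, and the shape of its return value, are assumptions, and get_slurm_data_sketch with its environ parameter is hypothetical.

```python
import os


def get_slurm_data_sketch(environ=None):
    """Read CPU and memory limits from Slurm environment variables.

    Returns (on_slurm, nb_cpus, max_mem_mb); the last two are None when the
    corresponding variable is not set.
    """
    env = os.environ if environ is None else environ
    on_slurm = "SLURM_JOB_ID" in env
    nb_cpus = env.get("SLURM_CPUS_PER_TASK")
    max_mem = env.get("SLURM_MEM_PER_NODE")
    return (
        on_slurm,
        int(nb_cpus) if nb_cpus is not None else None,
        int(max_mem) if max_mem is not None else None,
    )
```

Passing an explicit mapping instead of reading os.environ directly makes the function easy to test outside a Slurm job.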