cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster

Contains abstract functions for the multiprocessing cluster

Module Contents

Classes

MultiprocessingCluster

MultiprocessingCluster

MpFutureTask

Multiprocessing version of distributed.future

Functions

get_job_ids_from_futures(future_list)

Get the list of job ids in a future list

replace_job_by_data(args_or_kawargs, done_task_results)

Replace each MpJob in a list or dict with its real data

compute_dependencies(args, kw_args)

Compute dependencies from args and kw_args

log_error_hook(args)

Exception hook for cluster thread

update_job_id_priority(job_ids_to_launch_prioritized, ...)

Update the list of jobs to launch with the new priority list and ready list

Attributes

SYS_PLATFORM

IS_WIN

RUN

TERMINATE

REFRESH_TIME

job_counter

cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.SYS_PLATFORM
cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.IS_WIN
cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.RUN = 0
cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.TERMINATE = 1
cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.REFRESH_TIME = 0.05
cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.job_counter
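
The values of RUN, TERMINATE and REFRESH_TIME suggest a polling loop driven by a command queue. The sketch below is only an illustration under that assumption: the function `refresh_loop_sketch`, the `commands` queue and the `ticks` list are hypothetical names, and the constants are redefined locally rather than imported from CARS.

```python
import queue
import threading
import time

# Local copies of the documented values (not imported from CARS).
RUN = 0
TERMINATE = 1
REFRESH_TIME = 0.05


def refresh_loop_sketch(in_queue, ticks):
    """Poll until a TERMINATE state is received on the queue."""
    state = RUN
    while state != TERMINATE:
        try:
            # A new state, if any, replaces the current one.
            state = in_queue.get_nowait()
        except queue.Empty:
            # Nothing received: record one refresh pass, then wait.
            ticks.append(time.monotonic())
            time.sleep(REFRESH_TIME)


commands = queue.Queue()
ticks = []
worker = threading.Thread(target=refresh_loop_sketch, args=(commands, ticks))
worker.start()
time.sleep(4 * REFRESH_TIME)  # let the loop run a few passes
commands.put(TERMINATE)  # ask the loop to stop
worker.join(timeout=2.0)
```
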
class cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.MultiprocessingCluster(conf_cluster, out_dir, launch_worker=True)

Bases: cars.orchestrator.cluster.abstract_cluster.AbstractCluster

MultiprocessingCluster

check_conf(conf)

Check configuration

Parameters

conf (dict) – configuration to check

Returns

overloaded configuration

Return type

dict

get_delayed_type()

Get delayed type

cleanup()

Cleanup cluster

scatter(data, broadcast=True)

Distribute data through workers

Parameters

data – task data

create_task_wrapped(func, nout=1)

Create task

Parameters
  • func – function

  • nout – number of outputs

start_tasks(task_list)

Start all tasks

Parameters

task_list – task list

rec_start(delayed_object, memorize)

Record task

Parameters
  • delayed_object (MpDelayed) – delayed object to record

  • memorize – list of MpDelayed already recorded

static refresh_task_cache(pool, task_cache, in_queue, per_job_timeout, cl_future_list, nb_workers, wrapper_obj)

Refresh task cache

Parameters
  • task_cache – task cache list

  • in_queue – queue

  • per_job_timeout – per job timeout

  • cl_future_list – current future list used in iterator

  • nb_workers – number of workers

static get_ready_failed_tasks(wait_list, dependencies_list, done_task_results)

Return the newly ready tasks (those with no remaining constraint) and the failed tasks
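
One way to picture this step is sketched below. The data shapes are assumptions made for illustration, not the documented CARS structures: `wait_list` is taken to be a list of job ids, `dependencies_list` a dict from job id to the ids it depends on, and `done_task_results` a dict from finished job id to a `(success, result)` tuple, mirroring the tuple accepted by MpFutureTask.set.

```python
def ready_and_failed_sketch(wait_list, dependencies_list, done_task_results):
    """Split waiting job ids into ready ones and ones with a failed dependency."""
    ready, failed = [], []
    for job_id in wait_list:
        deps = dependencies_list.get(job_id, [])
        has_failed_dep = any(
            dep in done_task_results and not done_task_results[dep][0]
            for dep in deps
        )
        if has_failed_dep:
            # At least one dependency finished unsuccessfully.
            failed.append(job_id)
        elif all(dep in done_task_results for dep in deps):
            # Every dependency finished successfully.
            ready.append(job_id)
        # Otherwise the job keeps waiting.
    return ready, failed


ready, failed = ready_and_failed_sketch(
    wait_list=[10, 11, 12],
    dependencies_list={10: [1], 11: [1, 2], 12: [3]},
    done_task_results={1: (True, "data"), 3: (False, "error message")},
)
```

In this example job 10 becomes ready, job 12 is reported as failed, and job 11 stays in the wait list because job 2 has not finished yet.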

static get_tasks_without_deps(dependencies_list, ready_list, nb_ready_task)

Return the list of ready tasks that have no dependencies and are not considered initial tasks (dependency = -1)

future_iterator(future_list, timeout=None)

Start all tasks

Parameters

future_list – list of futures

cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.get_job_ids_from_futures(future_list)

Get the list of job ids in a future list

Parameters

future_list (MpFuture) – list of futures

Returns

list of job ids

Return type

list(int)
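
A minimal sketch of the idea, assuming each future exposes its job id as an attribute. The `FakeFuture` class and its `job_id` attribute are hypothetical stand-ins, not the CARS MpFuture type.

```python
from dataclasses import dataclass


@dataclass
class FakeFuture:
    """Hypothetical stand-in for a future (not the CARS MpFuture class)."""

    job_id: int


def job_ids_from_futures_sketch(future_list):
    """Collect the job id of every future, preserving input order."""
    return [future.job_id for future in future_list]


futures = [FakeFuture(3), FakeFuture(7), FakeFuture(11)]
job_ids = job_ids_from_futures_sketch(futures)
```
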

cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.replace_job_by_data(args_or_kawargs, done_task_results)

Replace each MpJob in a list or dict with its real data

Parameters
  • args_or_kawargs – list or dict of data

  • done_task_results – dict of done tasks
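
The substitution can be sketched as follows. This is an illustration under stated assumptions, not the CARS implementation: `FakeJob` is a hypothetical placeholder class, `done_results` is assumed to map a job id directly to its data, and nested containers are not handled.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeJob:
    """Hypothetical placeholder for a pending result (not the CARS MpJob)."""

    job_id: int


def replace_jobs_sketch(args_or_kwargs, done_results):
    """Return a copy of a list or dict with placeholders swapped for data."""

    def resolve(value):
        # Only placeholders are replaced; plain values pass through unchanged.
        if isinstance(value, FakeJob):
            return done_results[value.job_id]
        return value

    if isinstance(args_or_kwargs, dict):
        return {key: resolve(value) for key, value in args_or_kwargs.items()}
    return [resolve(value) for value in args_or_kwargs]


done = {1: "tile_a", 2: "tile_b"}
resolved_args = replace_jobs_sketch([FakeJob(1), 42], done)
resolved_kwargs = replace_jobs_sketch({"left": FakeJob(2), "scale": 0.5}, done)
```
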

cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.compute_dependencies(args, kw_args)

Compute dependencies from args and kw_args

Parameters
  • args (list) – arguments

  • kw_args (dict) – keyword arguments

Returns

dependencies

Return type

list
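
As a rough sketch, dependencies can be gathered by scanning both containers for job placeholders. The `FakeJob` class is hypothetical, and returning a de-duplicated list of job ids is an assumption about the result format.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeJob:
    """Hypothetical placeholder for a pending result (not the CARS MpJob)."""

    job_id: int


def compute_dependencies_sketch(args, kw_args):
    """List the job ids referenced by args and kw_args, without duplicates."""
    dependencies = []
    for value in list(args) + list(kw_args.values()):
        if isinstance(value, FakeJob) and value.job_id not in dependencies:
            dependencies.append(value.job_id)
    return dependencies


deps = compute_dependencies_sketch(
    [FakeJob(4), "plain value", FakeJob(9)],
    {"mask": FakeJob(4), "resolution": 0.5},
)
```
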

class cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.MpFutureTask(cluster)

Multiprocessing version of distributed.future

set(obj)

Set result to associated delayed object, and clean cache

Parameters

obj (tuple(bool, Union(dataset, dataframe))) – result object

cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.log_error_hook(args)

Exception hook for cluster thread

cars.orchestrator.cluster.mp_cluster.multiprocessing_cluster.update_job_id_priority(job_ids_to_launch_prioritized, priority_list, ready_list)

Update the list of jobs to launch with the new priority list and ready list

Returns

updated list
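
The merging rule is not described here. One plausible sketch, assuming that priority jobs come first, followed by newly ready jobs and then the jobs already scheduled, with duplicates removed, is shown below. The function name and the ordering are assumptions, not the documented behavior.

```python
def update_priority_sketch(job_ids_to_launch_prioritized, priority_list, ready_list):
    """Merge the three lists, keeping the first occurrence of each job id."""
    merged = priority_list + ready_list + job_ids_to_launch_prioritized
    # dict.fromkeys preserves insertion order and drops later duplicates.
    return list(dict.fromkeys(merged))


updated = update_priority_sketch([5, 6], [8, 5], [9, 6])
```
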