============
Orchestrator
============

**Goals**

The *orchestrator* is the central element of CARS concepts. Its role is to ensure the communication between the *computing technology*, the *applications* and the *CarsDatasets*.

**Details**

A single *orchestrator* is instantiated for each pipeline:

.. sourcecode:: python

    from cars.orchestrator import orchestrator

    with orchestrator.Orchestrator(distributed_conf=distributed_conf) as cars_orchestrator:
        # the applications of the pipeline are run here
        ...

It is mainly composed of:

* a *cluster*
* a *CarsDatasetRegistry*
* a dictionary, later saved in a JSON output file, containing information given by the applications

.. tabs::

    .. tab:: Cluster

        .. _cluster:

        **Cluster**

        The cluster is the component that manages access to all the threads available to CARS for its computations.

        .. sourcecode:: python

            from abc import ABCMeta, abstractmethod


            class AbstractCluster(metaclass=ABCMeta):
                ...

                @abstractmethod
                def create_task(self, func, nout=1):
                    """
                    Create task

                    :param func: function
                    :param nout: number of outputs
                    """

                @abstractmethod
                def start_tasks(self, task_list):
                    """
                    Start all tasks

                    :param task_list: task list
                    """

                @abstractmethod
                def future_iterator(self, future_list):
                    """
                    Iterator, iterating on computed futures

                    :param future_list: list of futures
                    """

        The three main functions are:

        * ``create_task`` declares a task to the cluster and returns a ``delayed`` object.
        * ``start_tasks`` starts the computation of every declared task and returns ``future`` objects.
        * ``future_iterator`` iterates over the ``future`` objects as they are computed.

        Three cluster plugins are available, each one representing a mode:

        * *dask*, available as:

          * *local_dask*
          * *pbs_dask*
          * *slurm_dask*

        * *mp* (for multiprocessing)
        * *sequential* (in this mode, ``delayed`` objects do not exist: tasks directly return ``xarray.Dataset`` or ``pandas.DataFrame`` objects)

        .. tabs::

            .. tab:: Multiprocessing Cluster

                .. include:: cluster_mp.rst

            .. tab:: Dask Cluster

            .. tab:: Sequential Cluster

    .. tab:: Registries

        The *CarsDatasetRegistry* is a class that manages the list of *CarsDatasets* that the user wants to save.
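
        The core bookkeeping of such a registry (registering a *CarsDataset*, giving it an ID and finding it again from that ID) can be illustrated with a toy class. This is a minimal sketch, not the CARS implementation: ``ToyRegistry`` and its methods are hypothetical names, IDs are assumed to be consecutive integers, and any Python object stands in for a *CarsDataset*.

        .. sourcecode:: python

            class ToyRegistry:
                """Hypothetical registry assigning an integer ID to each registered object."""

                def __init__(self):
                    self._objects = {}  # registry ID -> registered object
                    self._ids = {}  # id(object) -> registry ID

                def add(self, cars_ds):
                    """Register cars_ds if needed and return its registry ID."""
                    ds_id = self.get_id(cars_ds)
                    if ds_id is None:
                        ds_id = len(self._objects)
                        self._objects[ds_id] = cars_ds
                        # Safe: the registry keeps a reference to cars_ds, so its
                        # id() cannot be reused by another object while registered
                        self._ids[id(cars_ds)] = ds_id
                    return ds_id

                def get_id(self, cars_ds):
                    """Return the registry ID of cars_ds, or None if unregistered."""
                    return self._ids.get(id(cars_ds))

                def get(self, ds_id):
                    """Find a registered object from its ID (raises KeyError if unknown)."""
                    return self._objects[ds_id]


            # Usage: plain objects stand in for two CarsDatasets
            registry = ToyRegistry()
            left, right = object(), object()
            print(registry.add(left), registry.add(right), registry.add(left))  # 0 1 0
            print(registry.get(1) is right)  # True

        Keying the lookup on ``id()`` gives constant-time access while comparing datasets by identity, so two distinct datasets with equal contents are never merged.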

        It is mainly composed of:

        * a list of registered *CarsDatasets*
        * an ID associated with each registered *CarsDataset*

        Its functions allow a user to:

        * add a new *CarsDataset* to the registry
        * obtain the ID of a *CarsDataset*
        * find a *CarsDataset* from its ID
        * manage saving tile by tile (i.e. future by future, in ``dask`` terms), using the ``SingleCarsDatasetSaver`` that wraps the *CarsDataset* save functions

        .. tabs::

            .. tab:: Saver Registry

                WIP

            .. tab:: Replacer Registry

                WIP

            .. tab:: Unseen Registry

                WIP

            .. tab:: Compute Registry

                WIP

**How it works**

1. Instantiate the *orchestrator* at the start of every pipeline, with a configuration (``distributed_conf``) that defines the cluster mode and the output directory:

   .. sourcecode:: python

       with orchestrator.Orchestrator(distributed_conf=distributed_conf) as cars_orchestrator:
           # the applications of the pipeline are run here
           ...

   The *Cluster* and the *CarsDatasetRegistry* are created in the constructor:

   .. sourcecode:: python

       def __init__(self, distributed_conf=None):
           """
           Init function of Orchestrator.
           Creates Cluster and Registry for CarsDatasets

           :param distributed_conf: configuration of distribution
           """

           # avoid a TypeError on the "in" tests below when no conf is given
           if distributed_conf is None:
               distributed_conf = {}

           # out_dir
           self.out_dir = None
           if "out_dir" in distributed_conf:
               self.out_dir = distributed_conf["out_dir"]
           else:
               logging.error("No out_dir defined")

           self.nb_workers = 1
           if "nb_workers" in distributed_conf:
               self.nb_workers = distributed_conf["nb_workers"]

           # init cluster
           self.cluster = AbstractCluster(  # pylint: disable=E0110
               distributed_conf
           )

           # init CarsDataset savers registry
           self.cars_ds_savers_registry = CarsDatasetsRegistry()

           # init saving lists
           self.cars_ds_list = []

           # outjson
           self.out_json = {}

2. The *orchestrator* is used by every application, which can register *CarsDatasets* to be saved (the *orchestrator* interacts with the *CarsDatasetRegistry*):

   .. sourcecode:: python

       def add_to_save_lists(
           self, file_name, tag, cars_ds, dtype="float32", nodata=0
       ):
           """
           Save file to list in order to be saved later

           :param file_name: file name
           :param tag: tag
           :param cars_ds: cars dataset to register
           :param dtype: data type used to save the file
           :param nodata: nodata value used to save the file
           """

           self.cars_ds_savers_registry.add_file_to_save(
               file_name, cars_ds, tag=tag, dtype=dtype, nodata=nodata
           )

           if cars_ds not in self.cars_ds_list:
               self.cars_ds_list.append(cars_ds)

3. The *orchestrator* can be used to obtain the IDs of *CarsDatasets* (see applications), through their saving information:

   .. sourcecode:: python

       def get_saving_infos(self, cars_ds_list):
           """
           Get saving infos of given cars datasets

           :param cars_ds_list: list of cars datasets
           :type cars_ds_list: list[CarsDataset]

           :return: list of saving infos
           :rtype: list[dict]
           """

           saving_infos = []

           for cars_ds in cars_ds_list:
               saving_infos.append(
                   self.cars_ds_savers_registry.get_saving_infos(cars_ds)
               )

           return saving_infos

4. At the end of the pipeline, the ``__exit__`` method is called automatically. It computes all the ``delayed`` objects needed to save the *CarsDatasets* with ``cluster.start_tasks``, which returns ``future`` objects. It then iterates over these ``future`` objects and calls the ``save`` function of the *CarsDatasetRegistry* on each of them.

   .. sourcecode:: python

       def __exit__(self, exc_type, exc_value, traceback):
           """
           Function run on exit.

           Compute cluster tasks, save futures to be saved, and cleanup cluster
           and files
           """

           # run compute and save files
           logging.info("Compute delayed ...")
           # Flatten to list
           delayed_objects = flatten_object(
               self.cars_ds_savers_registry.get_cars_datasets_list()
           )

           # Compute delayed
           future_objects = self.cluster.start_tasks(delayed_objects)

           # Save objects when they are computed
           logging.info("Wait for futures results ...")
           pbar = tqdm(total=len(future_objects), desc="Processing Futures ...")
           for future_obj in self.cluster.future_iterator(future_objects):
               # get corresponding CarsDataset and save tile
               if future_obj is not None:
                   self.cars_ds_savers_registry.save(future_obj)
               else:
                   logging.debug("None tile : not saved")
               pbar.update()
           pbar.close()

           # close files
           logging.info("Close files ...")
           self.cars_ds_savers_registry.cleanup()

           # close cluster
           logging.info("Close cluster ...")
           self.cluster.cleanup()
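
The lifecycle described in the steps above (declare tasks during the pipeline, start them all on exit, save each result as its future completes, then clean up) can be reproduced with the standard library alone. This is a minimal sketch under stated assumptions, not CARS code: ``ToyCluster`` and ``ToyOrchestrator`` are hypothetical names, a ``concurrent.futures.ThreadPoolExecutor`` stands in for the *dask* or *mp* plugins, a declared task is a plain ``(func, args)`` pair rather than a ``delayed`` object, and "saving" a tile only stores it in a dictionary.

.. sourcecode:: python

    from concurrent.futures import ThreadPoolExecutor, as_completed


    class ToyCluster:
        """Hypothetical cluster mimicking create_task / start_tasks / future_iterator."""

        def __init__(self, nb_workers=2):
            self.executor = ThreadPoolExecutor(max_workers=nb_workers)

        def create_task(self, func):
            # Declaring a task only records it: nothing runs yet
            def declare(*args):
                return (func, args)

            return declare

        def start_tasks(self, task_list):
            # Submit every declared task and return the corresponding futures
            return [self.executor.submit(func, *args) for func, args in task_list]

        def future_iterator(self, future_list):
            # Yield results in completion order, not submission order
            for future in as_completed(future_list):
                yield future.result()

        def cleanup(self):
            self.executor.shutdown(wait=True)


    class ToyOrchestrator:
        """Hypothetical context manager running and saving declared tasks on exit."""

        def __init__(self, nb_workers=2):
            self.cluster = ToyCluster(nb_workers)
            self.to_save = []  # declared tasks whose result must be saved
            self.saved = {}  # stand-in for the files written by the registry

        def add_to_save_lists(self, task):
            self.to_save.append(task)

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            futures = self.cluster.start_tasks(self.to_save)
            for result in self.cluster.future_iterator(futures):
                if result is not None:  # None tiles are not saved
                    tile_id, value = result
                    self.saved[tile_id] = value
            self.cluster.cleanup()
            return False  # do not swallow exceptions


    def compute_tile(tile_id):
        # Pretend tile computation: tile 2 is empty and returns None
        return None if tile_id == 2 else (tile_id, tile_id * 10)


    with ToyOrchestrator() as orch:
        task = orch.cluster.create_task(compute_tile)
        for i in range(4):
            orch.add_to_save_lists(task(i))

    print(sorted(orch.saved.items()))  # [(0, 0), (1, 10), (3, 30)]

Because ``as_completed`` yields results in completion order, each tile is stored as soon as it is ready rather than in submission order, matching the "save objects when they are computed" comment in ``__exit__``.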