# Source code for cars.orchestrator.cluster.abstract_cluster

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Contains the AbstractCluster abstract base class, from which the registered
CARS cluster implementations derive
"""

# Standard imports
import logging
import os
from abc import ABCMeta, abstractmethod
from typing import Dict

# CARS imports
from cars.conf.input_parameters import ConfigType
from cars.orchestrator.cluster import log_wrapper


class AbstractCluster(metaclass=ABCMeta):
    """
    AbstractCluster

    Base class of the CARS clusters. Concrete clusters register themselves
    with :meth:`register_subclass`; instantiating ``AbstractCluster`` then
    returns an instance of the subclass registered under the configured
    ``mode`` (see :meth:`__new__`).
    """

    # Available cluster modes to instantiate AbstractCluster subclasses.
    # Filled by register_subclass; mapping short name -> subclass.
    available_modes: Dict = {}

    # Define abstract attributes

    # profiling config parameter: mode, loop_testing, memray
    profiling: ConfigType
    # cluster mode output directory
    out_dir: str

    def __new__(  # pylint: disable=W0613
        cls, conf_cluster, out_dir, launch_worker=True, data_to_propagate=None
    ):
        """
        Return the required cluster

        The subclass is looked up in ``available_modes`` using
        ``conf_cluster["mode"]``; "multiprocessing" is used when no mode
        is given.

        :raises:
         - KeyError when the required cluster is not registered

        :param conf_cluster: configuration for cluster
        :param out_dir: output directory for results
        :param launch_worker: launcher of the new worker
        :param data_to_propagate: data to propagate to new cluster if reset
        :return: a cluster object (instance of the registered subclass)
        """
        # Default mode used when the configuration does not specify one
        cluster_mode = "multiprocessing"
        if "mode" not in conf_cluster:
            logging.warning("Cluster mode not defined, default is used")
        else:
            cluster_mode = conf_cluster["mode"]

        if cluster_mode not in cls.available_modes:
            error_msg = "No mode named {} registered".format(cluster_mode)
            logging.error(error_msg)
            raise KeyError(error_msg)

        logging.info("The AbstractCluster %s will be used", cluster_mode)

        # Allocate the registered subclass instead of cls; since it derives
        # from AbstractCluster, Python then calls its __init__ as usual.
        return super(AbstractCluster, cls).__new__(
            cls.available_modes[cluster_mode]
        )

    @classmethod
    def register_subclass(cls, *short_names: str):
        """
        Allows to register the subclass with its short name(s)

        Intended to be used as a class decorator on concrete clusters.

        :param short_names: the short names (cluster modes) under which the
            decorated subclass is registered
        :type short_names: string
        :return: class decorator registering the subclass
        """

        def decorator(subclass):
            """
            Registers the subclass in the available modes

            :param subclass: the subclass to be registered
            :type subclass: object
            :return: the subclass, unchanged
            """
            # The dict is resolved through cls, i.e. the one declared on
            # AbstractCluster unless a subclass defines its own.
            for short_name in short_names:
                cls.available_modes[short_name] = subclass
            return subclass

        return decorator

    def __init__(
        self, conf_cluster, out_dir, launch_worker=True, data_to_propagate=None
    ):  # pylint: disable=W0613
        """
        Init function of AbstractCluster

        Creates the worker log directory, records the log level, checks the
        cluster configuration and builds the profiling logger.

        :param conf_cluster: configuration for cluster
        :param out_dir: output directory for results
        :param launch_worker: launcher of the new worker (unused here)
        :param data_to_propagate: data to propagate to new cluster if reset
        :type data_to_propagate: dict
        """
        self.out_dir = out_dir
        # data to propagate
        self.data_to_propagate = data_to_propagate

        # exist_ok=True avoids the check-then-create race that would raise
        # FileExistsError if the directory appeared between the two calls
        self.worker_log_dir = os.path.join(out_dir, "logs", "workers_log")
        os.makedirs(self.worker_log_dir, exist_ok=True)

        # Start from the root logger effective level, but prefer the level
        # of a root handler named "stdout" (the last such handler wins)
        self.log_level = logging.getLogger().getEffectiveLevel()
        for handler in logging.getLogger().handlers:
            if handler.get_name() == "stdout":
                self.log_level = handler.level

        # Check conf
        self.checked_conf_cluster = self.check_conf(conf_cluster)

        # Build the profiling logger from the checked profiling section, then
        # store back the profiling configuration it validated
        self.profiling_logger = (
            log_wrapper.AbstractLogWrapper(  # pylint: disable=E0110
                self.checked_conf_cluster["profiling"], out_dir
            )
        )
        self.checked_conf_cluster["profiling"] = (
            self.profiling_logger.checked_conf_profiling
        )

    @abstractmethod
    def get_delayed_type(self):
        """
        Get delayed type
        """

    @abstractmethod
    def cleanup(self, **kwargs):
        """
        Cleanup cluster

        :param kwargs: implementation-specific cleanup options
        """

    @abstractmethod
    def check_conf(self, conf):
        """
        Check configuration

        :param conf: configuration to check
        :type conf: dict

        :return: overloaded configuration
        :rtype: dict
        """

    def get_conf(self):
        """
        Get overridden configuration

        :return: overridden configuration (the checked cluster configuration,
            including the profiling section validated in __init__)
        """
        return self.checked_conf_cluster

    def create_task(self, func, nout=1):
        """
        Create task

        :param func: function
        :param nout: number of outputs
        :return: a task builder; calling it with the arguments of ``func``
            creates the task through :meth:`create_task_wrapped`
        """

        def create_task_builder(*argv, **kwargs):
            """
            Create task builder to select the type of log
            according to the configured profiling mode

            :param argv: list of input arguments
            :param kwargs: list of named input arguments
            """
            # Resolved on every call: the profiling logger returns the
            # function to submit and extra keyword arguments to pass to it
            (
                wrapper_func,
                additional_kwargs,
            ) = self.profiling_logger.get_func_args_plus(func)
            return self.create_task_wrapped(wrapper_func, nout=nout)(
                *argv, **kwargs, **additional_kwargs
            )

        return create_task_builder

    @abstractmethod
    def create_task_wrapped(self, func, nout=1):
        """
        Create task

        :param func: function
        :param nout: number of outputs
        """

    @abstractmethod
    def start_tasks(self, task_list):
        """
        Start all tasks

        :param task_list: task list
        """

    @abstractmethod
    def scatter(self, data, broadcast=True):
        """
        Distribute data through workers

        :param data: task data
        :param broadcast: broadcast flag, semantics defined by the
            implementation
        """

    @abstractmethod
    def future_iterator(self, future_list, timeout=None):
        """
        Iterator, iterating on computed futures

        :param future_list: future_list list
        :param timeout: time to wait for next job
        """