Source code for cars.orchestrator.cluster.local_dask_cluster

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Contains functions for local dask Cluster
"""

# Standard imports
import logging

# Third party imports
from dask.distributed import Client, LocalCluster

# CARS imports
from cars.orchestrator.cluster import abstract_cluster, abstract_dask_cluster
from cars.orchestrator.cluster.dask_cluster_tools import (
    check_configuration,
    create_checker_schema,
)


[docs]@abstract_cluster.AbstractCluster.register_subclass("local_dask") class LocalDaskCluster(abstract_dask_cluster.AbstractDaskCluster): """ LocalDaskCluster """
[docs] def check_conf(self, conf): """ Check configuration :param conf: configuration to check :type conf: dict :return: overloaded configuration :rtype: dict """ return check_configuration(*create_checker_schema(conf))
[docs] def start_dask_cluster(self): """ Start dask cluster """ return start_local_cluster( self.nb_workers, activate_dashboard=self.activate_dashboard )
[docs] def cleanup(self): """ Cleanup cluster """ stop_local_cluster(self.cluster, self.client) logging.info("Dask cluster closed")
[docs]def start_local_cluster(nb_workers, timeout=600, activate_dashboard=False): """ Start a local cluster :param nb_workers: Number of dask workers :type nb_workers: int :param timeout: Connection timeout :type timeout: int :return: Local cluster and Dask client :rtype: (dask.distributed.LocalCluster, dask.distributed.Client) tuple """ logging.info("Local cluster with {} workers started".format(nb_workers)) if activate_dashboard: dashboard_address = ":0" else: dashboard_address = None cluster = LocalCluster( n_workers=nb_workers, threads_per_worker=1, dashboard_address=dashboard_address, ) client = Client(cluster, timeout=timeout) return cluster, client
[docs]def stop_local_cluster(cluster, client): """ Stop a local cluster :param cluster: Local cluster :type cluster: dask.distributed.LocalCluster :param client: Dask client :type client: dask.distributed.Client """ client.close() cluster.close() logging.info("Local cluster correctly stopped")