Source code for cars.orchestrator.cluster.abstract_dask_cluster

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Contains the abstract base class for dask clusters and its helper functions
"""

# Standard imports
import logging
import os
import time

# Third party imports
from abc import abstractmethod

import dask
import numpy as np
import psutil
import xarray as xr
import yaml
from dask.config import global_config as global_dask_config
from dask.config import set as dask_config_set
from dask.delayed import Delayed
from dask.distributed import as_completed
from dask.sizeof import sizeof as dask_sizeof
from distributed.diagnostics.plugin import WorkerPlugin
from distributed.utils import CancelledError

from cars.core import cars_logging

# CARS imports
from cars.orchestrator.cluster import abstract_cluster


class AbstractDaskCluster(
    abstract_cluster.AbstractCluster
):  # pylint: disable=R0902
    """
    AbstractDaskCluster

    Base class for dask clusters: it reads the checked cluster
    configuration, optionally starts the dask cluster and client, and
    exposes the task helpers (delayed creation, compute, scatter,
    future iteration). Subclasses provide check_conf and
    start_dask_cluster.
    """

    def __init__(self, conf_cluster, out_dir, launch_worker=True):
        """
        Init function of AbstractDaskCluster

        :param conf_cluster: configuration for cluster
        :param out_dir: output directory (forwarded to the parent
            constructor)
        :param launch_worker: when True, set and save the dask config,
            then start the dask cluster
        """
        # The parent constructor runs first; checked_conf_cluster is read
        # right after it
        super().__init__(conf_cluster, out_dir, launch_worker=launch_worker)

        # Copy the checked parameters onto the instance
        conf = self.checked_conf_cluster
        self.nb_workers = conf["nb_workers"]
        self.task_timeout = conf["task_timeout"]
        self.walltime = conf["walltime"]
        self.use_memory_logger = conf["use_memory_logger"]
        self.config_name = conf["config_name"]
        self.profiling = conf["profiling"]
        self.launch_worker = launch_worker
        self.activate_dashboard = conf["activate_dashboard"]
        self.python = conf["python"]

        # account and qos are only read in slurm_dask mode
        if conf["mode"] == "slurm_dask":
            self.account = conf["account"]
            self.qos = conf["qos"]

        # Nothing else to do when no worker has to be launched
        if not self.launch_worker:
            return

        # Set DASK CARS specific config
        # TODO: update with adequate configuration through tests
        set_config()

        # Keep a copy of the dask config actually used
        save_config(self.out_dir, "dask_config_" + self.config_name)

        # Create cluster and its client
        self.cluster, self.client = self.start_dask_cluster()

        # Optional plugin monitoring the memory of the workers
        if self.use_memory_logger:
            memory_plugin = ComputeDSMMemoryLogger(self.out_dir)
            self.client.register_worker_plugin(memory_plugin)

    @abstractmethod
    def check_conf(self, conf):
        """
        Check configuration

        :param conf: configuration to check
        :type conf: dict

        :return: overloaded configuration
        :rtype: dict
        """

    @abstractmethod
    def start_dask_cluster(self):
        """
        Start dask cluster

        The constructor unpacks the result as (cluster, client).
        """

    def create_task_wrapped(self, func, nout=1):
        """
        Create a dask delayed task from a function

        The function is first wrapped with the CARS logger wrapper, using
        the worker log directory and log level of this cluster.

        :param func: function
        :param nout: number of outputs

        :return: dask delayed object built from the wrapped function
        """
        logged_func = cars_logging.wrap_logger(
            func, self.worker_log_dir, self.log_level
        )
        return dask.delayed(logged_func, nout=nout)

    def get_delayed_type(self):
        """
        Get delayed type

        :return: the dask Delayed class
        """
        return Delayed

    def start_tasks(self, task_list):
        """
        Start all tasks

        :param task_list: task list

        :return: result of the client compute call on the task list
        """
        submitted = self.client.compute(task_list)
        return submitted

    def scatter(self, data, broadcast=True):
        """
        Distribute data through workers

        :param data: task data
        :param broadcast: forwarded to the client scatter call

        :return: result of the client scatter call
        """
        scattered = self.client.scatter(data, broadcast=broadcast)
        return scattered

    def future_iterator(self, future_list, timeout=None):
        """
        Build an iterator over the results of the given futures

        :param future_list: list of futures
        :param timeout: forwarded to DaskFutureIterator

        :return: a DaskFutureIterator on future_list
        """
        return DaskFutureIterator(future_list, timeout=timeout)
class DaskFutureIterator:
    """
    iterator on dask futures, similar to as_completed
    Only returns the actual results, delete the future after usage
    """

    def __init__(self, future_list, timeout=None):  # pylint: disable=W0613
        """
        Wrap the futures in a dask as_completed iterator

        :param future_list: futures to iterate on
        :param timeout: currently unused (see TODO below)
        """
        # TODO: python 3.9: add timeout=timeout as parameter
        self.dask_a_c = as_completed(future_list, with_results=True)
        # Future returned by the previous call to __next__, if any
        self.prev = None

    def _release_previous(self):
        """
        Cancel the previously returned future, if there is one
        """
        if self.prev is not None:
            self.prev.cancel()

    def __iter__(self):
        """
        The object is its own iterator
        """
        return self

    def __next__(self):
        """
        Return the result of the next completed future

        The future returned by the previous call is cancelled before the
        new one is stored.

        :raises StopIteration: when no future is left
        :raises TimeoutError: when dask reports a timeout
        :raises RuntimeError: when the result is a CancelledError
        """
        try:
            fut, res = next(self.dask_a_c)
        except StopIteration:
            # End of iteration: drop the last future before propagating
            self._release_previous()
            self.prev = None
            raise
        except dask.distributed.TimeoutError as exception:
            raise TimeoutError("No tasks available") from exception

        # release previous future, then remember the current one
        self._release_previous()
        self.prev = fut

        if isinstance(res, CancelledError):
            raise RuntimeError(f"CancelError from worker {res}")

        return res
def set_config():
    """
    Set the DASK configuration specific to CARS:
    - scheduler is set to "processes"
    """
    # TODO: export API to prepare.run and compute_dsm.run() to set scheduler
    # example mode debug: dask_config_set(scheduler='single-threaded')
    # example mode multithread: dask_config_set(scheduler='threads')

    # Multiprocess mode is chosen over multithread because of GIL blocking
    scheduler_mode = "processes"
    dask_config_set(scheduler=scheduler_mode)
def save_config(output_dir: str, file_name: str):
    """
    Dump the DASK global config to a YAML file

    :param output_dir: output directory path
    :param file_name: output file name (the ".yaml" extension is added
        by write_yaml_config)
    """
    message = (
        "Save DASK global merged config for debug "
        "(1: $DASK_DIR if exists, 2: ~/.config/dask/, ... ) "
    )
    logging.info(message)

    # The merged global DASK config is written as YAML
    write_yaml_config(global_dask_config, output_dir, file_name)
def write_yaml_config(yaml_config: dict, output_dir: str, file_name: str):
    """
    Dump a YAML config in <output_dir>/<file_name>.yaml

    TODO: put in global file if needed elsewhere than DASK conf save.

    :param yaml_config: YAML config to write
    :param output_dir: output directory path
    :param file_name: output file name, without the ".yaml" extension
    """
    # Destination file of the config
    target_path = os.path.join(output_dir, f"{file_name}.yaml")

    with open(target_path, "w", encoding="utf-8") as stream:
        yaml.dump(yaml_config, stream)
@dask_sizeof.register_lazy("xarray")
def register_xarray():
    """
    Register dask sizeof hooks for xarray.DataArray and xarray.Dataset,
    so dask estimates the memory they use from their content
    """

    @dask_sizeof.register(xr.DataArray)
    # pylint: disable=unused-variable
    def sizeof_xarray_dataarray(xarr):
        """
        Total size of a DataArray: values, coordinate values and attrs
        """
        coords_size = sum(
            dask_sizeof(coord.values) for coord in xarr.coords.values()
        )
        return (
            dask_sizeof(xarr.values) + coords_size + dask_sizeof(xarr.attrs)
        )

    @dask_sizeof.register(xr.Dataset)
    # pylint: disable=unused-variable
    def sizeof_xarray_dataset(xdat):
        """
        Total size of a Dataset: data variables values, coordinates
        and attrs
        """
        data_vars_size = sum(
            dask_sizeof(variable.values)
            for variable in xdat.data_vars.values()
        )
        # NOTE(review): coordinates are sized as objects here, whereas the
        # DataArray hook above sizes their ".values" - confirm this
        # difference is intended
        coords_size = sum(
            dask_sizeof(coord) for coord in xdat.coords.values()
        )
        return data_vars_size + coords_size + dask_sizeof(xdat.attrs)
class ComputeDSMMemoryLogger(WorkerPlugin):
    """A subclass of WorkerPlugin dedicated to monitoring workers memory

    This plugin enables two things:
    - Additional dask log traces (for each worker internal state change):
        - amount of tasks
        - associated memory
    - A numpy data file with memory metrics and timing
    """

    def __init__(self, outdir):
        """
        Constructor

        :param outdir: output directory
        :type outdir: string
        """
        self.outdir = outdir

    def setup(self, worker):
        """
        Associate plugin with a worker

        :param worker: The worker to associate the plugin with
        """
        # Pylint Exception : Inherited attributes outside __init__
        # pylint: disable=attribute-defined-outside-init
        self.worker = worker
        self.name = worker.name
        # Measure plugin registration time
        self.start_time = time.time()
        # Data will hold the memory traces as numpy array, one row per
        # record, starting with an all-zero row
        self.data = [[0, 0, 0, 0]]

    def transition(self, key, start, finish, **kwargs):
        """
        Callback when worker changes internal state

        Adds one record to the memory traces, logs a memory report and
        rewrites the npy file of this worker.

        Columns of a record, in order:
        - elapsed time since setup (seconds)
        - sum of get_nbytes() over the tasks known by the worker
        - number of tasks known by the worker
        - resident set size (rss) of the current python process

        :param key: unused
        :param start: unused
        :param finish: unused
        """
        # TODO Pylint Exception : Inherited attributes outside __init__
        # pylint: disable=attribute-defined-outside-init

        # Measure elapsed time for the state change
        elapsed_time = time.time() - self.start_time

        # Walk the worker known memory
        task_count = 0
        tasks_nbytes = 0
        for task in self.worker.state.tasks.values():
            tasks_nbytes += task.get_nbytes()
            task_count += 1

        # Use psutil to capture python process memory as well
        process = psutil.Process(os.getpid())
        process_memory = process.memory_info().rss

        # Update data records (column order unchanged: the byte total is
        # stored before the task count)
        self.data = np.concatenate(
            (
                self.data,
                np.array(
                    [
                        [
                            elapsed_time,
                            tasks_nbytes,
                            task_count,
                            process_memory,
                        ]
                    ]
                ),
            )
        )

        # Convert sizes for logger
        tasks_mb = float(tasks_nbytes) / 1000000
        process_memory_mb = float(process_memory) / 1000000

        # Log memory state: number of tasks, then their size.
        # The counters used to be swapped, so the byte total was reported
        # as the amount of data and the task count (divided by 1e6) as Mb.
        logging.info(
            "Memory report: data created = %s (%s Mb), "
            "python process memory = %s Mb",
            task_count,
            tasks_mb,
            process_memory_mb,
        )

        # Save data records in npy file
        # TODO: Save only every x seconds ?
        file = os.path.join(
            self.outdir, "dask_log", "memory_" + repr(self.name) + ".npy"
        )
        np.save(file, self.data)