#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains the Orchestrator class
"""
# pylint: disable=too-many-lines
# Standard imports
import collections
import logging
import multiprocessing
import os
import platform
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import traceback

# Third party imports
import pandas
import psutil
import xarray
from tqdm import tqdm

# CARS imports
from cars.core import constants as cst
from cars.core.cars_logging import add_progress_message
from cars.core.utils import safe_makedirs
from cars.data_structures import cars_dataset
from cars.orchestrator import achievement_tracker
from cars.orchestrator.cluster.abstract_cluster import AbstractCluster
from cars.orchestrator.cluster.log_wrapper import cars_profile
from cars.orchestrator.orchestrator_constants import (
CARS_DATASET_KEY,
CARS_DS_COL,
CARS_DS_ROW,
)
from cars.orchestrator.registry import compute_registry
from cars.orchestrator.registry import id_generator as id_gen
from cars.orchestrator.registry import replacer_registry, saver_registry
from cars.orchestrator.tiles_profiler import TileProfiler
SYS_PLATFORM = platform.system().lower()
IS_WIN = "windows" == SYS_PLATFORM
RAM_THRESHOLD_MB = 500
RAM_CHECK_SLEEP_TIME = 5
class Orchestrator:
"""
Orchestrator
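Typical use, as a context manager (a minimal sketch: the file name,
tag and CarsDataset below are illustrative, not taken from this
module):

    with Orchestrator(
        orchestrator_conf={"mode": "multiprocessing"},
        out_dir="outdir",
    ) as cars_orchestrator:
        # applications register the CarsDatasets they produce
        cars_orchestrator.add_to_save_lists(
            "outdir/my_raster.tif", "my_tag", my_cars_ds,
            cars_ds_name="my_raster",
        )
    # on exit, all registered delayed tiles are computed, saved
    # and replaced in their CarsDatasets (see breakpoint())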
"""
# pylint: disable=too-many-instance-attributes
# flake8: noqa: C901
def __init__(
self,
orchestrator_conf=None,
out_dir=None,
launch_worker=True,
out_json_path=None,
):
"""
Init function of Orchestrator.
Creates Cluster and Registry for CarsDatasets
:param orchestrator_conf: configuration of distribution
"""
# init list of path to clean at the end
self.tmp_dir_list = []
# out_dir
if out_dir is not None:
self.out_dir = out_dir
else:
self.out_dir = tempfile.mkdtemp()
self.add_to_clean(self.out_dir)
logging.debug("No out_dir defined")
self.launch_worker = launch_worker
# overload orchestrator_conf
if orchestrator_conf is None or (
"mode" in orchestrator_conf and orchestrator_conf["mode"] == "auto"
):
if orchestrator_conf is None:
logging.info(
"No orchestrator configuration given: auto mode is used"
)
logging.info(
"Auto mode is used for orchestrator: "
"number of workers and memory allocated per worker "
"will be set automatically"
)
if orchestrator_conf is not None and len(orchestrator_conf) > 1:
logging.warning(
"Auto mode is used for orchestator: "
"parameters set by user are ignored"
)
# Compute parameters for auto mode
nb_workers, max_ram_per_worker = compute_conf_auto_mode(IS_WIN)
orchestrator_conf = {
"mode": "multiprocessing",
"nb_workers": nb_workers,
"max_ram_per_worker": max_ram_per_worker,
}
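# For instance, on a machine with 8 CPUs and ~16 GB of total RAM,
# the computed configuration would be (illustrative values):
# {"mode": "multiprocessing", "nb_workers": 4, "max_ram_per_worker": 2000}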
self.orchestrator_conf = orchestrator_conf
# init cluster
self.cluster = AbstractCluster( # pylint: disable=E0110
orchestrator_conf, self.out_dir, launch_worker=self.launch_worker
)
self.conf = self.cluster.get_conf()
self.task_timeout = self.conf.get("task_timeout", 600)
# Init IdGenerator
self.id_generator = id_gen.IdGenerator()
# init CarsDataset savers registry
self.cars_ds_savers_registry = saver_registry.CarsDatasetsRegistrySaver(
self.id_generator
)
# init CarsDataset replacement registry
self.cars_ds_replacer_registry = (
replacer_registry.CarsDatasetRegistryReplacer(self.id_generator)
)
# init CarsDataset compute registry
self.cars_ds_compute_registry = (
compute_registry.CarsDatasetRegistryCompute(self.id_generator)
)
# Achievement tracker
self.achievement_tracker = achievement_tracker.AchievementTracker()
# init tile profiler
self.dir_tile_profiling = os.path.join(
self.out_dir, "dump_dir", "tile_processing"
)
if not os.path.exists(self.dir_tile_profiling):
os.makedirs(self.dir_tile_profiling)
self.tile_profiler = TileProfiler(
self.dir_tile_profiling,
self.cars_ds_savers_registry,
self.cars_ds_replacer_registry,
)
# init cars_ds_names_info for pbar printing
self.cars_ds_names_info = []
# outjson
self.out_json_path = out_json_path
if self.out_json_path is None:
self.out_json_path = os.path.join(self.out_dir, "metadata.json")
self.out_json = {}
# product index file
self.product_index = {}
# Start thread used for RAM usage check
ram_check_thread = threading.Thread(target=check_ram_usage)
ram_check_thread.daemon = True
ram_check_thread.start()
def add_to_clean(self, tmp_dir):
self.tmp_dir_list.append(tmp_dir)
def get_conf(self):
"""
Get orchestrator conf
:return: orchestrator conf
"""
return self.conf
def add_to_save_lists(
self,
file_name,
tag,
cars_ds,
dtype="float32",
nodata=0,
cars_ds_name=None,
optional_data=False,
save_by_pair=False,
):
"""
Save file to list in order to be saved later
:param file_name: file name
:param tag: tag
:param cars_ds: cars dataset to register
:param cars_ds_name: name corresponding to CarsDataset,
for information during logging
:param optional_data: True if the data is optionnal
:type optional_data: bool
:param save_by_pair: True if data by pair
:type save_by_pair: bool
"""
self.cars_ds_savers_registry.add_file_to_save(
file_name,
cars_ds,
tag=tag,
dtype=dtype,
nodata=nodata,
optional_data=optional_data,
save_by_pair=save_by_pair,
)
# add name if exists
if cars_ds_name is not None:
self.cars_ds_names_info.append(cars_ds_name)
# add to tracking
self.achievement_tracker.track(
cars_ds, self.get_saving_infos([cars_ds])[0][CARS_DATASET_KEY]
)
def add_to_replace_lists(self, cars_ds, cars_ds_name=None):
"""
Add CarsDataset to replacing Registry
:param cars_ds: CarsDataset to replace
:type cars_ds: CarsDataset
:param cars_ds_name: name corresponding to CarsDataset,
for information during logging
"""
self.cars_ds_replacer_registry.add_cars_ds_to_replace(cars_ds)
# add name if exists
if cars_ds_name is not None:
self.cars_ds_names_info.append(cars_ds_name)
# add to tracking
self.achievement_tracker.track(
cars_ds, self.get_saving_infos([cars_ds])[0][CARS_DATASET_KEY]
)
def add_to_compute_lists(self, cars_ds, cars_ds_name=None):
"""
Add CarsDataset to compute Registry: computed, but not used
in main process
:param cars_ds: CarsDataset to compute
:type cars_ds: CarsDataset
:param cars_ds_name: name corresponding to CarsDataset,
for information during logging
"""
self.cars_ds_compute_registry.add_cars_ds_to_compute(cars_ds)
# add name if exists
if cars_ds_name is not None:
self.cars_ds_names_info.append(cars_ds_name)
# add to tracking
self.achievement_tracker.track(
cars_ds, self.get_saving_infos([cars_ds])[0][CARS_DATASET_KEY]
)
def save_out_json(self):
"""
Check out_json and save it to file
"""
# TODO check schema ?
# dump file
if self.out_json_path is not None:
cars_dataset.save_dict(
self.out_json, self.out_json_path, safe_save=True
)
def save_index(self):
"""
Save all product index files
"""
for product, index in self.product_index.items():
index_directory = os.path.join(self.out_dir, product)
safe_makedirs(index_directory)
cars_dataset.save_dict(
index,
os.path.join(index_directory, "index.json"),
safe_save=True,
)
def update_out_info(self, new_dict):
"""
Update self.out_json with new dict
:param new_dict: dict to merge
:type new_dict: dict
"""
# TODO merge with safe creation of new keys of application
# when 2 same applications are used
merge_dicts(self.out_json, new_dict)
def update_index(self, new_dict):
"""
Update self.product_index with new dict
:param new_dict: dict to merge
:type new_dict: dict
"""
merge_dicts(self.product_index, new_dict)
def get_saving_infos(self, cars_ds_list):
"""
Get saving infos of given cars datasets
:param cars_ds_list: list of cars datasets
:type cars_ds_list: list[CarsDataset]
:return: list of saving infos
:rtype: list[dict]
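Each returned dict contains at least the CarsDataset identifier
(under CARS_DATASET_KEY); tile row and col indices are added later
with update_saving_infos.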
"""
saving_infos = []
for cars_ds in cars_ds_list:
saving_infos.append(self.id_generator.get_saving_infos(cars_ds))
return saving_infos
def get_data(self, tag, future_object):
"""
Get data already on disk corresponding to window of object
:param tag: tag
:type tag: str
:param future_object: object
:type future_object: xarray Dataset
:return: data on disk corresponding to tag, and its nodata value
:rtype: tuple(np.ndarray, nodata value)
"""
data = None
# Get descriptor if exists
obj_id = self.cars_ds_savers_registry.get_future_cars_dataset_id(
future_object
)
cars_ds_saver = (
self.cars_ds_savers_registry.get_cars_ds_saver_corresponding_id(
obj_id
)
)
if len(cars_ds_saver.descriptors) == 0 or tag not in cars_ds_saver.tags:
# nothing is written yet
return data, None
index = cars_ds_saver.tags.index(tag)
descriptor = cars_ds_saver.descriptors[index]
nodata = cars_ds_saver.nodatas[index]
# Get window
window = cars_dataset.get_window_dataset(future_object)
rio_window = cars_dataset.generate_rasterio_window(window)
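# the rasterio window locates this tile within the full output
# raster already opened by the saver (descriptor)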
# Read data window
data = descriptor.read(window=rio_window)
return data, nodata
def compute_futures(self, only_remaining_delayed=None):
"""
Compute all futures from registries
:param only_remaining_delayed: list of delayed if second run
"""
# save json
if self.launch_worker:
self.save_out_json()
self.save_index()
# run compute and save files
logging.info("Compute delayed ...")
# Flatten to list
if only_remaining_delayed is None:
delayed_objects = flatten_object(
self.cars_ds_savers_registry.get_cars_datasets_list()
+ self.cars_ds_replacer_registry.get_cars_datasets_list()
+ self.cars_ds_compute_registry.get_cars_datasets_list(),
self.cluster.get_delayed_type(),
)
else:
delayed_objects = only_remaining_delayed
if len(delayed_objects) == 0:
logging.info("No Object to compute")
return
# Compute delayed
future_objects = self.cluster.start_tasks(delayed_objects)
# Save objects when they are computed
logging.info("Wait for futures results ...")
add_progress_message(
"Data list to process: [ {} ] ...".format(
" , ".join(list(set(self.cars_ds_names_info)))
)
)
tqdm_message = "Tiles processing: "
# if the log level is above the PROGRESS level (21), tqdm displays the data list
if logging.getLogger().getEffectiveLevel() > 21:
tqdm_message = "Processing Tiles: [ {} ] ...".format(
" , ".join(list(set(self.cars_ds_names_info)))
)
pbar = tqdm(
total=len(future_objects),
desc=tqdm_message,
position=0,
leave=True,
file=sys.stdout,
)
nb_tiles_computed = 0
interval_was_cropped = False
try:
for future_obj in self.cluster.future_iterator(
future_objects, timeout=self.task_timeout
):
# get corresponding CarsDataset and save tile
if future_obj is not None:
if get_disparity_range_cropped(future_obj):
interval_was_cropped = True
# Apply function if exists
final_function = None
current_cars_ds = (
self.cars_ds_savers_registry.get_cars_ds(future_obj)
)
if current_cars_ds is None:
current_cars_ds = self.cars_ds_replacer_registry.get_cars_ds(
future_obj
)
if current_cars_ds is not None:
final_function = current_cars_ds.final_function
if final_function is not None:
future_obj = final_function(self, future_obj)
# Save future if needs to
self.cars_ds_savers_registry.save(future_obj)
# Replace future in cars_ds if needs to
self.cars_ds_replacer_registry.replace(future_obj)
# notify tile profiler for new tile
self.tile_profiler.add_tile(future_obj)
# update achievement
self.achievement_tracker.add_tile(future_obj)
nb_tiles_computed += 1
else:
logging.debug("None tile: not saved")
pbar.update()
except TimeoutError:
logging.error("TimeOut")
if interval_was_cropped:
logging.warning(
"Disparity range was cropped in DenseMatching, "
"due to a lack of available memory for estimated"
" disparity range"
)
remaining_tiles = self.achievement_tracker.get_remaining_tiles()
if len(remaining_tiles) > 0:
# Some tiles have not been computed
logging.info(
"{} tiles have not been computed".format(
len(remaining_tiles)
)
)
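# Remaining tiles are retried once on a fresh cluster; if they
# fail again, the pipeline continues without them.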
if only_remaining_delayed is None:
# First try
logging.info("Retry failed tasks ...")
self.reset_cluster()
del pbar
self.compute_futures(only_remaining_delayed=remaining_tiles)
else:
# Second try
logging.error("Pipeline will pursue without failed tiles")
self.cars_ds_replacer_registry.replace_lasting_jobs(
self.cluster.get_delayed_type()
)
self.reset_registries()
if nb_tiles_computed == 0:
logging.warning(
"Result have not been saved because all tiles are None"
)
# close files
logging.info("Close files ...")
self.cars_ds_savers_registry.cleanup()
else:
logging.debug(
"orchestrator launch_worker is False, no metadata.json saved"
)
def reset_cluster(self):
"""
Reset Cluster
"""
data_to_propagate = self.cluster.data_to_propagate
if self.launch_worker:
self.cluster.cleanup(keep_shared_dir=True)
self.cluster = AbstractCluster( # pylint: disable=E0110
self.orchestrator_conf,
self.out_dir,
launch_worker=self.launch_worker,
data_to_propagate=data_to_propagate,
)
def reset_registries(self):
"""
Reset registries
"""
# cleanup the current registry before replacing it, to save files
self.cars_ds_savers_registry.cleanup()
# reset registries
# CarsDataset savers registry
self.cars_ds_savers_registry = saver_registry.CarsDatasetsRegistrySaver(
self.id_generator
)
# CarsDataset replacement registry
self.cars_ds_replacer_registry = (
replacer_registry.CarsDatasetRegistryReplacer(self.id_generator)
)
# Compute registry
self.cars_ds_compute_registry = (
compute_registry.CarsDatasetRegistryCompute(self.id_generator)
)
# tile profiler
self.tile_profiler = TileProfiler(
self.dir_tile_profiling,
self.cars_ds_savers_registry,
self.cars_ds_replacer_registry,
)
# achievement tracker
self.achievement_tracker = achievement_tracker.AchievementTracker()
# reset cars_ds names infos
self.cars_ds_names_info = []
@cars_profile(name="Compute futures")
def breakpoint(self):
"""
Breakpoint : compute all delayed, save and replace data
in CarsDatasets
"""
# Compute futures
try:
self.compute_futures()
except Exception as exc:
# reset registries
self.reset_registries()
raise RuntimeError(traceback.format_exc()) from exc
# reset registries
self.reset_registries()
def __enter__(self):
"""
Function run on enter
"""
return self
def __exit__(self, exc_type, exc_value, traceback_msg):
"""
Function run on exit.
Compute cluster tasks, save futures to be saved, and cleanup cluster
and files
"""
# Compute futures
self.breakpoint()
# save outjson
# TODO
# TODO : check_json
# close cluster
logging.info("Close cluster ...")
if self.launch_worker:
self.cluster.cleanup()
# clean tmp dir
for tmp_dir in self.tmp_dir_list:
if tmp_dir is not None and os.path.exists(tmp_dir):
shutil.rmtree(tmp_dir)
def merge_dicts(dict1, dict2):
"""
Merge dict2 into dict1
:param dict1: dict 1
:type dict1: dict
:param dict2: dict 2
:type dict2: dict
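For instance, merging {"a": {"y": 2}} into {"a": {"x": 1}} gives
{"a": {"x": 1, "y": 2}}: nested mappings are merged recursively,
any other value is overwritten.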
"""
for key, value2 in dict2.items():
value1 = dict1.get(key)
if isinstance(value1, collections.abc.Mapping) and isinstance(
value2, collections.abc.Mapping
):
merge_dicts(value1, value2)
else:
dict1[key] = value2
def flatten_object(cars_ds_list, delayed_type):
"""
Flatten list of CarsDatasets to list of delayed
:param cars_ds_list: list of CarsDatasets to flatten
:type cars_ds_list: list[CarsDataset]
:param delayed_type: type of delayed
:return: list of delayed
:rtype: list[Delayed]
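For instance, a CarsDataset whose tiles attribute holds a 2 x 2 grid
of delayed objects contributes 4 entries to the returned list;
objects that are not of the given delayed type (e.g. already
computed tiles) are skipped.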
"""
# remove duplicates
cleaned_cars_ds_list = list(dict.fromkeys(cars_ds_list))
# flatten datasets
flattened_objects = []
if len(cleaned_cars_ds_list) == 1 and cleaned_cars_ds_list[0] is None:
return []
# add obj flattened
for cars_ds in cleaned_cars_ds_list:
flattened_objects += [
obj
for obj_list in cars_ds.tiles
for obj in obj_list
if isinstance(obj, delayed_type) and obj is not None
]
return flattened_objects
def update_saving_infos(saving_info_left, row=None, col=None):
"""
Update saving infos dict with row and col arguments
:param saving_info_left: saving infos
:type saving_info_left: dict
:param row: row
:type row: int
:param col: col
:type col: int
:return: updated saving infos dict
:rtype: dict
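For instance, with row=1 and col=2 the returned copy contains the
original saving infos plus {CARS_DS_ROW: 1, CARS_DS_COL: 2}.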
"""
full_saving_infos = saving_info_left.copy()
if row is not None:
full_saving_infos[CARS_DS_ROW] = row
if col is not None:
full_saving_infos[CARS_DS_COL] = col
return full_saving_infos
def get_disparity_range_cropped(obj):
"""
Get CROPPED_DISPARITY_RANGE value in attributes
:param obj: object to look in
:return: True if the disparity range was cropped
:rtype: bool
"""
value = False
if isinstance(obj, (xarray.Dataset, pandas.DataFrame)):
obj_attributes = cars_dataset.get_attributes(obj)
if obj_attributes is not None:
value = obj_attributes.get(cst.CROPPED_DISPARITY_RANGE, False)
return value
def get_slurm_data():
"""
Get SLURM job resources (number of CPUs and max RAM), if running
under SLURM
:return: tuple (on_slurm, slurm_nb_cpu, slurm_max_ram in MB)
"""
def get_data(chain, pattern):
"""
Get data from pattern
:param chain: string to parse
:param pattern: pattern to find
:return: found value as int, None if not found
"""
match = re.search(pattern, chain)
value = None
if match:
value = int(match.group(1))
return value
on_slurm = False
slurm_nb_cpu = None
slurm_max_ram = None
try:
sub_res = subprocess.run(
"scontrol show job $SLURM_JOB_ID",
shell=True,
capture_output=True,
text=True,
check=False,
)
slurm_infos = sub_res.stdout
slurm_nb_cpu = get_data(slurm_infos, r"ReqTRES=cpu=(\d+)")
slurm_max_ram = get_data(slurm_infos, r"ReqTRES=cpu=.*?mem=(\d+)")
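# A typical ReqTRES field looks like (illustrative values):
#   ReqTRES=cpu=8,mem=160G,node=1,billing=8
# which yields slurm_nb_cpu=8 and slurm_max_ram=160, converted
# to MB just below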
# convert to Mb
slurm_max_ram *= 1024
logging.info("Available CPUs in SLURM : {}".format(slurm_nb_cpu))
logging.info("Available RAM in SLURM : {}".format(slurm_max_ram))
except Exception as _:
logging.debug("Not on Slurm cluster")
if slurm_nb_cpu is not None and slurm_max_ram is not None:
on_slurm = True
return on_slurm, slurm_nb_cpu, slurm_max_ram
def compute_conf_auto_mode(is_windows):
"""
Compute configuration to use in auto mode
:param is_windows: True if running on Windows
:type is_windows: bool
:return: number of workers and max RAM per worker (in MB)
:rtype: tuple(int, int)
"""
on_slurm, nb_cpu_slurm, max_ram_slurm = get_slurm_data()
if on_slurm:
available_cpu = nb_cpu_slurm
else:
available_cpu = (
multiprocessing.cpu_count()
if is_windows
else len(os.sched_getaffinity(0))
)
logging.info("available cpu : {}".format(available_cpu))
if available_cpu == 1:
logging.warning("Only one CPU detected.")
available_cpu = 2
elif available_cpu == 0:
logging.warning("No CPU detected.")
available_cpu = 2
if on_slurm:
ram_to_use = max_ram_slurm
else:
ram_to_use = get_total_ram()
logging.info("total ram : {}".format(ram_to_use))
# use 50% of total ram
ram_to_use *= 0.5
# non configurable
max_ram_per_worker = 2000
possible_workers = int(ram_to_use // max_ram_per_worker)
if possible_workers == 0:
logging.warning("Not enough memory available : failure might occur")
nb_workers_to_use = max(1, min(possible_workers, available_cpu - 1))
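# e.g. with 4 CPUs and 8 GB of total RAM (illustrative values):
# ram_to_use = 4096 MB, possible_workers = 4096 // 2000 = 2,
# nb_workers_to_use = max(1, min(2, 4 - 1)) = 2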
logging.info("Number of workers : {}".format(nb_workers_to_use))
logging.info("Max memory per worker : {} MB".format(max_ram_per_worker))
# Check with available ram
available_ram = get_available_ram()
if int(nb_workers_to_use) * int(max_ram_per_worker) > available_ram:
logging.warning(
"CARS will use 50% of the total RAM, "
"which is more than the currently available RAM"
)
return int(nb_workers_to_use), int(max_ram_per_worker)
def get_available_ram():
"""
Get available RAM
:return: available RAM in MB
"""
"""
ram = psutil.virtual_memory()
available_ram_mb = ram.available / (1024 * 1024)
return available_ram_mb
def get_total_ram():
"""
Get total RAM
:return: total RAM in MB
"""
ram = psutil.virtual_memory()
total_ram_mb = ram.total / (1024 * 1024)
return total_ram_mb
def check_ram_usage():
"""
Check RAM usage
"""
while True:
# Get Ram information
available_ram_mb = get_available_ram()
if available_ram_mb < RAM_THRESHOLD_MB:
logging.warning(
"RAM available < {} Mb, available ram: {} Mb."
" Freeze might ocure".format(
RAM_THRESHOLD_MB, int(available_ram_mb)
)
)
time.sleep(RAM_CHECK_SLEEP_TIME)