#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Contains functions for wrapper disk
"""
# Standard imports
import logging
import os
import pickle
import shutil
from abc import ABCMeta, abstractmethod
from multiprocessing.pool import ThreadPool
import pandas
# Third party imports
import xarray as xr
# CARS imports
from cars.core.utils import safe_makedirs
from cars.data_structures import cars_dataset, cars_dict
from cars.orchestrator.cluster.mp_cluster.mp_tools import replace_data
# Name tags embedded in dumped-object paths: is_dumped_object() and load()
# use them to recognize how a given path must be read back from disk.
DENSE_NAME = "DenseDO"
SPARSE_NAME = "SparseDO"
DICT_NAME = "DictDO"
SHARED_NAME = "SharedDO"
class AbstractWrapper(metaclass=ABCMeta):
    """
    Interface of the data wrappers used by the multiprocessing cluster.

    A wrapper decides how objects and function results travel between
    workers (in memory, through disk, ...).
    """

    @abstractmethod
    def get_obj(self, obj):
        """
        Get Object

        :param obj: object to transform
        :return: object
        """

    @abstractmethod
    def get_function_and_kwargs(self, func, kwargs, nout=1):
        """
        Get function to apply and overloaded key arguments

        :param func: function to run
        :param kwargs: key arguments of func
        :param nout: number of outputs
        :return: function to apply, overloaded key arguments
        """

    @abstractmethod
    def cleanup(self, **kwargs):
        """
        Cleanup tmp_dir
        """

    @abstractmethod
    def cleanup_future_res(self, future_res):
        """
        Cleanup future result

        :param future_res: future result to clean
        """
class WrapperNone(AbstractWrapper):
    """
    Pass-through wrapper: data stays in memory, nothing is dumped to disk.
    """

    def __init__(self, tmp_dir):
        """
        Init function of WrapperNone

        :param tmp_dir: temporary directory (unused by this wrapper)
        """

    def get_obj(self, obj):
        """
        Get Object: identity for this wrapper

        :param obj: object to transform
        :return: the same object, unchanged
        """
        return obj

    def get_function_and_kwargs(self, func, kwargs, nout=1):
        """
        Get function to apply and overloaded key arguments

        :param func: function to run
        :param kwargs: key arguments of func
        :param nout: number of outputs
        :return: function to apply, overloaded key arguments
        """
        # store the real function so none_wrapper_fun can run it directly
        kwargs["fun"] = func
        return none_wrapper_fun, kwargs

    def cleanup(self, **kwargs):
        """
        Cleanup tmp_dir: nothing to clean for the in-memory wrapper
        """

    def cleanup_future_res(self, future_res):
        """
        Cleanup future result

        :param future_res: future result to clean
        """
        del future_res
class WrapperDisk(AbstractWrapper):
    """
    Wrapper passing data between workers through files dumped on disk.
    """

    def __init__(self, tmp_dir):
        """
        Init function of WrapperDisk

        :param tmp_dir: temporary directory
        """
        # Directory for data passing from a wrapper to another
        self.tmp_dir = os.path.join(tmp_dir, "tmp")
        if not os.path.exists(self.tmp_dir):
            os.makedirs(self.tmp_dir)

        # Directory for data shared by multiple wrappers
        self.shared_dir = os.path.join(tmp_dir, "shared")
        if not os.path.exists(self.shared_dir):
            os.makedirs(self.shared_dir)

        # Counter used to generate unique dump ids
        self.current_object_id = 0

        # Single-thread pool dedicated to asynchronous removal of dumped data
        self.removing_pool = ThreadPool(1)

    def cleanup(self, keep_shared_dir=False):
        """
        Cleanup tmp_dir

        :param keep_shared_dir: do not clean directory of shared objects
        """
        logging.info("Clean removing thread pool ...")
        self.removing_pool.close()
        self.removing_pool.join()

        logging.info("Clean tmp directory ...")
        removing_disk_data(self.tmp_dir)

        if not keep_shared_dir:
            logging.info("Clean shared directory ...")
            removing_disk_data(self.shared_dir)

    def cleanup_future_res(self, future_res):
        """
        Cleanup future result

        :param future_res: future result to clean
        """
        # normalize scalar results to a tuple so both cases share one loop
        if isinstance(future_res, tuple):
            candidates = future_res
        else:
            candidates = (future_res,)
        for candidate in candidates:
            if is_dumped_object(candidate):
                self.removing_pool.apply_async(
                    removing_disk_data, args=[candidate]
                )

    def get_function_and_kwargs(self, func, kwargs, nout=1):
        """
        Get function to apply and overloaded key arguments

        :param func: function to run
        :param kwargs: key arguments of func
        :param nout: number of outputs
        :return: function to apply, overloaded key arguments
        """
        # allocate one unique dump id per expected output
        first_id = self.current_object_id
        id_list = list(range(first_id, first_id + nout))
        self.current_object_id += nout

        # overload key arguments for the disk wrapper
        kwargs["id_list"] = id_list
        kwargs["fun"] = func
        kwargs["tmp_dir"] = self.tmp_dir
        return disk_wrapper_fun, kwargs

    def get_obj(self, obj):
        """
        Get Object: load it back from disk if it was dumped

        :param obj: object to transform
        :return: object
        """
        return load(obj)

    def scatter_obj(self, obj):
        """
        Distribute data through workers

        :param obj: object to dump
        :return: path of the pickled object
        """
        directory = os.path.join(
            self.shared_dir, SHARED_NAME + "_" + repr(self.current_object_id)
        )
        safe_makedirs(directory)
        self.current_object_id += 1

        path = os.path.join(directory, "obj")
        with open(path, "wb") as stream:
            pickle.dump(obj, stream, protocol=pickle.HIGHEST_PROTOCOL)
        return path
def removing_disk_data(path):
    """
    Remove a dumped object from disk.

    Most dumped objects are directories (tiles saved by CarsDataset), but
    objects shared through WrapperDisk.scatter_obj are single pickle files
    whose path also matches is_dumped_object; shutil.rmtree raises
    NotADirectoryError on such a file, so handle both cases explicitly.

    :param path: path to delete (directory or regular file)
    """
    if os.path.isfile(path):
        os.remove(path)
    else:
        # directory (raises FileNotFoundError if path does not exist,
        # like the previous rmtree-only implementation)
        shutil.rmtree(path)
def none_wrapper_fun(*argv, **kwargs):
    """
    Run the wrapped function directly, with no serialization at all.

    :param argv: positional arguments of the wrapped function
    :param kwargs: keyword arguments, including the callable under "fun"
    :return: result of the wrapped function
    """
    wrapped = kwargs.pop("fun")
    return wrapped(*argv, **kwargs)
def disk_wrapper_fun(*argv, **kwargs):
    """
    Wrapper running a function whose inputs/outputs live on disk.

    Dumped arguments are loaded back to memory, the wrapped function is
    executed, and any non-None result is dumped to the tmp directory.

    :param argv: args of func (possibly paths of dumped objects)
    :param kwargs: kwargs of func, plus "fun", "id_list" and "tmp_dir"
    :return: path(s) to dumped results, or None when func returns None
    """
    # Get function to wrap and id_list; read everything first so the error
    # message shows the kwargs unmodified
    try:
        id_list = kwargs["id_list"]
        func = kwargs["fun"]
        tmp_dir = kwargs["tmp_dir"]
    except Exception as exc:  # pylint: disable=W0702 # noqa: B001, E722
        raise RuntimeError(
            "Failed in unwrapping. \n Args: {}, \n Kwargs: {}\n".format(
                argv, kwargs
            )
        ) from exc
    for consumed_key in ("id_list", "fun", "tmp_dir"):
        del kwargs[consumed_key]

    # bring dumped arguments back to memory
    loaded_args = load_args_or_kwargs(argv)
    loaded_kwargs = load_args_or_kwargs(kwargs)

    # call function
    result = func(*loaded_args, **loaded_kwargs)
    if result is None:
        return None
    return dump(result, tmp_dir, id_list)
def load_args_or_kwargs(args_or_kwargs):
    """
    Load args or kwargs from disk to memory.

    :param args_or_kwargs: args or kwargs of func
    :return: new args, with dumped paths replaced by loaded objects
    """

    def _to_memory(candidate):
        """Replace a dumped path by its loaded object, pass through otherwise."""
        if is_dumped_object(candidate):
            return load(candidate)
        return candidate

    # replace data in the (possibly nested) args/kwargs structure
    return replace_data(args_or_kwargs, _to_memory)
def is_dumped_object(obj):
    """
    Check if a given object is a path to a dumped object.

    :param obj: object
    :return: is dumped
    :rtype: bool
    """
    if not isinstance(obj, str):
        return False
    # a dumped path carries one of the known name tags
    tags = (DENSE_NAME, SPARSE_NAME, DICT_NAME, SHARED_NAME)
    return any(tag in obj for tag in tags)
def load(path):
    """
    Load object from disk.

    :param path: path of a dumped object, or None
    :type path: str
    :return: loaded object; None when path is None; the path itself when it
        does not match any known dumped-object tag
    """
    if path is None:
        return None
    # dispatch on the name tag embedded in the path
    if DENSE_NAME in path:
        return cars_dataset.CarsDataset("arrays").load_single_tile(path)
    if SPARSE_NAME in path:
        return cars_dataset.CarsDataset("points").load_single_tile(path)
    if DICT_NAME in path:
        return cars_dataset.CarsDataset("dict").load_single_tile(path)
    if SHARED_NAME in path:
        return load_shared_data(path)
    logging.warning("Not a dumped arrays or points or dict or shared data")
    return path
def load_shared_data(path):
    """
    Load shared object from disk (pickled by WrapperDisk.scatter_obj).

    :param path: path
    :type path: str
    :return: unpickled object
    """
    with open(path, "rb") as stream:
        return pickle.load(stream)
def dump_single_object(obj, path):
    """
    Dump object to disk.

    :param obj: object to dump (xarray Dataset, pandas DataFrame or CarsDict)
    :param path: path
    :type path: str
    :raises TypeError: when obj is none of the supported types
    """
    if isinstance(obj, xr.Dataset):
        # is from arrays
        saver = cars_dataset.CarsDataset("arrays")
    elif isinstance(obj, pandas.DataFrame):
        # is from points
        saver = cars_dataset.CarsDataset("points")
    elif isinstance(obj, cars_dict.CarsDict):
        # is from dict
        saver = cars_dataset.CarsDataset("dict")
    else:
        raise TypeError("Not an arrays or points or dict")
    saver.save_single_tile(obj, path)
def create_path(obj, tmp_dir, id_num):
    """
    Create path where to dump object.

    :param obj: object to dump (its type selects the path name tag)
    :param tmp_dir: tmp_dir
    :param id_num: id of object
    :return: path
    """
    if isinstance(obj, xr.Dataset):
        # is from arrays
        prefix = DENSE_NAME
    elif isinstance(obj, pandas.DataFrame):
        # is from points
        prefix = SPARSE_NAME
    elif isinstance(obj, cars_dict.CarsDict):
        # is from dict
        prefix = DICT_NAME
    else:
        # fall back to the object itself as path stem
        logging.warning("Not an arrays or points or dict")
        prefix = obj
    return os.path.join(tmp_dir, prefix + "_" + repr(id_num))
def dump(res, tmp_dir, id_list):
    """
    Dump results to tmp_dir, according to ids.

    :param res: objects to dump (tuple-like when several outputs)
    :param tmp_dir: tmp_dir
    :param id_list: list of ids of objects
    :return: single path, or tuple of paths (None for None results)
    """
    if len(id_list) <= 1:
        # single output: dump it directly
        single_path = create_path(res, tmp_dir, id_list[0])
        dump_single_object(res, single_path)
        return single_path

    # several outputs: dump each one under its own id
    collected = []
    for index, single_id in enumerate(id_list):
        current = res[index]
        if current is None:
            collected.append(None)
        else:
            tile_path = create_path(current, tmp_dir, single_id)
            dump_single_object(current, tile_path)
            collected.append(tile_path)
    return tuple(collected)