#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains the saver registry class
"""
# Standard imports
import logging
import os
# CARS imports
from cars.orchestrator.registry.abstract_registry import (
AbstractCarsDatasetRegistry,
)
[docs]class CarsDatasetsRegistrySaver(AbstractCarsDatasetRegistry):
"""
CarsDatasetsRegistrySaver
This registry manages the saving of arriving future results
"""
def __init__(self, id_generator):
"""
Init function of CarsDatasetsRegistrySaver
:param id_generator: id generator
:type id_generator: IdGenerator
"""
super().__init__(id_generator)
self.registered_cars_datasets_savers = []
[docs] def get_cars_datasets_list(self):
"""
Get a list of registered CarsDataset
:return list of CarsDataset
:rtype: list(CarsDataset)
"""
cars_ds_list = []
for cars_ds_saver in self.registered_cars_datasets_savers:
cars_ds_list.append(cars_ds_saver.cars_ds)
return cars_ds_list
[docs] def cars_dataset_in_registry(self, cars_ds):
"""
Check if a CarsDataset is already registered, return id if exists
:param cars_ds: cars dataset
:type cars_ds: CarsDataset
:return : True if in registry, if of cars dataset
:rtype : Tuple(bool, int)
"""
in_registry = False
registered_id = None
for obj in self.registered_cars_datasets_savers:
if cars_ds == obj.cars_ds:
in_registry = True
registered_id = obj.obj_id
break
return in_registry, registered_id
[docs] def get_cars_ds_saver_corresponding_cars_dataset(self, cars_ds):
"""
Get the SingleCarsDatasetSaver corresponding to given cars dataset
:param cars_ds: cars dataset
:return : single cars dataset saver
:rtype : SingleCarsDatasetSaver
"""
cars_ds_saver = None
for obj in self.registered_cars_datasets_savers:
if cars_ds == obj.cars_ds:
cars_ds_saver = obj
break
return cars_ds_saver
[docs] def get_cars_ds_saver_corresponding_id(self, obj_id):
"""
Get the SingleCarsDatasetSaver corresponding to given id
:param obj_id: cars dataset id
:type obj_id: int
:return : single cars dataset saver
:rtype : SingleCarsDatasetSaver
"""
cars_ds_saver = None
for obj in self.registered_cars_datasets_savers:
if obj_id == obj.obj_id:
cars_ds_saver = obj
break
return cars_ds_saver
[docs] def save(self, future_result):
"""
Save future result
:param future_result: xr.Dataset or pd.DataFrame
"""
obj_id = self.get_future_cars_dataset_id(future_result)
cars_ds_saver = self.get_cars_ds_saver_corresponding_id(obj_id)
if cars_ds_saver is not None:
# save
if future_result is not None:
cars_ds_saver.save(future_result)
else:
logging.debug("Future result tile is None -> not saved")
[docs] def add_file_to_save(
self, file_name, cars_ds, tag=None, dtype=None, nodata=None
):
"""
Add file corresponding to cars_dataset to registered_cars_datasets
:param file_name: file name to save futures to
:type file_name: str
:param cars_ds: CarsDataset to register
:type cars_ds: CarsDataset
:param tag: tag to save
:type tag: str
:param dtype: dtype
:type dtype: str
:param nodata: no data value
:type nodata: float
"""
if not self.cars_dataset_in_registry(cars_ds)[0]:
# Generate_id
new_id = self.id_generator.get_new_id(cars_ds)
# create CarsDataset saver
cars_ds_saver = SingleCarsDatasetSaver(new_id, cars_ds)
# add to list
self.registered_cars_datasets_savers.append(cars_ds_saver)
else:
cars_ds_saver = self.get_cars_ds_saver_corresponding_cars_dataset(
cars_ds
)
# update cars_ds_saver
cars_ds_saver.add_file(file_name, tag=tag, dtype=dtype, nodata=nodata)
[docs] def cleanup(self):
"""
Cleanup function.
Close correctly all opened files.
"""
for obj in self.registered_cars_datasets_savers:
obj.cleanup()
[docs]class SingleCarsDatasetSaver:
"""
SingleCarsDatasetSaver
Structure managing the descriptors of each CarsDataset.
"""
def __init__(self, obj_id, cars_ds):
"""
Init function of SingleCarsDatasetSaver
"""
self.obj_id = obj_id
self.cars_ds = cars_ds
self.file_names = []
self.tags = []
self.dtypes = []
self.nodatas = []
self.descriptors = []
self.already_seen = False
self.count = 0
self.folder_name = None
self.extension = None
[docs] def add_file(self, file_name, tag=None, dtype=None, nodata=None):
"""
Add file to current CarsDatasetSaver
:param file_name: file name to save futures to
:type file_name: str
:param tag: tag to save
:type tag: str
:param dtype: dtype
:type dtype: str
:param nodata: no data value
:type nodata: float
"""
self.file_names.append(file_name)
self.tags.append(tag)
self.dtypes.append(dtype)
self.nodatas.append(nodata)
[docs] def save(self, future_result):
"""
Save future result
:param future_result: xr.Dataset or pandas.DataFrame
"""
try:
if self.cars_ds.dataset_type == "arrays":
if not self.already_seen:
# generate descriptors
for count, file_name in enumerate(self.file_names):
desc = self.cars_ds.generate_descriptor(
future_result,
file_name,
tag=self.tags[count],
dtype=self.dtypes[count],
nodata=self.nodatas[count],
)
self.descriptors.append(desc)
self.already_seen = True
for count, file_name in enumerate(self.file_names):
self.cars_ds.run_save(
future_result,
file_name,
tag=self.tags[count],
descriptor=self.descriptors[count],
)
else:
# type points
if not self.already_seen:
# create tmp_folder
self.folder_name, self.extension = os.path.splitext(
self.file_names[0]
)
if not os.path.exists(self.folder_name):
os.makedirs(self.folder_name)
self.already_seen = True
self.cars_ds.run_save(
future_result,
os.path.join(
self.folder_name, repr(self.count) + self.extension
),
overwrite=not self.already_seen,
)
self.count += 1
except: # pylint: disable=W0702 # noqa: B001, E722
logging.error("Tile not saved")
[docs] def cleanup(self):
"""
Cleanup function
Close properly all opened files
"""
# close raster files
for desc in self.descriptors:
desc.close()
# TODO merge point clouds ?