#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=too-many-lines
"""
cars_dataset module:
"""
import copy
import logging
import math
# Standard imports
import os
import pickle
from typing import Dict
# Third party imports
import numpy as np
import pandas
import pyproj
import rasterio as rio
import xarray as xr
import yaml
from rasterio.profiles import DefaultGTiffProfile
from rasterio.windows import Window
# CARS imports
from cars.core import constants as cst
from cars.core import outputs
from cars.core.utils import safe_makedirs
from cars.data_structures import cars_dict, dataframe_converter
# cars dataset dtype
CARS_DS_TYPE_ARRAY = "arrays"
CARS_DS_TYPE_POINTS = "points"
CARS_DS_TYPE_DICT = "dict"
# cars_dataset names
TILES_INFO_FILE = "tiles_info.yaml"
OVERLAP_FILE = "overlaps.npy"
GRID_FILE = "grid.npy"
PROFILE_FILE = "profile.json"
# single tile names
ATTRIBUTE_FILE = "attributes.yaml"
DATASET_FILE = "dataset"
DATAFRAME_FILE = "dataframe.csv"
CARSDICT_FILE = "cars_dict"
PROFILE = "profile"
WINDOW = "window"
OVERLAPS = "overlaps"
ATTRIBUTES = "attributes"
SAVING_INFO = "saving_info"
[docs]
class CarsDataset:
"""
CarsDataset.
Internal CARS structure for organazing tiles
(xr.Datasets or pd.DataFrames).
"""
def __init__(self, dataset_type, load_from_disk=None, name="unknown"):
"""
Init function of CarsDataset.
If a path is provided, restore CarsDataset saved on disk.
:param dataset_type: type of dataset : 'arrays' or 'points'
:type dataset_type: str
:param load_from_disk: path to saved CarsDataset
:type load_from_disk: str
"""
self.dataset_type = dataset_type
if dataset_type not in [
CARS_DS_TYPE_ARRAY,
CARS_DS_TYPE_POINTS,
CARS_DS_TYPE_DICT,
]:
raise ValueError("wrong dataset type")
self.tiles = None
self.tiles_info = {}
self._tiling_grid = None
self.overlaps = None
self.attributes = {}
self.name = name
# Method to apply at the reception of tile
self.final_function = None
if load_from_disk is not None:
self.load_cars_dataset_from_disk(load_from_disk)
[docs]
def __repr__(self):
"""
Repr function
:return: printable self CarsDataset
"""
return self.custom_print()
[docs]
def __str__(self):
"""
Str function
:return: printable self CarsDataset
"""
return self.custom_print()
[docs]
def custom_print(self):
"""
Return string of self
:return: printable self
"""
res = str(self.__class__) + ": \n" "dataset_type: " + str(
self.dataset_type
) + "\n" + "shape: " + str(self.shape) + "\n" + "tiling_grid: " + str(
self._tiling_grid
) + "\n" + "overlaps: " + str(
self.overlaps
) + "\n" + "tiles_info: " + str(
self.tiles_info
) + "\n" + "attributes: " + str(
self.attributes
) + "\n" + "tiles:" + str(
self.tiles
)
return res
@property
def shape(self):
"""
Return the shape of tiling grid (nb_row, nb_col)
:return: shape of grid
"""
return self.tiling_grid.shape[0], self.tiling_grid.shape[1]
@property
def tiling_grid(self):
"""
Tiling grid, containing pixel windows of tiles
:return: tiling grid, of shape [N, M, 4],
containing [row_min, row_max, col_min, col_max]
:rtype: np.ndarray
"""
return self._tiling_grid
@tiling_grid.setter
def tiling_grid(self, new_grid):
"""
Set tiling_grid
:param new_grid: new grid
:type new_grid: np.ndarray
"""
self._tiling_grid = new_grid
# reset overlaps to zeros
self.overlaps = np.zeros(new_grid.shape)
# fill dataset grid with Nones
self.generate_none_tiles()
[docs]
def __getitem__(self, key):
"""
Get item : return the [row, col] dataset
:param key: tuple index
:return: tile
:rtype: xr.Dataset or pd.DataFrame
"""
if isinstance(key, (tuple, list)):
if len(key) == 2:
res = self.tiles[key[0]][key[1]]
elif len(key) == 1:
res = self.tiles[key[0]]
else:
raise ValueError("Too many indexes, expected 1 or 2")
else:
if isinstance(key, int):
res = self.tiles[key]
else:
raise ValueError("Index type not supported")
return res
[docs]
def __setitem__(self, key, newvalue):
"""
Set new tile
:param key: tuple of row and col indexes
:type key: tuple(int, int)
:param newvalue: tile to set
"""
if isinstance(key, (tuple, list)):
if len(key) == 2:
self.tiles[key[0]][key[1]] = newvalue
else:
raise ValueError("Too many indexes, expected 2")
else:
raise ValueError("Index type not supported")
[docs]
def load_single_tile(self, tile_path_name: str):
"""
Load a single tile
:param tile_path_name: Path of tile to load
:type tile_path_name: str
:return: single tile
:rtype: xarray Dataset or Panda dataframe to file
"""
functions = {
CARS_DS_TYPE_ARRAY: load_single_tile_array,
CARS_DS_TYPE_POINTS: load_single_tile_points,
CARS_DS_TYPE_DICT: load_single_tile_dict,
}
return functions[self.dataset_type](tile_path_name)
[docs]
def save_single_tile(self, tile, tile_path_name: str):
"""
Save xarray Dataset or Panda dataframe to file
:param tile: tile to save
:type tile: xr.Dataset or pd.DataFrame
:param tile_path_name: Path of file to save in
"""
functions = {
CARS_DS_TYPE_ARRAY: save_single_tile_array,
CARS_DS_TYPE_POINTS: save_single_tile_points,
CARS_DS_TYPE_DICT: save_single_tile_dict,
}
return functions[self.dataset_type](tile, tile_path_name)
[docs]
def run_save(self, future_result, file_name: str, **kwargs):
"""
Save future result when arrived
:param future_result: xarray.Dataset received
:param file_name: filename to save data to
"""
functions = {
CARS_DS_TYPE_ARRAY: run_save_arrays,
CARS_DS_TYPE_POINTS: run_save_points,
}
return functions[self.dataset_type](future_result, file_name, **kwargs)
[docs]
def get_window_as_dict(self, row, col, from_terrain=False, resolution=1):
"""
Get window in pixels for rasterio. Set from_terrain if tiling grid
was defined in geographic coordinates.
:param row: row
:type row: int
:param col: col
:type col: int
:param from_terrain: true if in terrain coordinates
:type from_terrain: bool
:param resolution: resolution
:type resolution: float
:return: New window : {
"row_min" : row_min ,
"row_max" : row_max
"col_min" : col_min
"col_max" : col_max
}
:rtype: Dict
"""
row_min = np.min(self.tiling_grid[:, :, 0])
col_min = np.min(self.tiling_grid[:, :, 2])
col_max = np.max(self.tiling_grid[:, :, 3])
window_arr = np.copy(self.tiling_grid[row, col, :])
if from_terrain:
# row -> y axis : reversed by convention
window = np.array(
[
col_max - window_arr[3],
col_max - window_arr[2],
window_arr[0] - row_min,
window_arr[1] - row_min,
]
)
else:
window = np.array(
[
window_arr[0] - row_min,
window_arr[1] - row_min,
window_arr[2] - col_min,
window_arr[3] - col_min,
]
)
# normalize with resolution
window = np.round(window / resolution)
new_window = {
"row_min": int(window[0]),
"row_max": int(window[1]),
"col_min": int(window[2]),
"col_max": int(window[3]),
}
return new_window
[docs]
def create_grid( # pylint: disable=too-many-positional-arguments
self,
nb_col: int,
nb_row: int,
row_split: int,
col_split: int,
row_overlap: int,
col_overlap: int,
):
"""
Generate grid of positions by splitting [0, nb_row]x[0, nb_col]
in splits of xsplit x ysplit size
:param nb_col : number of columns
:param nb_row : number of lines
:param col_split: width of splits
:param row_split: height of splits
:param col_overlap: overlap to apply on rows
:param row_overlap: overlap to apply on cols
"""
nb_col_splits = math.ceil(nb_col / row_split)
nb_row_splits = math.ceil(nb_row / col_split)
row_min, row_max = 0, nb_row
col_min, col_max = 0, nb_col
out_grid = np.ndarray(
shape=(nb_row_splits, nb_col_splits, 4), dtype=int
)
out_overlap = np.ndarray(
shape=(nb_row_splits, nb_col_splits, 4), dtype=int
)
for i in range(0, nb_row_splits):
for j in range(0, nb_col_splits):
row_down = row_min + row_split * i
col_left = col_min + col_split * j
row_up = min(row_max, row_min + (i + 1) * row_split)
col_right = min(col_max, col_min + (j + 1) * col_split)
out_grid[i, j, 0] = row_down
out_grid[i, j, 1] = row_up
out_grid[i, j, 2] = col_left
out_grid[i, j, 3] = col_right
# fill overlap [OL_row_down, OL_row_up, OL_col_left,
# OL_col_right]
out_overlap[i, j, 0] = row_down - max(
row_min, row_down - row_overlap
)
out_overlap[i, j, 1] = (
min(row_max, row_up + row_overlap) - row_up
)
out_overlap[i, j, 2] = col_left - max(
col_min, col_left - col_overlap
)
out_overlap[i, j, 3] = (
min(col_right, col_right + col_overlap) - col_right
)
self.tiling_grid = out_grid
self.overlaps = out_overlap
[docs]
def generate_none_tiles(self):
"""
Generate the structure of data tiles, with Nones, according
to grid shape.
"""
self.tiles = create_none(
self.tiling_grid.shape[0], self.tiling_grid.shape[1]
)
[docs]
def create_empty_copy(self, cars_ds):
"""
Copy attributes, grid, overlaps, and create Nones.
:param cars_ds: CarsDataset to copy
:type cars_ds: CarsDataset
"""
self.tiles_info = copy.deepcopy(cars_ds.tiles_info)
self.tiling_grid = copy.deepcopy(cars_ds.tiling_grid)
self.overlaps = copy.deepcopy(cars_ds.overlaps)
self.tiles = []
for _ in range(cars_ds.overlaps.shape[0]):
tiles_row = []
for _ in range(cars_ds.overlaps.shape[1]):
tiles_row.append(None)
self.tiles.append(tiles_row)
[docs]
def generate_descriptor( # pylint: disable=too-many-positional-arguments
self,
future_result,
file_name,
tag=None,
dtype=None,
nodata=None,
rio_tags=None,
rio_band_description=None,
nbits=None,
):
"""
Generate de rasterio descriptor for the given future result
Only works with pixelic tiling grid
:param future_result: Future result
:type future_result: xr.Dataset
:param file_name: file name to save futures to
:type file_name: str
:param tag: tag to save
:type tag: str
:param dtype: dtype
:type dtype: str
:param nodata: no data value
:type nodata: float
"""
# Get profile from 1st finished future
new_profile = get_profile_for_tag_dataset(future_result, tag)
if "width" not in new_profile or "height" not in new_profile:
logging.debug(
"CarsDataset doesn't have a profile, default is given"
)
new_profile = DefaultGTiffProfile(count=new_profile["count"])
new_profile["height"] = np.max(self.tiling_grid[:, :, 1])
new_profile["width"] = np.max(self.tiling_grid[:, :, 3])
# Change dtype
new_profile["dtype"] = dtype
if nodata is not None:
new_profile["nodata"] = nodata
if nbits is None:
nbits = np.dtype(dtype).itemsize * 8
descriptor = rio.open(
file_name, "w+", nbits=nbits, **new_profile, BIGTIFF="IF_SAFER"
)
# add tags and desciption
if rio_tags is not None:
for band, classif in rio_tags:
descriptor.update_tags(band=band, class_name=str(classif))
if rio_band_description is not None:
for band, classif in rio_band_description:
descriptor.set_band_description(band, str(classif))
return descriptor
[docs]
def save_cars_dataset(self, directory):
"""
Save whole CarsDataset to given directory, including tiling grids,
attributes, overlaps, and all the xr.Dataset or pd.DataFrames.
:param directory: Path where to save self CarsDataset
:type directory: str
"""
# Create CarsDataset folder
safe_makedirs(directory)
if self.tiles is None:
logging.error("No tiles managed by CarsDatasets")
raise RuntimeError("No tiles managed by CarsDatasets")
# save tiles info
tiles_info_file = os.path.join(directory, TILES_INFO_FILE)
save_dict(self.tiles_info, tiles_info_file)
# save grid
grid_file = os.path.join(directory, GRID_FILE)
save_numpy_array(self.tiling_grid, grid_file)
# save overlap
overlap_file = os.path.join(directory, OVERLAP_FILE)
save_numpy_array(self.overlaps, overlap_file)
nb_rows, nb_cols = self.tiling_grid.shape[0], self.tiling_grid.shape[1]
# save each tile
for col in range(nb_cols):
for row in range(nb_rows):
# Get name
current_tile_path_name = create_tile_path(col, row, directory)
# save tile
self.save_single_tile(
self.tiles[row][col], current_tile_path_name
)
[docs]
def load_cars_dataset_from_disk(self, directory):
"""
Load whole CarsDataset from given directory
:param directory: Path where is saved CarsDataset to load
:type directory: str
"""
# get tiles info
tiles_info_file = os.path.join(directory, TILES_INFO_FILE)
self.tiles_info = load_dict(tiles_info_file)
# load grid
grid_file = os.path.join(directory, GRID_FILE)
self.tiling_grid = load_numpy_array(grid_file)
nb_rows, nb_cols = self.tiling_grid.shape[0], self.tiling_grid.shape[1]
# load overlap
overlap_file = os.path.join(directory, OVERLAP_FILE)
self.overlaps = load_numpy_array(overlap_file)
# load each tile
self.tiles = []
for row in range(nb_rows):
tiles_row = []
for col in range(nb_cols):
# Get name
current_tile_path_name = create_tile_path(col, row, directory)
# load tile
tiles_row.append(self.load_single_tile(current_tile_path_name))
self.tiles.append(tiles_row)
[docs]
def run_save_arrays(
future_result, file_name, tag=None, descriptor=None, nbits=None
):
"""
Save future when arrived
:param future_result: xarray.Dataset received
:type future_result: xarray.Dataset
:param file_name: filename to save data to
:type file_name: str
:param tag: dataset tag to rasterize
:type tag: str
:param descriptor: rasterio descriptor
"""
# write future result using saved window and overlaps
save_dataset(
future_result,
file_name,
tag,
use_windows_and_overlaps=True,
descriptor=descriptor,
nbits=nbits,
)
[docs]
def run_save_points( # pylint: disable=too-many-positional-arguments
future_result,
file_name,
overwrite=False,
save_by_pair=False,
point_cloud_format="csv",
overwrite_file_name=True,
nbits=None, # pylint: disable=unused-argument
):
"""
Save future result when arrived
:param future_result: pandas Dataframe received
:type future_result: pandas Dataframe
:param file_name: filename to save data to
:type file_name: str
:param overwrite: overwrite file
:type overwrite: bool
:param point_cloud_format: output point cloud format
:type point_cloud_format: str
:param overwrite_file_name: generate a new filename from input filename
matching input dataframe attributes
:type overwrite_file_name: bool
"""
# Save
save_all_dataframe(
future_result,
file_name,
save_by_pair=save_by_pair,
overwrite=overwrite,
point_cloud_format=point_cloud_format,
overwrite_file_name=overwrite_file_name,
)
[docs]
def load_single_tile_array(tile_path_name: str) -> xr.Dataset:
"""
Load a xarray tile
:param tile_path_name: Path of tile to load
:type tile_path_name: str
:return: tile dataset
:rtype: xr.Dataset
"""
# get dataset
dataset_file_name = os.path.join(tile_path_name, DATASET_FILE)
if not os.path.exists(dataset_file_name):
logging.error("Tile {} does not exists".format(dataset_file_name))
return None
with open(dataset_file_name, "rb") as handle:
dataset = pickle.load(handle)
# get attributes
attributes_file_name = os.path.join(tile_path_name, ATTRIBUTE_FILE)
attributes = load_dict(attributes_file_name)
# Format transformation
if PROFILE in attributes:
attributes[PROFILE] = dict_profile_to_rio_profile(attributes[PROFILE])
# add to dataset
dataset.attrs.update(attributes)
return dataset
[docs]
def load_single_tile_points(tile_path_name: str):
"""
Load a panda dataframe
:param tile_path_name: Path of tile to load
:type tile_path_name: str
:return: Tile dataframe
:rtype: Panda dataframe
"""
# get dataframe
dataframe_file_name = os.path.join(tile_path_name, DATAFRAME_FILE)
if not os.path.exists(dataframe_file_name):
logging.error("Tile {} does not exists".format(dataframe_file_name))
return None
with open(dataframe_file_name, "rb") as handle:
dataframe = pickle.load(handle)
# get attributes
attributes_file_name = os.path.join(tile_path_name, ATTRIBUTE_FILE)
attributes = load_dict(attributes_file_name)
# Format transformation
# add to dataframe
dataframe.attrs.update(attributes)
return dataframe
[docs]
def load_single_tile_dict(tile_path_name: str):
"""
Load a CarsDict
:param tile_path_name: Path of tile to load
:type tile_path_name: str
:return: Tile dataframe
:rtype: Panda dataframe
"""
# get dataframe
dict_file_name = os.path.join(tile_path_name, CARSDICT_FILE)
if not os.path.exists(dict_file_name):
logging.error("Tile {} does not exists".format(dict_file_name))
return None
with open(dict_file_name, "rb") as handle:
dict_cars = pickle.load(handle)
# get attributes
attributes_file_name = os.path.join(tile_path_name, ATTRIBUTE_FILE)
attributes = load_dict(attributes_file_name)
# Format transformation
# add to dataframe
dict_cars.attrs.update(attributes)
return dict_cars
[docs]
def save_single_tile_array(dataset: xr.Dataset, tile_path_name: str):
"""
Save xarray to directory, saving the data in a different file that
the attributes (saved in a .json next to it).
:param dataset: dataset to save
:type dataset: xr.Dataset
:param tile_path_name: Path of file to save in
:type tile_path_name: str
"""
if dataset is None:
logging.debug("Tile is None: not saved")
return
# Create tile folder
safe_makedirs(tile_path_name)
# save attributes
saved_dataset_attrs = copy.copy(dataset.attrs)
attributes_file_name = os.path.join(tile_path_name, ATTRIBUTE_FILE)
if dataset.attrs is None:
attributes = {}
else:
attributes = dataset.attrs
# Format transformation
if PROFILE in attributes:
attributes[PROFILE] = rio_profile_to_dict_profile(attributes[PROFILE])
# dump
# separate attributes
dataset.attrs, custom_attributes = separate_dicts(
attributes, [PROFILE, WINDOW, OVERLAPS, SAVING_INFO, ATTRIBUTES]
)
# save
save_dict(custom_attributes, attributes_file_name)
dataset_file_name = os.path.join(tile_path_name, DATASET_FILE)
with open(dataset_file_name, "wb") as handle:
pickle.dump(dataset, handle, protocol=pickle.HIGHEST_PROTOCOL)
# Retrieve attrs
dataset.attrs = saved_dataset_attrs
[docs]
def save_single_tile_points(dataframe, tile_path_name: str):
"""
Save dataFrame to directory, saving the data in a different file that
the attributes (saved in a .json next to it).
:param dataframe: dataframe to save
:type dataframe: pd.DataFrame
:param tile_path_name: Path of file to save in
:type tile_path_name: str
"""
if dataframe is None:
logging.debug("Tile is None: not saved")
return
# Create tile folder
safe_makedirs(tile_path_name)
# save attributes
saved_dataframe_attrs = copy.copy(dataframe.attrs)
attributes_file_name = os.path.join(tile_path_name, ATTRIBUTE_FILE)
if dataframe.attrs is None:
attributes = {}
else:
attributes = dataframe.attrs
# Format transformation
# dump
# separate attributes
dataframe.attrs, custom_attributes = separate_dicts(
attributes, [SAVING_INFO, ATTRIBUTES]
)
# save
save_dict(custom_attributes, attributes_file_name)
dataframe_file_name = os.path.join(tile_path_name, DATAFRAME_FILE)
with open(dataframe_file_name, "wb") as handle:
pickle.dump(dataframe, handle, protocol=pickle.HIGHEST_PROTOCOL)
# Retrieve attrs
dataframe.attrs = saved_dataframe_attrs
[docs]
def save_single_tile_dict(dict_cars, tile_path_name: str):
"""
Save cars_dict to directory, saving the data in a different file that
the attributes (saved in a .json next to it).
:param dict_cars: dataframe to save
:type dict_cars: pd.DataFrame
:param tile_path_name: Path of file to save in
:type tile_path_name: str
"""
# Create tile folder
safe_makedirs(tile_path_name)
# save attributes
saved_dict_cars_attrs = copy.copy(dict_cars.attrs)
attributes_file_name = os.path.join(tile_path_name, ATTRIBUTE_FILE)
if dict_cars.attrs is None:
attributes = {}
else:
attributes = dict_cars.attrs
# Format transformation
# dump
# separate attributes
dict_cars.attrs, custom_attributes = separate_dicts(
attributes, [SAVING_INFO, ATTRIBUTES]
)
# save
save_dict(custom_attributes, attributes_file_name)
dict_cars_file_name = os.path.join(tile_path_name, CARSDICT_FILE)
with open(dict_cars_file_name, "wb") as handle:
pickle.dump(dict_cars, handle, protocol=pickle.HIGHEST_PROTOCOL)
# Retrieve attrs
dict_cars.attrs = saved_dict_cars_attrs
[docs]
def fill_dataset( # pylint: disable=too-many-positional-arguments
dataset,
saving_info=None,
window=None,
profile=None,
attributes=None,
overlaps=None,
):
"""
From a full xarray dataset, fill info properly.
User can fill with saving information (containing CarsDataset id),
window of current tile and its overlaps,
rasterio profile of full data, and attributes associated to data
:param dataset: dataset to fill
:type dataset: xarray_dataset
:param saving_info: created by Orchestrator.get_saving_infos
:type saving_info: dict
:param window:
:type window: dict
:param profile:
:type profile: dict
:param attributes:
:type attributes: dict
"""
if attributes is not None:
dataset.attrs[ATTRIBUTES] = attributes
if saving_info is not None:
dataset.attrs[SAVING_INFO] = saving_info
if window is not None:
dataset.attrs[WINDOW] = window
if overlaps is not None:
dataset.attrs[OVERLAPS] = overlaps
if profile is not None:
dataset.attrs[PROFILE] = profile
[docs]
def fill_dataframe(dataframe, saving_info=None, attributes=None):
"""
From a full pandas dataframe, fill info properly.
User can fill with saving information (containing CarsDataset id),
and attributes associated to data
:param dataframe: dataframe to fill
:type dataframe: pandas dataframe
:param saving_info: created by Orchestrator.get_saving_infos
:type saving_info: dict
:param attributes:
:type attributes: dict
"""
if attributes is not None:
dataframe.attrs[ATTRIBUTES] = attributes
if saving_info is not None:
dataframe.attrs[SAVING_INFO] = saving_info
[docs]
def fill_dict(data_dict, saving_info=None, attributes=None):
"""
From a fulldict, fill info properly.
User can fill with saving information (containing CarsDataset id),
and attributes associated to data
:param data_dict: dictionnary to fill
:type data_dict: Dict
:param saving_info: created by Orchestrator.get_saving_infos
:type saving_info: dict
:param attributes: attributes associated to data
:type attributes: dict
"""
# TODO only use CarsDict
if isinstance(data_dict, dict):
if attributes is not None:
data_dict[ATTRIBUTES] = attributes
if saving_info is not None:
data_dict[SAVING_INFO] = saving_info
elif isinstance(data_dict, cars_dict.CarsDict):
if attributes is not None:
data_dict.attrs[ATTRIBUTES] = attributes
if saving_info is not None:
data_dict.attrs[SAVING_INFO] = saving_info
[docs]
def save_all_dataframe( # pylint: disable=too-many-positional-arguments
dataframe,
file_name,
save_by_pair=False,
overwrite=True,
point_cloud_format="csv",
overwrite_file_name=True,
):
"""
Save DataFrame to csv and laz format. The content of dataframe is merged to
the content of existing saved Dataframe, if overwrite==False
The option save_by_pair separate the dataframe by pair (one folder by pair)
:param file_name: file name to save data to
:type file_name: str
:param overwrite: overwrite file if exists
:type overwrite: bool
:param point_cloud_format: point cloud format (csv or laz)
:type point_cloud_format: str
:param overwrite_file_name: generate a new filename from input filename
matching input dataframe attributes, using only directory from input
filename
:type overwrite_file_name: bool
"""
# generate filename if attributes have xstart and ystart settings
if overwrite_file_name:
if (
"attributes" in dataframe.attrs
and "xmin" in dataframe.attrs["attributes"]
):
file_name = os.path.dirname(file_name)
file_name = os.path.join(
file_name,
(
str(dataframe.attrs["attributes"]["xmin"])
+ "_"
+ str(dataframe.attrs["attributes"]["ymax"])
),
)
elif "saving_info" in dataframe.attrs:
file_name = os.path.dirname(file_name)
file_name = os.path.join(
file_name,
(
str(dataframe.attrs["saving_info"]["cars_ds_col"])
+ "_"
+ str(dataframe.attrs["saving_info"]["cars_ds_row"])
),
)
if not save_by_pair:
save_dataframe(dataframe, file_name, overwrite, point_cloud_format)
else:
pairing_indexes = set(np.array(dataframe["global_id"]).flat)
source_pc_names = dataframe.attrs["attributes"]["source_pc_names"]
for pair_index in pairing_indexes:
dir_name = os.path.join(
os.path.dirname(file_name), source_pc_names[int(pair_index)]
)
safe_makedirs(dir_name)
base_name = os.path.basename(file_name)
points_indexes = dataframe["global_id"] == pair_index
file_name_by_pair = os.path.join(dir_name, base_name)
save_dataframe(
dataframe.loc[points_indexes],
file_name_by_pair,
overwrite,
point_cloud_format,
)
[docs]
def save_dataframe(
dataframe, file_name, overwrite=True, point_cloud_format="csv"
):
"""
Save dataframe (csv, laz, attr file)
"""
# Save attributes
attributes_file_name = file_name + "_attrs.yaml"
save_dict(dataframe.attrs, attributes_file_name)
# Save point cloud to laz format
if point_cloud_format == "laz":
_, extension = os.path.splitext(file_name)
if "laz" not in extension:
file_name = file_name + ".laz"
dataframe_converter.convert_pcl_to_laz(dataframe, file_name)
elif point_cloud_format == "csv":
_, extension = os.path.splitext(file_name)
if "csv" not in extension:
file_name = file_name + ".csv"
if overwrite and os.path.exists(file_name):
dataframe.to_csv(file_name, index=False)
else:
if os.path.exists(file_name):
# merge files
existing_dataframe = pandas.read_csv(file_name)
merged_dataframe = pandas.concat(
[existing_dataframe, dataframe],
ignore_index=True,
sort=False,
)
merged_dataframe.to_csv(file_name, index=False)
else:
dataframe.to_csv(file_name, index=False)
else:
raise RuntimeError(
"Invalid point cloud format {0}".format(point_cloud_format)
)
[docs]
def save_dataset( # pylint: disable=too-many-positional-arguments
dataset,
file_name,
tag,
use_windows_and_overlaps=False,
descriptor=None,
nbits=None,
):
"""
Reconstruct and save data.
In order to save properly the dataset to corresponding tiff file,
dataset must have been filled with saving info, profile, window,
overlaps (if not 0), and rasterio descriptor if already created.
See fill_dataset.
:param dataset: dataset to save
:type dataset: xr.Dataset
:param file_name: file name to save data to
:type file_name: str
:param tag: tag to reconstruct
:type tag: str
:param use_windows_and_overlaps: use saved window and overlaps
:type use_windows_and_overlaps: bool
:param descriptor: descriptor to use with rasterio
:type descriptor: rasterio dataset
"""
if dataset is None:
logging.error("Tile is None: not saved ")
return
overlaps = get_overlaps_dataset(dataset)
window = get_window_dataset(dataset)
rio_window = None
overlap = [0, 0, 0, 0]
if use_windows_and_overlaps:
if window is None:
logging.debug("User wants to use window but none was set")
else:
rio_window = generate_rasterio_window(window)
if overlaps is not None:
overlap = [
overlaps["up"],
overlaps["down"],
overlaps["left"],
overlaps["right"],
]
if len(dataset[tag].values.shape) > 2:
nb_rows, nb_cols = (
dataset[tag].values.shape[1],
dataset[tag].values.shape[2],
)
data = dataset[tag].values[
:,
overlap[0] : nb_rows - overlap[1],
overlap[2] : nb_cols - overlap[3],
]
else:
nb_rows, nb_cols = (
dataset[tag].values.shape[0],
dataset[tag].values.shape[1],
)
data = dataset[tag].values[
overlap[0] : nb_rows - overlap[1],
overlap[2] : nb_cols - overlap[3],
]
if tag == cst.EPI_TEXTURE and "int" in descriptor.dtypes[0]:
# Prepare color data for cast
data = np.nan_to_num(data, nan=descriptor.nodata)
data = np.round(data)
profile = get_profile_for_tag_dataset(dataset, tag)
new_profile = profile
if "width" not in new_profile or "height" not in new_profile:
logging.debug("CarsDataset doesn't have a profile, default is given")
new_profile = DefaultGTiffProfile(count=new_profile["count"])
new_profile["height"] = data.shape[0]
new_profile["width"] = data.shape[1]
new_profile["dtype"] = "float32"
bands_description = None
if (
tag in (cst.EPI_CLASSIFICATION, cst.RASTER_CLASSIF)
and cst.BAND_CLASSIF in dataset.coords
):
bands_description = dataset.coords[cst.BAND_CLASSIF].values
if tag in (cst.EPI_TEXTURE, cst.POINT_CLOUD_CLR_KEY_ROOT, cst.DSM_COLOR):
bands_description = dataset.coords[cst.BAND_IM].values
if tag in (cst.RASTER_SOURCE_PC, cst.DSM_SOURCE_PC):
bands_description = dataset.coords[cst.BAND_SOURCE_PC].values
if tag in (cst.EPI_FILLING, cst.RASTER_FILLING):
bands_description = dataset.coords[cst.BAND_FILLING].values
if tag in (
cst.RASTER_PERFORMANCE_MAP,
cst.RASTER_PERFORMANCE_MAP_RAW,
cst.DSM_PERFORMANCE_MAP,
):
if cst.BAND_PERFORMANCE_MAP in dataset.coords:
bands_description = dataset.coords[cst.BAND_PERFORMANCE_MAP].values
classes_info_tag = None
if tag == cst.RASTER_PERFORMANCE_MAP:
classes_info_tag = dataset.attrs.get(
cst.RIO_TAG_PERFORMANCE_MAP_CLASSES, None
)
outputs.rasterio_write_georaster(
file_name,
data,
new_profile,
window=rio_window,
descriptor=descriptor,
bands_description=bands_description,
classes_info_tag=classes_info_tag,
nbits=nbits,
)
[docs]
def create_tile_path(col: int, row: int, directory: str) -> str:
"""
Create path of tile, according to its position in CarsDataset grid
:param col: numero of column
:type col: int
:param row: numero of row
:type row: int
:param directory: path where to save tile
:type directory: str
:return: full path
:rtype: str
"""
tail = "col_" + repr(col) + "_row_" + repr(row)
name = os.path.join(directory, tail)
return name
[docs]
def save_numpy_array(array: np.ndarray, file_name: str):
"""
Save numpy array to file
:param array: array to save
:type array: np.ndarray
:param file_name: numero of row
:type file_name: str
"""
with open(file_name, "wb") as descriptor:
np.save(descriptor, array)
[docs]
def load_numpy_array(file_name: str) -> np.ndarray:
"""
Load numpy array from file
:param file_name: numero of row
:type file_name: str
:return: array
:rtype: np.ndarray
"""
with open(file_name, "rb") as descriptor:
return np.load(descriptor)
[docs]
def create_none(nb_row: int, nb_col: int):
"""
Create a grid filled with None. The created grid is a 2D list :
ex: [[None, None], [None, None]]
:param nb_row: number of rows
:param nb_col: number of cols
:return: Grid filled with None
:rtype: list of list
"""
grid = []
for _ in range(nb_row):
tmp = []
for _ in range(nb_col):
tmp.append(None)
grid.append(tmp)
return grid
[docs]
def overlap_array_to_dict(overlap):
"""
Convert matrix of overlaps, to dict format used in CarsDatasets.
Input is : [o_up, o_down, o_left, o_right].
Output is : {"up": o_up, "down": o_down, "left": o_left, "right": o_right}
:param overlap: overlaps
:type overlap: List
:return: New overlaps
:rtype: Dict
"""
new_overlap = {
"up": int(overlap[0]),
"down": int(overlap[1]),
"left": int(overlap[2]),
"right": int(overlap[3]),
}
return new_overlap
[docs]
def window_array_to_dict(window, overlap=None):
"""
Convert matrix of windows, to dict format used in CarsDatasets.
Use overlaps if you want to get window with overlaps
inputs are :
- window : [row_min, row_max, col_min, col_max], with pixel format
- overlap (optional): [o_row_min, o_row_max, o_col_min, o_col_max]
outputs are :
{
"row_min" : row_min - o_row_min,
"row_max" : row_max + o_row_max,
"col_min" : col_min - o_col_min,
"col_max" : col_max - o_col_max,
}
:param window: window
:type window: List
:param overlap: overlaps
:type overlap: List
:return: New window
:rtype: Dict
"""
new_window = {
"row_min": int(window[0]),
"row_max": int(window[1]),
"col_min": int(window[2]),
"col_max": int(window[3]),
}
if overlap is not None:
new_window["row_min"] -= int(overlap[0])
new_window["row_max"] += int(overlap[1])
new_window["col_min"] -= int(overlap[2])
new_window["col_max"] += int(overlap[3])
return new_window
[docs]
def dict_profile_to_rio_profile(dict_profile: Dict) -> Dict:
"""
Transform a rasterio Profile transformed into serializable Dict,
into a rasterio profile.
:param profile: rasterio Profile transformed into serializable Dict
:type profile: Dict
:return: Profile
:rtype: Rasterio Profile
"""
rio_profile = copy.copy(dict_profile)
transform = None
if "transform" in dict_profile:
if dict_profile["transform"] is not None:
transform = rio.Affine(
*dict_profile["transform"][0:6],
)
crs = None
if "crs" in dict_profile:
if dict_profile["crs"] is not None:
if (
isinstance(dict_profile["crs"], str)
and "EPSG:" in dict_profile["crs"]
):
crs = pyproj.CRS(dict_profile["crs"].replace("EPSG:", ""))
else:
crs = pyproj.CRS(dict_profile["crs"])
rio_profile["crs"] = crs
rio_profile["transform"] = transform
return rio_profile
[docs]
def rio_profile_to_dict_profile(in_profile: Dict) -> Dict:
"""
Transform a rasterio profile into a serializable Dict.
:param in_profile: rasterio Profile transformed into serializable Dict
:type in_profile: Dict
:return: Profile
:rtype: Dict
"""
profile = copy.copy(in_profile)
profile = {**profile}
crs = None
if "crs" in profile:
if profile["crs"] is not None:
if isinstance(profile["crs"], str):
crs = profile["crs"]
else:
crs = profile["crs"].to_epsg()
transform = None
if "transform" in profile:
if profile["transform"] is not None:
transform = list(profile["transform"])[:6]
profile.update(crs=crs, transform=transform)
return profile
[docs]
def save_dict(dictionary, file_path: str):
"""
Save dict to json file
:param dictionary: dictionary to save
:type dictionary: Dict
:param file_path: file path to use
:type file_path: str
"""
dictionary_yaml = make_yaml_safe(dictionary)
with open(file_path, "w", encoding="utf8") as fstream:
yaml.safe_dump(
dictionary_yaml,
fstream,
allow_unicode=True,
sort_keys=False,
)
[docs]
def make_yaml_safe(obj): # pylint: disable=too-many-return-statements
"""
Converter
"""
if isinstance(obj, dict):
return {make_yaml_safe(k): make_yaml_safe(v) for k, v in obj.items()}
if isinstance(obj, list):
return [make_yaml_safe(v) for v in obj]
if isinstance(obj, tuple):
return tuple(make_yaml_safe(v) for v in obj)
if isinstance(obj, (np.generic, np.number)):
return obj.item()
return obj
[docs]
def load_dict(file_path: str) -> Dict:
"""
Load dict from json file
:param file_path: file path to use
:type file_path: str
"""
with open(file_path, "r", encoding="utf8") as fstream:
dictionary = yaml.safe_load(fstream)
return dictionary
[docs]
def separate_dicts(dictionary, list_tags):
"""
Separate a dict into two, the second one containing the given tags.
For example, {key1: val1, key2: val2, key3: val3}
with list_tags = [key2] will be split in :
{key1: val1, key3: val3} and {key2: val2}
"""
dict1 = {}
dict2 = {}
for key in dictionary:
if key in list_tags:
dict2[key] = dictionary[key]
else:
dict1[key] = dictionary[key]
return dict1, dict2
[docs]
def get_attributes(obj):
"""
Get attributes in .attrs of dataset or dataframe
:param obj: dataset or dataframe
:type obj: xr.Dataset or pandas.Dataframe
"""
return obj.attrs.get(ATTRIBUTES, None)
[docs]
def get_window_dataset(dataset):
"""
Get window in dataset
:param dataset: dataset
:type dataset: xr.Dataset
"""
return dataset.attrs.get(WINDOW, None)
[docs]
def get_overlaps_dataset(dataset):
"""
Get overlaps in dataset
:param dataset: dataset
:type dataset: xr.Dataset
"""
return dataset.attrs.get(OVERLAPS, None)
[docs]
def get_profile_rasterio(dataset):
"""
Get profile in dataset
:param dataset: dataset
:type dataset: xr.Dataset
"""
return dataset.attrs.get(PROFILE, None)
[docs]
def get_profile_for_tag_dataset(dataset, tag: str) -> Dict:
"""
Get profile according to layer to save.
This function modify current rasterio dataset to fix the number of
bands of the data associated to given tag.
:param tag: tag to use
:type tag: str
:return: Profile
:rtype: Rasterio Profile
"""
new_profile = get_profile_rasterio(dataset)
if new_profile is None:
new_profile = {}
new_profile["count"] = 1
if len(dataset[tag].values.shape) > 2:
new_profile["count"] = dataset[tag].values.shape[0]
return new_profile
[docs]
def generate_rasterio_window(window: Dict) -> rio.windows.Window:
"""
Generate rasterio window to use.
:param window: window to convert, containing 'row_min',
'row_max', 'col_min', 'col_max
:type window: dict
:return: rasterio window
:rtype: rio.windows.Window
"""
returned_window = None
if window is not None:
return Window.from_slices(
(window["row_min"], window["row_max"]),
(window["col_min"], window["col_max"]),
)
return returned_window