#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=too-many-lines
"""
this module contains the dense_matching application class.
"""
import collections
# Third party imports
import copy
# Standard imports
import logging
import os
from typing import List
import numpy as np
import rasterio as rio
import xarray
from affine import Affine
from json_checker import Checker, Or
import cars.orchestrator.orchestrator as ocht
from cars.applications import application_constants
# CARS imports
from cars.applications.rasterization import (
rasterization_algo,
)
from cars.applications.rasterization import (
rasterization_constants as raster_cst,
)
from cars.applications.rasterization import (
rasterization_wrappers,
)
from cars.applications.rasterization.abstract_pc_rasterization_app import (
PointCloudRasterization,
)
from cars.applications.triangulation import pc_transform
from cars.core import constants as cst
from cars.core import projection, tiling
from cars.core.utils import safe_makedirs
from cars.data_structures import cars_dataset
# R0903 temporary disabled for error "Too few public methods"
# œgoing to be corrected by adding new methods as check_conf
[docs]
class SimpleGaussian(
PointCloudRasterization, short_name="simple_gaussian"
): # pylint: disable=R0903
"""
PointCloudRasterisation
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, conf=None):
"""
Init function of PointCloudRasterisation
:param conf: configuration for rasterization
:return: a application_to_use object
"""
super().__init__(conf=conf)
# check conf
# get rasterization parameter
self.used_method = self.used_config["method"]
self.dsm_radius = self.used_config["dsm_radius"]
self.sigma = self.used_config["sigma"]
self.grid_points_division_factor = self.used_config[
"grid_points_division_factor"
]
# get nodata values
self.dsm_no_data = self.used_config["dsm_no_data"]
self.texture_no_data = self.used_config["texture_no_data"]
self.color_dtype = self.used_config["texture_dtype"]
self.msk_no_data = self.used_config["msk_no_data"]
# Init orchestrator
self.orchestrator = None
[docs]
def check_conf(self, conf):
"""
Check configuration
:param conf: configuration to check
:type conf: dict
:return: overloaded configuration
:rtype: dict
"""
# init conf
if conf is not None:
overloaded_conf = conf.copy()
else:
conf = {}
overloaded_conf = {}
# Overload conf
# get rasterization parameter
overloaded_conf["method"] = conf.get("method", "simple_gaussian")
overloaded_conf["dsm_radius"] = conf.get("dsm_radius", 1)
overloaded_conf["sigma"] = conf.get("sigma", None)
overloaded_conf["grid_points_division_factor"] = conf.get(
"grid_points_division_factor", None
)
# get nodata values
overloaded_conf["dsm_no_data"] = conf.get("dsm_no_data", -32768)
overloaded_conf["texture_no_data"] = conf.get("texture_no_data", None)
overloaded_conf["texture_dtype"] = conf.get("texture_dtype", None)
overloaded_conf["msk_no_data"] = conf.get("msk_no_data", 255)
overloaded_conf["save_intermediate_data"] = conf.get(
"save_intermediate_data", False
)
rasterization_schema = {
"method": str,
"dsm_radius": Or(float, int),
"sigma": Or(float, None),
"grid_points_division_factor": Or(None, int),
"dsm_no_data": int,
"msk_no_data": int,
"texture_no_data": Or(None, int),
"texture_dtype": Or(None, str),
"save_intermediate_data": bool,
}
# Check conf
checker = Checker(rasterization_schema)
checker.validate(overloaded_conf)
return overloaded_conf
[docs]
def get_margins(self, resolution):
"""
Get the margin to use for terrain tiles
:param resolution: resolution of raster data (in target CRS unit)
:type resolution: float
:return: margin in meters or degrees
"""
margins = self.dsm_radius * resolution
return margins
[docs]
def get_optimal_tile_size(
self,
max_ram_per_worker,
superposing_point_clouds=1,
point_cloud_resolution=0.5,
):
"""
Get the optimal tile size to use, depending on memory available
:param max_ram_per_worker: maximum ram available
:type max_ram_per_worker: int
:param superposing_point_clouds: number of point clouds superposing
:type superposing_point_clouds: int
:param point_cloud_resolution: resolution of point cloud
:type point_cloud_resolution: float
:return: optimal tile size in meter
:rtype: float
"""
tot = 7000 * superposing_point_clouds / point_cloud_resolution
import_ = 200 # MiB
tile_size = int(
np.sqrt(float(((max_ram_per_worker - import_) * 2**23)) / tot)
)
logging.info(
"Estimated optimal tile size for rasterization: {} meters".format(
tile_size
)
)
return tile_size
# pylint: disable=too-many-positional-arguments
[docs]
def run( # noqa: C901 function is too complex
self,
point_clouds,
epsg,
output_crs,
resolution,
orchestrator=None,
dsm_file_name=None,
weights_file_name=None,
color_file_name=None,
classif_file_name=None,
performance_map_file_name=None,
ambiguity_file_name=None,
contributing_pair_file_name=None,
filling_file_name=None,
color_dtype=None,
dump_dir=None,
performance_map_classes=None,
phasing=None,
):
"""
Run PointCloudRasterisation application.
Creates a CarsDataset filled with dsm tiles.
:param point_clouds: merged point cloud or list of array point clouds
. CarsDataset contains:
- Z x W Delayed tiles. \
Each tile will be a future pandas DataFrame containing:
- data with keys "x", "y", "z", "corr_msk" \
optional: "texture", "mask", "data_valid", "z_inf", "z_sup"\
"coord_epi_geom_i", "coord_epi_geom_j", "idx_im_epi"
- attrs with keys "epsg", "ysize", "xsize", "xstart", "ystart"
- attributes containing "bounds", "ysize", "xsize", "epsg"
OR
Tuple(list of CarsDataset Arrays, bounds). With list of point
clouds:
list of CarsDataset of type array, with:
- data with keys x", "y", "z", "corr_msk", "z_inf", "z_sup"\
optional: "texture", "mask", "data_valid",\
"coord_epi_geom_i", "coord_epi_geom_j", "idx_im_epi"
:type point_clouds: CarsDataset filled with pandas.DataFrame
:param epsg: epsg of raster data
:type epsg: str
:param output_crs: output_crs of raster data
:type output_crs: str
:param resolution: resolution of raster data (in target CRS unit)
:type resolution: float
:param orchestrator: orchestrator used
:param dsm_file_name: path of dsm
:type dsm_file_name: str
:param weights_file_name: path of dsm weights
:type weights_file_name: str
:param color_file_name: path of color
:type color_file_name: str
:param classif_file_name: path of classification
:type classif_file_name: str
:param performance_map_file_name: path of performance map file
:type performance_map_file_name: str
:param ambiguity_file_name: path of ambiguity file
:type ambiguity_file_name: str
:param contributing_pair_file_name: path of contributing pair file
:type contributing_pair_file_name: str
:param filling_file_name: path of filling file
:type filling_file_name: str
:param color_dtype: output color image type
:type color_dtype: str (numpy type)
:param dump_dir: directory used for outputs with no associated filename
:type dump_dir: str
:param performance_map_classes: list for step defining border of class
:type performance_map_classes: list or None
:param phasing: if activated, we phase the dsm on this point
:type phasing: dict
:return: raster DSM. CarsDataset contains:
- Z x W Delayed tiles. \
Each tile will be a future xarray Dataset containing:
- data : with keys : "hgt", "img", "raster_msk",optional : \
"n_pts", "pts_in_cell", "hgt_mean", "hgt_stdev",\
"hgt_inf", "hgt_sup"
- attrs with keys: "epsg"
- attributes containing: None
:rtype : CarsDataset filled with xr.Dataset
"""
# only the saved layers will be saved
list_computed_layers = []
# Default orchestrator
if orchestrator is None:
# Create default sequential orchestrator for current application
# be awere, no out_json will be shared between orchestrators
# No files saved
self.orchestrator = ocht.Orchestrator(
orchestrator_conf={"mode": "sequential"}
)
else:
self.orchestrator = orchestrator
# Get if color and stats are saved
save_intermediate_data = self.used_config["save_intermediate_data"]
keep_dir = (
len(
list(
filter(
lambda x: x is not None,
[
weights_file_name,
color_file_name,
classif_file_name,
performance_map_file_name,
ambiguity_file_name,
contributing_pair_file_name,
filling_file_name,
],
)
)
)
> 0
)
if not self.color_dtype:
self.color_dtype = color_dtype
if self.texture_no_data is None:
if self.color_dtype is not None:
if "float" in self.color_dtype:
self.texture_no_data = np.finfo(self.color_dtype).max
else:
self.texture_no_data = np.iinfo(self.color_dtype).max
else:
self.texture_no_data = np.finfo("float32").max
# Setup dump directory
if dump_dir is not None:
out_dump_dir = dump_dir
safe_makedirs(dump_dir)
if not save_intermediate_data and not keep_dir:
self.orchestrator.add_to_clean(dump_dir)
else:
out_dump_dir = self.orchestrator.out_dir
# Check if input data is supported
if not (
isinstance(point_clouds, tuple)
and isinstance(point_clouds[0][0], cars_dataset.CarsDataset)
and point_clouds[0][0].dataset_type == "arrays"
):
message = (
"PointCloudRasterization application doesn't support "
"this input data "
"format : type : {}".format(type(point_clouds))
)
logging.error(message)
raise RuntimeError(message)
# Create CarsDataset
terrain_raster = cars_dataset.CarsDataset(
"arrays", name="rasterization"
)
paths_data = {}
bounds = point_clouds[1]
# tiling grid: all tiles from sources -> not replaceable.
# CarsDataset is only used for processing
nb_tiles = 0
for point_cld in point_clouds[0]:
nb_tiles += point_cld.shape[0] * point_cld.shape[1]
terrain_raster.tiling_grid = tiling.generate_tiling_grid(
0, 0, 1, nb_tiles, 1, 1
)
terrain_raster.generate_none_tiles()
if phasing is not None:
res = resolution
x_phase = phasing["point"][0]
y_phase = phasing["point"][1]
for index, value in enumerate(bounds):
if index in (0, 2):
bounds[index] = rasterization_wrappers.phased_dsm(
value, x_phase, res
)
else:
bounds[index] = rasterization_wrappers.phased_dsm(
value, y_phase, res
)
# Derive output image files parameters to pass to rasterio
_, _, xsize, ysize = tiling.roi_to_start_and_size(bounds, resolution)
logging.info("DSM output image size: {}x{} pixels".format(xsize, ysize))
try:
if isinstance(point_clouds, tuple):
source_pc_names = point_clouds[0][0].attributes[
"source_pc_names"
]
else:
source_pc_names = point_clouds.attributes["source_pc_names"]
except KeyError:
source_pc_names = None
# Save objects
if dsm_file_name is not None:
safe_makedirs(os.path.dirname(dsm_file_name))
out_dsm_file_name = dsm_file_name
if out_dsm_file_name is not None:
self.orchestrator.update_index(
{
"dsm": {
cst.INDEX_DSM_ALT: os.path.basename(out_dsm_file_name)
}
}
)
elif save_intermediate_data:
# File is not part of the official product, write it in dump_dir
out_dsm_file_name = os.path.join(out_dump_dir, "dsm.tif")
if out_dsm_file_name is not None:
list_computed_layers += ["dsm"]
self.orchestrator.add_to_save_lists(
out_dsm_file_name,
cst.RASTER_HGT,
terrain_raster,
dtype=np.float32,
nodata=self.dsm_no_data,
cars_ds_name="dsm",
)
paths_data[cst.RASTER_HGT] = out_dsm_file_name
out_weights_file_name = weights_file_name
if out_weights_file_name is not None:
# add contributing pair filename to index
self.orchestrator.update_index(
{
"dsm": {
cst.INDEX_DSM_WEIGHTS: os.path.basename(
out_weights_file_name
)
}
}
)
else:
# Always write weights.tif, but dump_dir is in the orchestrator
# clean list if save_intermediate_data is not activated
out_weights_file_name = os.path.join(out_dump_dir, "weights.tif")
if out_weights_file_name is not None:
list_computed_layers += ["weights"]
self.orchestrator.add_to_save_lists(
out_weights_file_name,
cst.RASTER_WEIGHTS_SUM,
terrain_raster,
dtype=np.float32,
nodata=0,
cars_ds_name="dsm_weights",
)
paths_data[cst.RASTER_WEIGHTS_SUM] = out_weights_file_name
out_clr_file_name = color_file_name
if out_clr_file_name is not None:
# add contributing pair filename to index
self.orchestrator.update_index(
{
"dsm": {
cst.INDEX_DSM_COLOR: os.path.basename(out_clr_file_name)
}
}
)
elif save_intermediate_data:
# File is not part of the official product, write it in dump_dir
out_clr_file_name = os.path.join(out_dump_dir, "image.tif")
if out_clr_file_name is not None:
list_computed_layers += ["texture"]
self.orchestrator.add_to_save_lists(
out_clr_file_name,
cst.RASTER_COLOR_IMG,
terrain_raster,
dtype=self.color_dtype,
nodata=self.texture_no_data,
cars_ds_name="texture",
)
paths_data[cst.RASTER_COLOR_IMG] = out_clr_file_name
out_classif_file_name = classif_file_name
if out_classif_file_name is not None:
# add contributing pair filename to index
self.orchestrator.update_index(
{
"dsm": {
cst.INDEX_DSM_CLASSIFICATION: os.path.basename(
out_classif_file_name
)
}
}
)
elif save_intermediate_data:
# File is not part of the official product, write it in dump_dir
out_classif_file_name = os.path.join(
out_dump_dir, "classification.tif"
)
if out_classif_file_name is not None:
list_computed_layers += ["classif"]
self.orchestrator.add_to_save_lists(
out_classif_file_name,
cst.RASTER_CLASSIF,
terrain_raster,
dtype=np.uint8,
nodata=self.msk_no_data,
cars_ds_name="dsm_classif",
optional_data=True,
)
paths_data[cst.RASTER_CLASSIF] = out_classif_file_name
out_performance_map = performance_map_file_name
# save raw as performance map if performance_map_classes is None
# save classified performance map if performance_map_classes is not None
out_performance_map_raw = None
if out_performance_map is not None:
# add contributing pair filename to index
self.orchestrator.update_index(
{
"dsm": {
cst.INDEX_DSM_PERFORMANCE_MAP: os.path.basename(
out_performance_map
)
}
}
)
if performance_map_classes == [0]:
# No classes, we return raw data
out_performance_map_raw = out_performance_map
out_performance_map = None
elif save_intermediate_data:
# File is not part of the official product, write it in dump_dir
out_performance_map = os.path.join(
out_dump_dir, "performance_map.tif"
)
out_performance_map_raw = os.path.join(
out_dump_dir, "performance_map_raw.tif"
)
if out_performance_map_raw is not None:
list_computed_layers += ["performance_map_raw"]
self.orchestrator.add_to_save_lists(
out_performance_map_raw,
cst.RASTER_PERFORMANCE_MAP_RAW,
terrain_raster,
dtype=np.float32,
nodata=self.msk_no_data,
cars_ds_name="performance_map_raw",
optional_data=True,
)
paths_data[cst.RASTER_PERFORMANCE_MAP_RAW] = out_performance_map_raw
if out_performance_map is not None:
# classified performance map exists
list_computed_layers += ["performance_map"]
self.orchestrator.add_to_save_lists(
out_performance_map,
cst.RASTER_PERFORMANCE_MAP,
terrain_raster,
dtype=np.uint8,
nodata=self.msk_no_data,
cars_ds_name="performance_map",
optional_data=True,
)
paths_data[cst.RASTER_PERFORMANCE_MAP] = out_performance_map
out_ambiguity = ambiguity_file_name
if out_ambiguity is not None:
# add contributing pair filename to index
self.orchestrator.update_index(
{
"dsm": {
cst.INDEX_DSM_AMBIGUITY: os.path.basename(out_ambiguity)
}
}
)
elif save_intermediate_data:
# File is not part of the official product, write it in dump_dir
out_ambiguity = os.path.join(out_dump_dir, "ambiguity.tif")
if out_ambiguity:
list_computed_layers += ["ambiguity"]
self.orchestrator.add_to_save_lists(
out_ambiguity,
cst.RASTER_AMBIGUITY,
terrain_raster,
dtype=np.float32,
nodata=self.msk_no_data,
cars_ds_name="ambiguity",
optional_data=True,
)
paths_data[cst.RASTER_AMBIGUITY] = out_ambiguity
out_source_pc = contributing_pair_file_name
if out_source_pc is not None:
# add contributing pair filename to index
self.orchestrator.update_index(
{
"dsm": {
cst.INDEX_DSM_CONTRIBUTING_PAIR: os.path.basename(
out_source_pc
)
}
}
)
elif save_intermediate_data:
# File is not part of the official product, write it in dump_dir
out_source_pc = os.path.join(out_dump_dir, "contributing_pair.tif")
if out_source_pc:
list_computed_layers += ["source_pc"]
self.orchestrator.add_to_save_lists(
out_source_pc,
cst.RASTER_SOURCE_PC,
terrain_raster,
dtype=np.uint8,
nodata=self.msk_no_data,
cars_ds_name="contributing_pair",
optional_data=True,
)
paths_data[cst.RASTER_SOURCE_PC] = out_source_pc
out_filling = filling_file_name
if out_filling is not None:
# add filling filename to index
self.orchestrator.update_index(
{"dsm": {cst.INDEX_DSM_FILLING: os.path.basename(out_filling)}}
)
elif save_intermediate_data:
# File is not part of the official product, write it in dump_dir
out_filling = os.path.join(out_dump_dir, "filling.tif")
if out_filling:
list_computed_layers += ["filling"]
self.orchestrator.add_to_save_lists(
out_filling,
cst.RASTER_FILLING,
terrain_raster,
dtype=np.uint8,
nodata=self.msk_no_data,
cars_ds_name="filling",
optional_data=True,
)
paths_data[cst.RASTER_FILLING] = out_filling
# TODO Check that intervals indeed exist!
if save_intermediate_data:
list_computed_layers += [cst.POINT_CLOUD_LAYER_SUP_OR_INF_ROOT]
out_dsm_inf_file_name = os.path.join(out_dump_dir, "dsm_inf.tif")
self.orchestrator.add_to_save_lists(
out_dsm_inf_file_name,
cst.RASTER_HGT_INF,
terrain_raster,
dtype=np.float32,
nodata=self.dsm_no_data,
cars_ds_name="dsm_inf",
optional_data=True,
)
out_dsm_sup_file_name = os.path.join(out_dump_dir, "dsm_sup.tif")
self.orchestrator.add_to_save_lists(
out_dsm_sup_file_name,
cst.RASTER_HGT_SUP,
terrain_raster,
dtype=np.float32,
nodata=self.dsm_no_data,
cars_ds_name="dsm_sup",
optional_data=True,
)
out_dsm_mean_file_name = os.path.join(out_dump_dir, "dsm_mean.tif")
out_dsm_std_file_name = os.path.join(out_dump_dir, "dsm_std.tif")
out_dsm_n_pts_file_name = os.path.join(
out_dump_dir, "dsm_n_pts.tif"
)
out_dsm_points_in_cell_file_name = os.path.join(
out_dump_dir, "dsm_pts_in_cell.tif"
)
self.orchestrator.add_to_save_lists(
out_dsm_mean_file_name,
cst.RASTER_HGT_MEAN,
terrain_raster,
dtype=np.float32,
nodata=self.dsm_no_data,
cars_ds_name="dsm_mean",
)
self.orchestrator.add_to_save_lists(
out_dsm_std_file_name,
cst.RASTER_HGT_STD_DEV,
terrain_raster,
dtype=np.float32,
nodata=self.dsm_no_data,
cars_ds_name="dsm_std",
)
self.orchestrator.add_to_save_lists(
out_dsm_n_pts_file_name,
cst.RASTER_NB_PTS,
terrain_raster,
dtype=np.uint16,
nodata=0,
cars_ds_name="dsm_n_pts",
)
self.orchestrator.add_to_save_lists(
out_dsm_points_in_cell_file_name,
cst.RASTER_NB_PTS_IN_CELL,
terrain_raster,
dtype=np.uint16,
nodata=0,
cars_ds_name="dsm_pts_in_cells",
)
out_dsm_inf_mean_file_name = os.path.join(
out_dump_dir, "dsm_inf_mean.tif"
)
out_dsm_inf_std_file_name = os.path.join(
out_dump_dir, "dsm_inf_std.tif"
)
self.orchestrator.add_to_save_lists(
out_dsm_inf_mean_file_name,
cst.RASTER_HGT_INF_MEAN,
terrain_raster,
dtype=np.float32,
nodata=self.dsm_no_data,
cars_ds_name="dsm_inf_mean",
optional_data=True,
)
self.orchestrator.add_to_save_lists(
out_dsm_inf_std_file_name,
cst.RASTER_HGT_INF_STD_DEV,
terrain_raster,
dtype=np.float32,
nodata=self.dsm_no_data,
cars_ds_name="dsm_inf_std",
optional_data=True,
)
out_dsm_sup_mean_file_name = os.path.join(
out_dump_dir, "dsm_sup_mean.tif"
)
out_dsm_sup_std_file_name = os.path.join(
out_dump_dir, "dsm_sup_std.tif"
)
self.orchestrator.add_to_save_lists(
out_dsm_sup_mean_file_name,
cst.RASTER_HGT_SUP_MEAN,
terrain_raster,
dtype=np.float32,
nodata=self.dsm_no_data,
cars_ds_name="dsm_sup_mean",
optional_data=True,
)
self.orchestrator.add_to_save_lists(
out_dsm_sup_std_file_name,
cst.RASTER_HGT_SUP_STD_DEV,
terrain_raster,
dtype=np.float32,
nodata=self.dsm_no_data,
cars_ds_name="dsm_sup_std",
optional_data=True,
)
# Get saving infos in order to save tiles when they are computed
[saving_info] = self.orchestrator.get_saving_infos([terrain_raster])
# Generate profile
geotransform = (
bounds[0],
resolution,
0.0,
bounds[3],
0.0,
-resolution,
)
transform = Affine.from_gdal(*geotransform)
raster_profile = collections.OrderedDict(
{
"height": ysize,
"width": xsize,
"driver": "GTiff",
"dtype": "float32",
"transform": transform,
"crs": output_crs.to_wkt(),
"tiled": True,
}
)
# Get number of tiles
logging.info(
"Number of tiles in cloud rasterization: "
"row: {} "
"col: {}".format(
terrain_raster.tiling_grid.shape[0],
terrain_raster.tiling_grid.shape[1],
)
)
# Add infos to orchestrator.out_json
updating_dict = {
application_constants.APPLICATION_TAG: {
raster_cst.RASTERIZATION_RUN_TAG: {
raster_cst.EPSG_TAG: epsg,
raster_cst.DSM_NO_DATA_TAG: float(self.dsm_no_data),
},
}
}
if self.texture_no_data is not None:
updating_dict[application_constants.APPLICATION_TAG][
raster_cst.RASTERIZATION_RUN_TAG
][raster_cst.TEXTURE_NO_DATA_TAG] = float(self.texture_no_data)
self.orchestrator.update_out_info(updating_dict)
# Add attributrs
terrain_raster.attributes["paths"] = paths_data
# Add final function to apply
terrain_raster.final_function = raster_final_function
ind_tile = 0
for point_cloud in point_clouds[0]:
for row_pc in range(point_cloud.shape[0]):
for col_pc in range(point_cloud.shape[1]):
# update saving infos for potential replacement
full_saving_info = ocht.update_saving_infos(
saving_info, row=0, col=ind_tile
)
if point_cloud[row_pc, col_pc] is not None:
# Delayed call to rasterization operations using all
# required point clouds
terrain_raster[
0, ind_tile
] = self.orchestrator.cluster.create_task(
rasterization_wrapper
)(
point_cloud[row_pc, col_pc],
resolution,
epsg,
raster_profile,
window=None,
terrain_region=None,
terrain_full_roi=bounds,
list_computed_layers=list_computed_layers,
saving_info=full_saving_info,
radius=self.dsm_radius,
sigma=self.sigma,
dsm_no_data=self.dsm_no_data,
texture_no_data=self.texture_no_data,
color_dtype=color_dtype,
msk_no_data=self.msk_no_data,
source_pc_names=source_pc_names,
performance_map_classes=performance_map_classes,
)
ind_tile += 1
# Sort tiles according to rank TODO remove or implement it ?
return terrain_raster
# pylint: disable=too-many-positional-arguments
[docs]
def rasterization_wrapper( # noqa: C901
cloud,
resolution,
epsg,
profile,
window=None,
terrain_region=None,
terrain_full_roi=None,
list_computed_layers: List[str] = None,
saving_info=None,
sigma: float = None,
radius: int = 1,
dsm_no_data: int = np.nan,
texture_no_data: int = np.nan,
color_dtype: str = "float32",
msk_no_data: int = 255,
source_pc_names=None,
performance_map_classes=None,
):
"""
Wrapper for rasterization step :
- Convert a list of clouds to correct epsg
- Rasterize it with associated colors
if terrain_region is not provided: region is computed from point cloud,
with margin to use
:param cloud: combined cloud
:type cloud: pandas.DataFrame
:param terrain_region: terrain bounds
:param resolution: Produced DSM resolution (meter, degree [EPSG dependent])
:type resolution: float
:param epsg_code: epsg code for the CRS of the output DSM
:type epsg_code: int
:param window: Window considered
:type window: int
:param margin: margin in pixel to use
:type margin: int
:param profile: rasterio profile
:param list_computed_layers: list of computed output data
:type profile: dict
:param saving_info: information about CarsDataset ID.
:type saving_info: dict
:param sigma: sigma for gaussian interpolation.
(If None, set to resolution)
:param radius: Radius for hole filling.
:param dsm_no_data: no data value to use in the final raster
:param texture_no_data: no data value to use in the final colored raster
:param msk_no_data: no data value to use in the final mask image
:param source_pc_names: list of names of point cloud before merging :
name of sensors pair or name of point cloud file
:param performance_map_classes: list for step defining border of class
:type performance_map_classes: list or None
:return: digital surface model + projected colors
:rtype: xr.Dataset
"""
# update attributes
attributes = copy.deepcopy(cloud.attrs)
attributes.update(attributes.get("attributes", {}))
if "attributes" in attributes:
del attributes["attributes"]
if "saving_info" in attributes:
del attributes["saving_info"]
# convert back to correct epsg
# If the point cloud is not in the right epsg referential, it is converted
if isinstance(cloud, xarray.Dataset):
# Transform Dataset to Dataframe
cloud, cloud_epsg = pc_transform.depth_map_dataset_to_dataframe(
cloud, epsg
)
elif cloud is None:
logging.warning("Input cloud is None")
return None
else:
cloud_epsg = attributes.get("epsg")
if "number_of_pc" not in attributes:
if source_pc_names is not None:
attributes["number_of_pc"] = len(source_pc_names)
else:
attributes["number_of_pc"] = None
# update attributes
cloud.attrs = {}
cars_dataset.fill_dataframe(cloud, attributes=attributes)
if epsg != cloud_epsg:
projection.point_cloud_conversion_dataframe(cloud, cloud_epsg, epsg)
# filter cloud
if "mask" in cloud:
cloud = cloud[cloud["mask"] == 0]
if cloud.dropna(subset=["x", "y", "z"]).empty:
return None
# Compute start and size
if terrain_region is None:
# compute region from cloud
xmin = np.nanmin(cloud["x"])
xmax = np.nanmax(cloud["x"])
ymin = np.nanmin(cloud["y"])
ymax = np.nanmax(cloud["y"])
# Add margin to be sure every point is rasterized
terrain_region = [
xmin - radius * resolution,
ymin - radius * resolution,
xmax + radius * resolution,
ymax + radius * resolution,
]
if terrain_full_roi is not None:
# Modify start (used in tiling.roi_to_start_and_size) [0, 3]
# to share the same global grid
terrain_region[0] = (
terrain_full_roi[0]
+ np.round(
(terrain_region[0] - terrain_full_roi[0]) / resolution
)
* resolution
)
terrain_region[3] = (
terrain_full_roi[3]
+ np.round(
(terrain_region[3] - terrain_full_roi[3]) / resolution
)
* resolution
)
# Crop
terrain_region = [
max(terrain_full_roi[0], terrain_region[0]),
max(terrain_full_roi[1], terrain_region[1]),
min(terrain_full_roi[2], terrain_region[2]),
min(terrain_full_roi[3], terrain_region[3]),
]
if (
terrain_region[0] > terrain_region[2]
or terrain_region[1] > terrain_region[3]
):
return None
xstart, ystart, xsize, ysize = tiling.roi_to_start_and_size(
terrain_region, resolution
)
if xsize == 0 or ysize == 0:
logging.warning("Tile is empty")
return None
if window is None:
transform = rio.Affine(*profile["transform"][0:6])
row_pix_pos, col_pix_pos = rio.transform.AffineTransformer(
transform
).rowcol(xstart, ystart)
window = [
row_pix_pos,
row_pix_pos + ysize,
col_pix_pos,
col_pix_pos + xsize,
]
window = cars_dataset.window_array_to_dict(window)
# Call simple_rasterization
raster = rasterization_algo.simple_rasterization_dataset_wrapper(
cloud,
resolution,
epsg,
xstart=xstart,
ystart=ystart,
xsize=xsize,
ysize=ysize,
sigma=sigma,
radius=radius,
dsm_no_data=dsm_no_data,
texture_no_data=texture_no_data,
msk_no_data=msk_no_data,
list_computed_layers=list_computed_layers,
source_pc_names=source_pc_names,
performance_map_classes=performance_map_classes,
cloud_global_id=attributes["cloud_id"],
)
# Fill raster
attributes = {
"color_type": color_dtype,
cst.CROPPED_DISPARITY_RANGE: (ocht.get_disparity_range_cropped(cloud)),
}
if raster is not None:
cars_dataset.fill_dataset(
raster,
saving_info=saving_info,
window=window,
profile=profile,
attributes=attributes,
overlaps=None,
)
return raster
[docs]
def raster_final_function(orchestrator, future_object):
"""
Apply function to current object, reading already rasterized data
:param orchestrator: orchestrator
:param future_object: Dataset
:return: update object
"""
# Get data weights
old_weights, _ = orchestrator.get_data(
cst.RASTER_WEIGHTS_SUM, future_object
)
weights = future_object[cst.RASTER_WEIGHTS_SUM].values
future_object[cst.RASTER_WEIGHTS_SUM].values = np.reshape(
rasterization_wrappers.update_weights(old_weights, weights),
weights.shape,
)
# Get color type
color_type = future_object.attrs["attributes"]["color_type"]
# Get data dsm
for tag in future_object.keys():
if tag != cst.RASTER_WEIGHTS_SUM:
if tag in [cst.RASTER_NB_PTS, cst.RASTER_NB_PTS_IN_CELL]:
method = "sum"
elif tag in [
cst.RASTER_FILLING,
cst.RASTER_CLASSIF,
cst.RASTER_SOURCE_PC,
]:
method = "bool"
else:
method = "basic"
old_data, nodata_raster = orchestrator.get_data(tag, future_object)
current_data = future_object[tag].values
if tag == cst.RASTER_COLOR_IMG and np.issubdtype(
color_type, np.integer
):
current_data = np.round(current_data).astype(color_type)
future_object[tag].values = np.reshape(
rasterization_wrappers.update_data(
old_data,
current_data,
weights,
old_weights,
nodata_raster,
method=method,
),
current_data.shape,
)
return future_object