#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=too-many-lines
# attribute-defined-outside-init is disabled so that we can create and use
# attributes however we need, to stick to the "everything is attribute" logic
# introduced in issue#895
# pylint: disable=attribute-defined-outside-init
# pylint: disable=too-many-nested-blocks
"""
CARS filling pipeline class file
"""
from __future__ import print_function
import warnings
import numpy as np
import rasterio
import xarray as xr
from rasterio.errors import NodataShadowWarning
from rasterio.windows import Window
from cars.data_structures import cars_dataset
[docs]
def merge_filling_bands_wrapper(  # pylint: disable=R0917
    in_filling_path,
    aux_filling,
    dsm_file,
    window=None,
    saving_info=None,
    profile=None,
):
    """
    Merge filling bands to get mono band in output

    :param in_filling_path: path of the multi-band filling raster
    :type in_filling_path: str
    :param aux_filling: mapping from the value written in the output band
        to one filling method name or a list of them. Recognised names are
        "fill_with_geoid", "interpolate_from_borders",
        "fill_with_endogenous_dem", "fill_with_exogenous_dem",
        "no_edition", "other" and "interpolation"
    :type aux_filling: dict
    :param dsm_file: path of the DSM raster whose mask contributes to the
        footprint
    :type dsm_file: str
    :param window: tile to process, with the keys "col_min", "row_min",
        "col_max" and "row_max". Required despite the None default.
    :type window: dict
    :param saving_info: forwarded unchanged to cars_dataset.fill_dataset
    :param profile: ignored: the profile of the filling raster is the one
        forwarded to cars_dataset.fill_dataset

    :return: False when the filling raster has a single band, otherwise a
        dataset holding the 2D variable "mono_filling"
    :rtype: bool or xr.Dataset
    :raises TypeError: if window is None
    """
    if window is None:
        raise TypeError(
            "window is required: expected a mapping with the keys "
            "'col_min', 'row_min', 'col_max' and 'row_max'"
        )

    # swap the keys and values of aux_filling
    inverse_aux_filling = {}
    for out_value, methods in aux_filling.items():
        if isinstance(methods, list):
            for method in methods:
                inverse_aux_filling[method] = out_value
        else:
            inverse_aux_filling[methods] = out_value

    # Get rasterio window
    rasterio_window = Window(
        col_off=window["col_min"],
        row_off=window["row_min"],
        width=(window["col_max"] - window["col_min"]),
        height=(window["row_max"] - window["row_min"]),
    )

    with rasterio.open(dsm_file) as in_dsm:
        dsm_msk = in_dsm.read_masks(1, window=rasterio_window)

    with rasterio.open(in_filling_path) as src:
        if src.count == 1:
            return False

        filling_multi_bands = src.read(window=rasterio_window)
        descriptions = src.descriptions
        # band name -> band index, without the "cropped_disp_range" band
        band_index = {name: i for i, name in enumerate(descriptions)}
        band_index.pop("cropped_disp_range", None)

        # NOTE: this replaces the profile received as argument
        profile = src.profile
        # NOTE(review): nodata is used below as a temporary marker in a
        # uint8 array, so it is assumed to be set and to fit in uint8
        nodata = src.nodata

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", NodataShadowWarning)
            filling_mask = src.read_masks(1, window=rasterio_window)

    # Bands that must all be set for each filling method
    filling_bands_list = {
        "fill_with_geoid": ["filling_exogenous"],
        "interpolate_from_borders": [
            "bulldozer",
            "border_interpolation",
        ],
        "fill_with_endogenous_dem": ["bulldozer"],
        "fill_with_exogenous_dem": ["filling_exogenous", "bulldozer"],
        "no_edition": [],
    }

    # To get the right footprint
    filling_mono_bands = np.logical_or(dsm_msk, filling_mask).astype(
        np.uint8
    )

    # to keep the previous classif convention: pixels outside the footprint
    # temporarily hold nodata, pixels inside start at 0 (not assigned yet)
    filling_mono_bands[filling_mono_bands == 0] = nodata
    filling_mono_bands[filling_mono_bands == 1] = 0

    # Indices of the bands taking part in the method matching
    kept_indices = set(band_index.values())
    nb_bands = filling_multi_bands.shape[0]

    for key, value in aux_filling.items():
        if isinstance(value, str):
            value = [value]
        if not isinstance(value, list):
            continue

        for elem in value:
            if elem == "no_edition":
                # Not assigned yet and no band set, band 0 excluded
                no_band_set = np.all(
                    filling_multi_bands[1:, :, :] == 0, axis=0
                )
                unassigned = filling_mono_bands == 0
                filling_mono_bands[no_band_set & unassigned] = key
                continue

            if elem in ("other", "interpolation"):
                continue

            filling_method = filling_bands_list[elem]
            if not all(method in descriptions for method in filling_method):
                continue

            # The bands of the method must all be 1 ...
            indices_true = [band_index[m] for m in filling_method]
            mask_true = np.all(
                filling_multi_bands[indices_true, :, :] == 1,
                axis=0,
            )
            # ... and every other kept band must be 0
            indices_false = [
                i
                for i in range(nb_bands)
                if i in kept_indices and i not in indices_true
            ]
            mask_false = np.all(
                filling_multi_bands[indices_false, :, :] == 0,
                axis=0,
            )
            filling_mono_bands[mask_true & mask_false] = key

    # Pixels with at least one band set (band 0 excluded) that matched no
    # method get the "other" value, 7 when aux_filling does not define it
    any_band_set = np.any(filling_multi_bands[1:, :, :], axis=0)
    unassigned = filling_mono_bands == 0
    filling_mono_bands[any_band_set & unassigned] = inverse_aux_filling.get(
        "other", 7
    )

    # Remove the temporary nodata marker
    filling_mono_bands[filling_mono_bands == nodata] = 0

    output_dataset = xr.Dataset(
        data_vars={
            "mono_filling": (["row", "col"], filling_mono_bands),
        },
        coords={
            "row": np.arange(filling_mono_bands.shape[0]),
            "col": np.arange(filling_mono_bands.shape[1]),
        },
    )

    cars_dataset.fill_dataset(
        output_dataset,
        saving_info=saving_info,
        window=window,
        profile=profile,
        attributes=None,
        overlaps=None,
    )

    return output_dataset
[docs]
def merge_classif_bands_wrapper(  # pylint: disable=R0917
    in_classif_path,
    aux_classif,
    dsm_file,
    window=None,
    saving_info=None,
    profile=None,
):
    """
    Merge classif bands to get mono band in output

    :param in_classif_path: path of the multi-band classification raster
    :type in_classif_path: str
    :param aux_classif: mapping from the value written in the output band
        to one band value (int) or a list of band values. Each band is
        looked up by str(value) in the raster band descriptions. Entries of
        any other type are skipped.
    :type aux_classif: dict
    :param dsm_file: path of the DSM raster whose mask contributes to the
        footprint
    :type dsm_file: str
    :param window: tile to process, with the keys "col_min", "row_min",
        "col_max" and "row_max". Required despite the None default.
    :type window: dict
    :param saving_info: forwarded unchanged to cars_dataset.fill_dataset
    :param profile: forwarded unchanged to cars_dataset.fill_dataset

    :return: False when the classification raster has a single band,
        otherwise a dataset holding the 2D variable "classification"
    :rtype: bool or xr.Dataset
    :raises TypeError: if window is None
    :raises ValueError: if a requested band is not in the descriptions
    """
    if window is None:
        raise TypeError(
            "window is required: expected a mapping with the keys "
            "'col_min', 'row_min', 'col_max' and 'row_max'"
        )

    # Get rasterio window
    rasterio_window = Window(
        col_off=window["col_min"],
        row_off=window["row_min"],
        width=(window["col_max"] - window["col_min"]),
        height=(window["row_max"] - window["row_min"]),
    )

    with rasterio.open(dsm_file) as in_dsm:
        dsm_msk = in_dsm.read_masks(1, window=rasterio_window)

    with rasterio.open(in_classif_path) as src:
        if src.count == 1:
            return False

        classif_multi_bands = src.read(window=rasterio_window)
        descriptions = src.descriptions
        classif_mask = src.read_masks(1, window=rasterio_window)
        # NOTE(review): nodata is written in a uint8 array below, so it is
        # assumed to be set and to fit in uint8
        nodata = src.nodata

    # To get the right footprint
    classif_mono_band = np.logical_or(dsm_msk, classif_mask).astype(
        np.uint8
    )

    # to keep the previous classif convention: pixels outside the footprint
    # hold nodata, pixels inside start at 0 (not assigned yet)
    classif_mono_band[classif_mono_band == 0] = nodata
    classif_mono_band[classif_mono_band == 1] = 0

    for key, value in aux_classif.items():
        if isinstance(value, int):
            band_values = [value]
        elif isinstance(value, list):
            band_values = value
        else:
            continue

        for band_value in band_values:
            num_band = descriptions.index(str(band_value))
            # Only pixels still at 0 are written: the first match wins
            unassigned = classif_mono_band == 0
            in_band = classif_multi_bands[num_band, :, :] == 1
            classif_mono_band[unassigned & in_band] = key

    output_dataset = xr.Dataset(
        data_vars={
            "classification": (["row", "col"], classif_mono_band),
        },
        coords={
            "row": np.arange(classif_mono_band.shape[0]),
            "col": np.arange(classif_mono_band.shape[1]),
        },
    )

    cars_dataset.fill_dataset(
        output_dataset,
        saving_info=saving_info,
        window=window,
        profile=profile,
        attributes=None,
        overlaps=None,
    )

    return output_dataset
[docs]
def monoband_to_multiband_wrapper(  # pylint: disable=R0917
    input_raster,
    bands_classif,
    nodata_value,
    window=None,
    saving_info=None,
    profile=None,
):
    """
    Convert classification from monoband to multiband

    :param input_raster: the input classification path
    :type input_raster: str
    :param bands_classif: the bands values
    :type bands_classif: list
    :param nodata_value: value written in every band where the mask of the
        input band is 0
    :param window: tile to process, with the keys "col_min", "row_min",
        "col_max" and "row_max". Required despite the None default.
    :type window: dict
    :param saving_info: forwarded unchanged to cars_dataset.fill_dataset
    :param profile: forwarded unchanged to cars_dataset.fill_dataset

    :return: dataset holding the uint8 variable "classification" with the
        dimensions (band, row, col), one band per value of bands_classif
    :rtype: xr.Dataset
    :raises TypeError: if window is None
    """
    if window is None:
        raise TypeError(
            "window is required: expected a mapping with the keys "
            "'col_min', 'row_min', 'col_max' and 'row_max'"
        )

    # Get rasterio window
    rasterio_window = Window(
        col_off=window["col_min"],
        row_off=window["row_min"],
        width=(window["col_max"] - window["col_min"]),
        height=(window["row_max"] - window["row_min"]),
    )

    with rasterio.open(input_raster) as src:
        mono = src.read(1, window=rasterio_window)
        mono_msk = src.read_masks(1, window=rasterio_window)

    nb_rows, nb_cols = mono.shape
    multiband = np.zeros(
        (len(bands_classif), nb_rows, nb_cols), dtype=np.uint8
    )

    # One binary band per class value
    for band_id, class_value in enumerate(bands_classif):
        multiband[band_id] = mono == class_value

    # Masked pixels of the input are nodata in every output band
    multiband[:, mono_msk == 0] = nodata_value

    output_dataset = xr.Dataset(
        data_vars={
            "classification": (["band", "row", "col"], multiband),
        },
        coords={
            "band": bands_classif,
            "row": np.arange(nb_rows),
            "col": np.arange(nb_cols),
        },
    )

    cars_dataset.fill_dataset(
        output_dataset,
        saving_info=saving_info,
        window=window,
        profile=profile,
        attributes=None,
        overlaps=None,
    )

    return output_dataset