Source code for cars.applications.dem_generation.dichotomic_generation

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
this module contains the dichotomic dem generation application class.
"""


# Standard imports
import collections
import logging
import os

# Third party imports
import numpy as np
import pandas
import rasterio
import xarray as xr
from json_checker import And, Checker, Or

import cars.orchestrator.orchestrator as ocht
from cars.applications import application_constants
from cars.applications.dem_generation import (
    dem_generation_constants as dem_gen_cst,
)
from cars.applications.dem_generation.dem_generation import DemGeneration
from cars.applications.dem_generation.dem_generation_tools import (
    fit_initial_elevation_on_dem_median,
)
from cars.applications.triangulation import triangulation_tools

# CARS imports
from cars.core import constants as cst
from cars.core import preprocessing, projection
from cars.data_structures import cars_dataset
from cars.orchestrator.cluster.log_wrapper import cars_profile


[docs]class DichotomicGeneration(DemGeneration, short_name="dichotomic"): """ DichotomicGeneration """ # pylint: disable=too-many-instance-attributes def __init__(self, conf=None): """ Init function of DichotomicGeneration :param conf: configuration for DichotomicGeneration :return: an application_to_use object """ super().__init__(conf=conf) # check conf self.used_method = self.used_config["method"] self.resolution = self.used_config["resolution"] # Saving bools self.margin = self.used_config["margin"] height_margin = self.used_config["height_margin"] if isinstance(height_margin, list): self.min_height_margin = height_margin[0] self.max_height_margin = height_margin[1] else: self.min_height_margin = height_margin self.max_height_margin = height_margin self.percentile = self.used_config["percentile"] self.min_number_matches = self.used_config["min_number_matches"] self.fillnodata_max_search_distance = self.used_config[ "fillnodata_max_search_distance" ] self.min_dem = self.used_config["min_dem"] self.max_dem = self.used_config["max_dem"] self.coregistration = self.used_config["coregistration"] self.coregistration_max_shift = self.used_config[ "coregistration_max_shift" ] self.save_intermediate_data = self.used_config[ application_constants.SAVE_INTERMEDIATE_DATA ] # Init orchestrator self.orchestrator = None
[docs] def check_conf(self, conf): """ Check configuration :param conf: configuration to check :type conf: dict :return: overloaded configuration :rtype: dict """ # init conf if conf is not None: overloaded_conf = conf.copy() else: conf = {} overloaded_conf = {} # Overload conf overloaded_conf["method"] = conf.get("method", "dichotomic") overloaded_conf["resolution"] = conf.get("resolution", 90) # default margin: (z max - zmin) * tan(teta) # with z max = 9000, z min = 0, teta = 30 degrees overloaded_conf["margin"] = conf.get("margin", 6000) overloaded_conf["height_margin"] = conf.get("height_margin", 20) overloaded_conf["percentile"] = conf.get("percentile", 1) overloaded_conf["min_number_matches"] = conf.get( "min_number_matches", 100 ) overloaded_conf["min_dem"] = conf.get("min_dem", -500) overloaded_conf["max_dem"] = conf.get("max_dem", 1000) overloaded_conf["fillnodata_max_search_distance"] = conf.get( "fillnodata_max_search_distance", 5 ) overloaded_conf["coregistration"] = conf.get("coregistration", True) overloaded_conf["coregistration_max_shift"] = conf.get( "coregistration_max_shift", 180 ) overloaded_conf[application_constants.SAVE_INTERMEDIATE_DATA] = ( conf.get(application_constants.SAVE_INTERMEDIATE_DATA, False) ) rectification_schema = { "method": str, "resolution": And(Or(float, int), lambda x: x > 0), "margin": And(Or(float, int), lambda x: x > 0), "height_margin": Or(list, int), "percentile": And(Or(int, float), lambda x: x >= 0), "min_number_matches": And(int, lambda x: x > 0), "min_dem": And(Or(int, float), lambda x: x < 0), "max_dem": And(Or(int, float), lambda x: x > 0), "fillnodata_max_search_distance": And(int, lambda x: x > 0), "coregistration": bool, "coregistration_max_shift": And(Or(int, float), lambda x: x > 0), application_constants.SAVE_INTERMEDIATE_DATA: bool, } # Check conf checker = Checker(rectification_schema) checker.validate(overloaded_conf) return overloaded_conf
@cars_profile(name="DEM Generation") def run( self, triangulated_matches_list, output_dir, geoid_path, dem_roi_to_use=None, initial_elevation=None, cars_orchestrator=None, ): """ Run dichotomic dem generation using matches :param triangulated_matches_list: list of triangulated matches positions must be in a metric system :type triangulated_matches_list: list(pandas.Dataframe) :param output_dir: directory to save dem :type output_dir: str :param geoid_path: geoid path :param cars_orchrestrator: the main cars orchestrator :param dem_roi_to_use: dem roi polygon to use as roi :param initial_elevation: the path to the initial elevation file :return: dem data computed with mean, min and max, and new initial elevation file. dem is also saved in disk, and paths are available in attributes. (DEM_MEDIAN_PATH, DEM_MIN_PATH, DEM_MAX_PATH) :rtype: Tuple(CarsDataset, str | None) """ # Create sequential orchestrator for savings self.orchestrator = ocht.Orchestrator( orchestrator_conf={"mode": "sequential"} ) # Generate point cloud epsg = 4326 for pair_pc in triangulated_matches_list: # convert to degrees for geoid offset if pair_pc.attrs["epsg"] != epsg: projection.point_cloud_conversion_dataset(pair_pc, epsg) merged_point_cloud = pandas.concat( triangulated_matches_list, ignore_index=True, sort=False, ) merged_point_cloud.attrs["epsg"] = epsg # Get bounds if dem_roi_to_use is not None: bounds_poly = dem_roi_to_use.bounds xmin = min(bounds_poly[0], bounds_poly[2]) xmax = max(bounds_poly[0], bounds_poly[2]) ymin = min(bounds_poly[1], bounds_poly[3]) ymax = max(bounds_poly[1], bounds_poly[3]) else: # Get min max with margin mins = merged_point_cloud.min(skipna=True) maxs = merged_point_cloud.max(skipna=True) xmin = mins["x"] ymin = mins["y"] xmax = maxs["x"] ymax = maxs["y"] bounds_cloud = [xmin, ymin, xmax, ymax] # Convert resolution and margin to degrees utm_epsg = preprocessing.get_utm_zone_as_epsg_code(xmin, ymin) conversion_factor = preprocessing.get_conversion_factor( bounds_cloud, epsg, utm_epsg ) self.margin *= conversion_factor self.resolution *= conversion_factor # Get borders, adding margin xmin = xmin - self.margin ymin = ymin - self.margin xmax = xmax + self.margin ymax = ymax + self.margin # Generate regular grid xnew, ynew = generate_grid( merged_point_cloud, self.resolution, xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax, ) # define functions to use funcs = [ np.median, (np.percentile, self.percentile), (np.percentile, 100 - self.percentile), ] list_z_grid = [] for _ in range(len(funcs)): list_z_grid.append(np.full_like(xnew, np.nan)) row_min = 0 col_min = 0 row_max = list_z_grid[0].shape[0] col_max = list_z_grid[0].shape[1] # use 100% overlap for dem overlap = 1 * self.resolution # Modify output grids multi_res_rec( merged_point_cloud, funcs, xnew, ynew, list_z_grid, row_min, row_max, col_min, col_max, self.min_number_matches, overlap, ) # Generate dense dataset with z = 0 alti_zeros_dataset = xr.Dataset( { cst.X: (["row", "col"], xnew), cst.Y: (["row", "col"], ynew), cst.Z: (["row", "col"], np.zeros(xnew.shape)), }, coords={ "row": np.arange(xnew.shape[0]), "col": np.arange(xnew.shape[1]), }, ) alti_zeros_dataset.attrs[cst.EPSG] = epsg # Transform to lon lat projection.point_cloud_conversion_dataset(alti_zeros_dataset, 4326) geoid_offset = triangulation_tools.geoid_offset( alti_zeros_dataset, geoid_path ) # fillnodata valid = np.isfinite(list_z_grid[0]) msd = self.fillnodata_max_search_distance for idx in range(3): list_z_grid[idx] = rasterio.fill.fillnodata( list_z_grid[idx], mask=valid, max_search_distance=msd ) list_z_grid[idx] += geoid_offset[cst.Z].values dem_median = list_z_grid[0] dem_min = list_z_grid[1] dem_max = list_z_grid[2] if np.any((dem_max - dem_min) < 0): logging.error("dem min > dem max") raise RuntimeError("dem min > dem max") # apply height margin dem_min -= self.min_height_margin dem_max += self.max_height_margin dem_min = np.where( dem_median - dem_min < self.min_dem, dem_median + self.min_dem, dem_min, ) dem_max = np.where( dem_max - dem_median > self.max_dem, dem_median + self.max_dem, dem_max, ) # Generate CarsDataset dem = cars_dataset.CarsDataset("arrays", name="dem_generation") # Compute tiling grid # Only one tile dem.tiling_grid = np.array([[[0, row_max, 0, col_max]]]) # saving infos # dem mean dem_median_path = os.path.join(output_dir, "dem_median.tif") self.orchestrator.add_to_save_lists( dem_median_path, dem_gen_cst.DEM_MEDIAN, dem, dtype=np.float32, nodata=-32768, cars_ds_name="dem_median", ) dem.attributes[dem_gen_cst.DEM_MEDIAN_PATH] = dem_median_path # dem min dem_min_path = os.path.join(output_dir, "dem_min.tif") self.orchestrator.add_to_save_lists( dem_min_path, dem_gen_cst.DEM_MIN, dem, dtype=np.float32, nodata=-32768, cars_ds_name="dem_min", ) dem.attributes[dem_gen_cst.DEM_MIN_PATH] = dem_min_path # dem max dem_max_path = os.path.join(output_dir, "dem_max.tif") self.orchestrator.add_to_save_lists( dem_max_path, dem_gen_cst.DEM_MAX, dem, dtype=np.float32, nodata=-32768, cars_ds_name="dem_max", ) dem.attributes[dem_gen_cst.DEM_MAX_PATH] = dem_max_path bounds = [xmin, ymin, xmax, ymax] # Generate profile geotransform = ( self.resolution, 0, bounds[0], 0, -self.resolution, bounds[3], ) transform = rasterio.Affine(*geotransform) raster_profile = collections.OrderedDict( { "height": row_max, "width": col_max, "driver": "GTiff", "dtype": "float32", "transform": transform, "crs": "EPSG:{}".format(epsg), "tiled": True, } ) # Generate dataset dem_tile = xr.Dataset( data_vars={ "dem_median": (["row", "col"], np.flip(dem_median, axis=0)), "dem_min": (["row", "col"], np.flip(dem_min, axis=0)), "dem_max": (["row", "col"], np.flip(dem_max, axis=0)), }, coords={ "row": np.arange(0, row_max), "col": np.arange(0, col_max), }, ) [saving_info] = ( # pylint: disable=unbalanced-tuple-unpacking self.orchestrator.get_saving_infos([dem]) ) saving_info = ocht.update_saving_infos(saving_info, row=0, col=0) window = cars_dataset.window_array_to_dict(dem.tiling_grid[0, 0]) cars_dataset.fill_dataset( dem_tile, saving_info=saving_info, window=window, profile=raster_profile, attributes=None, overlaps=None, ) dem[0, 0] = dem_tile # Save self.orchestrator.breakpoint() # cleanup self.orchestrator.cleanup() # after saving, fit initial elevation if required if initial_elevation is not None and self.coregistration: initial_elevation_out_path = os.path.join( output_dir, "initial_elevation_fit.tif" ) coreg_offsets = fit_initial_elevation_on_dem_median( initial_elevation, dem_median_path, initial_elevation_out_path ) coreg_info = { application_constants.APPLICATION_TAG: { "dem_generation": {"coregistration": coreg_offsets} } } # save the coreg shift info in cars's main orchestrator cars_orchestrator.update_out_info(coreg_info) if ( coreg_offsets is None or abs(coreg_offsets["shift_x"]) > self.coregistration_max_shift or abs(coreg_offsets["shift_y"]) > self.coregistration_max_shift ): logging.warning( "The initial elevation will be used as-is because " "coregistration failed or gave inconsistent results" ) return dem, None return dem, initial_elevation_out_path return dem, None
[docs]def generate_grid( pd_pc, resolution, xmin=None, xmax=None, ymin=None, ymax=None ): """ Generate regular grid :param pd_pc: point cloud :type pd_pc: Pandas Dataframe :param resolution: resolution in meter :type resolution: float :param xmin: x min position in metric system :type xmin: float :param xmax: x max position in metric system :type xmax: float :param ymin: y min position in metric system :type ymin: float :param ymax: y max position in metric system :type ymax: float :return: regular grid :rtype: numpy array """ if None in (xmin, xmax, ymin, ymax): mins = pd_pc.min(skipna=True) maxs = pd_pc.max(skipna=True) xmin = mins["x"] ymin = mins["y"] xmax = maxs["x"] ymax = maxs["y"] nb_x = int((xmax - xmin) / resolution) x_range = np.linspace(xmin, xmax, nb_x) nb_y = int((ymax - ymin) / resolution) y_range = np.linspace(ymin, ymax, nb_y) x_grid, y_grid = np.meshgrid(x_range, y_range) # 2D grid for interpolation return x_grid, y_grid
[docs]def multi_res_rec( pd_pc, list_fun, x_grid, y_grid, list_z_grid, row_min, row_max, col_min, col_max, min_number_matches, overlap, ): """ Recursive function to fill grid with results of given functions :param pd_pc: point cloud :type pd_pc: Pandas Dataframe :param list_fun: list of functions :type list_fun: list(function) :param x_grid: x grid :type x_grid: numpy array :param y_grid: y grid :type y_grid: numpy array :param list_z_grid: list of z grid computed with functions :type list_z_grid: list(numpy array) :param row_min: row min :type row_min: int :param row_max: row max :type row_max: int :param col_min: col min :type col_min: int :param col_max: col max :type col_max: int :param min_number_matches: minimum of matches: stop condition :type min_number_matches: int :param overlap: overlap to use for include condition :type overlap: float """ if pd_pc.shape[0] < min_number_matches: raise RuntimeError("Not enough matches") if len(list_fun) != len(list_z_grid): raise RuntimeError( "Number of functions must match the number of z layers" ) x_values = x_grid[row_min:row_max, col_min:col_max] y_values = y_grid[row_min:row_max, col_min:col_max] xmin = np.nanmin(x_values) ymin = np.nanmin(y_values) xmax = np.nanmax(x_values) ymax = np.nanmax(y_values) xcenter = (xmax + xmin) / 2 ycenter = (ymax + ymin) / 2 # find points tile_pc = pd_pc.loc[ (pd_pc["x"] >= xmin - overlap) & (pd_pc["x"] < xmax + overlap) & (pd_pc["y"] >= ymin - overlap) & (pd_pc["y"] < ymax + overlap) ] nb_matches = tile_pc.shape[0] if ( nb_matches > min_number_matches and (row_max - row_min > 0) and (col_max - col_min > 0) ): # apply global value for fun, z_grid in zip(list_fun, list_z_grid): # noqa: B905 if ( np.abs(xcenter - np.median(tile_pc["x"])) < overlap and np.abs(ycenter - np.median(tile_pc["y"])) < overlap ): if isinstance(fun, tuple): # percentile z_grid[row_min:row_max, col_min:col_max] = fun[0]( tile_pc["z"], fun[1] ) else: z_grid[row_min:row_max, col_min:col_max] = fun(tile_pc["z"]) else: z_grid[row_min:row_max, col_min:col_max] = np.nan list_row = [] if row_max - row_min >= 2: med = int((row_max + row_min) / 2) list_row.append((row_min, med)) list_row.append((med, row_max)) else: list_row.append((row_min, row_max)) list_col = [] if col_max - col_min >= 2: med = int((col_max + col_min) / 2) list_col.append((col_min, med)) list_col.append((med, col_max)) else: list_col.append((col_min, col_max)) # if not ( len(list_row) == 1 and len(list_col) == 1): if len(list_row) + len(list_col) > 2: for row_min_tile, row_max_tile in list_row: for col_min_tile, col_max_tile in list_col: multi_res_rec( tile_pc, list_fun, x_grid, y_grid, list_z_grid, row_min_tile, row_max_tile, col_min_tile, col_max_tile, min_number_matches, overlap, )