Source code for cars.applications.sparse_matching.sift

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
this module contains the dense_matching application class.
"""

# pylint: disable= C0302

# Standard imports
import logging
import os
from typing import Dict, Tuple

# Third party imports
import numpy as np
import pandas
import xarray as xr
from json_checker import And, Checker, Or

import cars.applications.sparse_matching.sparse_matching_constants as sm_cst
import cars.orchestrator.orchestrator as ocht
from cars.applications import application_constants

# CARS imports
from cars.applications.sparse_matching import sparse_matching_tools
from cars.applications.sparse_matching.sparse_matching import SparseMatching
from cars.core import constants as cst
from cars.core.utils import safe_makedirs
from cars.data_structures import cars_dataset


[docs]class Sift(SparseMatching, short_name=["sift"]): """ SparseMatching """ # pylint: disable=too-many-instance-attributes def __init__(self, conf=None): """ Init function of SparseMatching :param conf: configuration for matching :return: a application_to_use object """ super().__init__(conf=conf) # check conf self.used_method = self.used_config["method"] self.disparity_margin = self.used_config["disparity_margin"] self.elevation_delta_lower_bound = self.used_config[ "elevation_delta_lower_bound" ] self.elevation_delta_upper_bound = self.used_config[ "elevation_delta_upper_bound" ] self.strip_margin = self.used_config["strip_margin"] self.epipolar_error_upper_bound = self.used_config[ "epipolar_error_upper_bound" ] self.epipolar_error_maximum_bias = self.used_config[ "epipolar_error_maximum_bias" ] # minimum number of matches to continue with self.minimum_nb_matches = self.used_config["minimum_nb_matches"] # sifts self.sift_matching_threshold = self.used_config[ "sift_matching_threshold" ] self.sift_n_octave = self.used_config["sift_n_octave"] self.sift_n_scale_per_octave = self.used_config[ "sift_n_scale_per_octave" ] self.sift_peak_threshold = self.used_config["sift_peak_threshold"] self.sift_edge_threshold = self.used_config["sift_edge_threshold"] self.sift_magnification = self.used_config["sift_magnification"] self.sift_window_size = self.used_config["sift_window_size"] self.sift_back_matching = self.used_config["sift_back_matching"] self.match_filter_knn = self.used_config["match_filter_knn"] self.match_filter_constant = self.used_config["match_filter_constant"] self.match_filter_mean_factor = self.used_config[ "match_filter_mean_factor" ] self.match_filter_dev_factor = self.used_config[ "match_filter_dev_factor" ] self.decimation_factor = self.used_config["decimation_factor"] # Saving files self.save_intermediate_data = self.used_config["save_intermediate_data"] # Init orchestrator self.orchestrator = None
[docs] def check_conf(self, conf): """ Check configuration :param conf: configuration to check :type conf: dict :return: overloaded configuration :rtype: dict """ # init conf if conf is not None: overloaded_conf = conf.copy() else: conf = {} overloaded_conf = {} # Overload conf overloaded_conf["method"] = conf.get("method", "sift") overloaded_conf["disparity_margin"] = conf.get("disparity_margin", 0.02) overloaded_conf["elevation_delta_lower_bound"] = conf.get( "elevation_delta_lower_bound", None ) overloaded_conf["decimation_factor"] = conf.get("decimation_factor", 30) overloaded_conf["elevation_delta_upper_bound"] = conf.get( "elevation_delta_upper_bound", None ) overloaded_conf["strip_margin"] = conf.get("strip_margin", 10) overloaded_conf["epipolar_error_upper_bound"] = conf.get( "epipolar_error_upper_bound", 10.0 ) overloaded_conf["epipolar_error_maximum_bias"] = conf.get( "epipolar_error_maximum_bias", 0.0 ) # minimum number of matches to continue with overloaded_conf["minimum_nb_matches"] = conf.get( "minimum_nb_matches", 100 ) # sifts params overloaded_conf["sift_matching_threshold"] = conf.get( "sift_matching_threshold", 0.7 ) overloaded_conf["sift_n_octave"] = conf.get("sift_n_octave", 8) overloaded_conf["sift_n_scale_per_octave"] = conf.get( "sift_n_scale_per_octave", 3 ) overloaded_conf["sift_peak_threshold"] = conf.get( "sift_peak_threshold", 4.0 ) overloaded_conf["sift_edge_threshold"] = conf.get( "sift_edge_threshold", 10.0 ) overloaded_conf["sift_magnification"] = conf.get( "sift_magnification", 7.0 ) overloaded_conf["sift_window_size"] = conf.get("sift_window_size", 2) overloaded_conf["sift_back_matching"] = conf.get( "sift_back_matching", True ) overloaded_conf["match_filter_knn"] = conf.get("match_filter_knn", 25) overloaded_conf["match_filter_constant"] = conf.get( "match_filter_constant", 0.0 ) overloaded_conf["match_filter_mean_factor"] = conf.get( "match_filter_mean_factor", 1.3 ) overloaded_conf["match_filter_dev_factor"] = conf.get( "match_filter_dev_factor", 3.0 ) # Saving files overloaded_conf["save_intermediate_data"] = conf.get( "save_intermediate_data", False ) self.save_intermediate_data = overloaded_conf["save_intermediate_data"] sparse_matching_schema = { "method": str, "disparity_margin": float, "minimum_nb_matches": And(int, lambda x: x > 0), "elevation_delta_lower_bound": Or(int, float, None), "elevation_delta_upper_bound": Or(int, float, None), "strip_margin": And(int, lambda x: x > 0), "epipolar_error_upper_bound": And(float, lambda x: x > 0), "epipolar_error_maximum_bias": And(float, lambda x: x >= 0), "sift_matching_threshold": And(float, lambda x: x > 0), "sift_n_octave": And(int, lambda x: x > 0), "sift_n_scale_per_octave": And(int, lambda x: x > 0), "sift_peak_threshold": Or(float, None), "sift_edge_threshold": float, "sift_magnification": And(float, lambda x: x > 0), "sift_window_size": And(int, lambda x: x > 0), "decimation_factor": And(int, lambda x: x > 0), "sift_back_matching": bool, "match_filter_knn": int, "match_filter_constant": Or(int, float), "match_filter_mean_factor": Or(int, float), "match_filter_dev_factor": Or(int, float), "save_intermediate_data": bool, } # Check conf checker = Checker(sparse_matching_schema) checker.validate(overloaded_conf) # Check consistency between bounds for elevation delta elevation_delta_lower_bound = overloaded_conf[ "elevation_delta_lower_bound" ] elevation_delta_upper_bound = overloaded_conf[ "elevation_delta_upper_bound" ] if None not in ( elevation_delta_lower_bound, elevation_delta_upper_bound, ): if elevation_delta_lower_bound > elevation_delta_upper_bound: raise ValueError( "Upper bound must be bigger than " "lower bound for expected elevation delta" ) return overloaded_conf
[docs] def get_save_matches(self): """ Get save_matches parameter :return: true is save_matches activated :rtype: bool """ return self.save_intermediate_data
[docs] def get_disparity_margin(self): """ Get disparity margin corresponding to sparse matches :return: margin in percent """ return self.disparity_margin
[docs] def get_strip_margin(self): """ Get strip margin corresponding to sparse matches :return: margin in percent """ return self.strip_margin
[docs] def get_epipolar_error_upper_bound(self): """ Get epipolar error upper bound corresponding to sparse matches :return: margin """ return self.epipolar_error_upper_bound
[docs] def get_minimum_nb_matches(self): """ Get minimum_nb_matches : get the minimum number of matches :return: minimum_nb_matches """ return self.minimum_nb_matches
[docs] def get_epipolar_error_maximum_bias(self): """ Get epipolar error maximum bias corresponding to sparse matches :return: margin """ return self.epipolar_error_maximum_bias
[docs] def get_match_filter_knn(self): """ Get match_filter_knn : number of neighboors used to measure isolation of matches :return: match_filter_knn """ return self.match_filter_knn
[docs] def get_match_filter_constant(self): """ Get get_match_filter_constant : constant in the formula to compute threshold of outliers :return: match_filter_constant """ return self.match_filter_constant
[docs] def get_match_filter_mean_factor(self): """ Get match_filter_mean_factor : factor of mean in the formula to compute threshold of outliers :return: match_filter_mean_factor """ return self.match_filter_mean_factor
[docs] def get_match_filter_dev_factor(self): """ Get match_filter_dev_factor : factor of deviation in the formula to compute threshold of outliers :return: match_filter_dev_factor """ return self.match_filter_dev_factor
[docs] def get_decimation_factor(self): """ Get decimation_factor :return: decimation_factor """ return self.decimation_factor
[docs] def set_decimation_factor(self, value): """ set decimation_factor """ self.decimation_factor = value
[docs] def run( self, epipolar_image_left, epipolar_image_right, disp_to_alt_ratio, orchestrator=None, pair_folder=None, pair_key="PAIR_0", ): """ Run Matching application. Create left and right CarsDataset filled with pandas.DataFrame , corresponding to epipolar 2D disparities, on the same geometry that epipolar_image_left and epipolar_image_right. :param epipolar_image_left: tiled left epipolar. CarsDataset contains: - N x M Delayed tiles \ Each tile will be a future xarray Dataset containing: - data with keys : "im", "msk", "color" - attrs with keys: "margins" with "disp_min" and "disp_max" "transform", "crs", "valid_pixels", "no_data_mask", "no_data_img" - attributes containing: "largest_epipolar_region","opt_epipolar_tile_size" :type epipolar_image_left: CarsDataset :param epipolar_image_right: tiled right epipolar.CarsDataset contains: - N x M Delayed tiles \ Each tile will be a future xarray Dataset containing: - data with keys : "im", "msk", "color" - attrs with keys: "margins" with "disp_min" and "disp_max"\ "transform", "crs", "valid_pixels", "no_data_mask",\ "no_data_img" - attributes containing:"largest_epipolar_region", \ "opt_epipolar_tile_size" :type epipolar_image_right: CarsDataset :param disp_to_alt_ratio: disp to alti ratio :type disp_to_alt_ratio: float :param orchestrator: orchestrator used :param pair_folder: folder used for current pair :type pair_folder: str :param pair_key: pair key id :type pair_key: str :return left matches, right matches. Each CarsDataset contains: - N x M Delayed tiles \ Each tile will be a future pandas DataFrame containing: - data : (L, 4) shape matches - attributes containing "disp_lower_bound", "disp_upper_bound",\ "elevation_delta_lower_bound","elevation_delta_upper_bound" :rtype: Tuple(CarsDataset, CarsDataset) """ # Default orchestrator if orchestrator is None: # Create default sequential orchestrator for current application # be awere, no out_json will be shared between orchestrators # No files saved self.orchestrator = ocht.Orchestrator( orchestrator_conf={"mode": "sequential"} ) else: self.orchestrator = orchestrator if pair_folder is None: pair_folder = os.path.join(self.orchestrator.out_dir, "tmp") if epipolar_image_left.dataset_type == "arrays": # Create CarsDataset # Epipolar_disparity epipolar_disparity_map_left = cars_dataset.CarsDataset( "points", name="sparse_matching_" + pair_key ) epipolar_disparity_map_left.create_empty_copy(epipolar_image_left) # Update attributes to get epipolar info epipolar_disparity_map_left.attributes.update( epipolar_image_left.attributes ) # check sift_peak_threshold with image type # only if sift_peak_threshold is None tmp_sift_peak_threshold = self.sift_peak_threshold if not self.sift_peak_threshold: logging.info("The sift_peak_threshold is set to auto-mode.") # sift_peak_threshold is None or not specified # check input type if np.issubdtype( epipolar_disparity_map_left.attributes["image_type"], np.uint8, ): tmp_sift_peak_threshold = 1 else: tmp_sift_peak_threshold = 20 logging.info( "The sift_peak_threshold will be set to {}.".format( tmp_sift_peak_threshold ) ) self.sift_peak_threshold = tmp_sift_peak_threshold # Save disparity maps if self.save_intermediate_data: safe_makedirs(pair_folder) self.orchestrator.add_to_save_lists( os.path.join(pair_folder, "epi_matches_left"), None, epipolar_disparity_map_left, cars_ds_name="epi_matches_left", ) # Compute disparity range if self.elevation_delta_lower_bound is None: disp_upper_bound = np.inf else: disp_upper_bound = ( -self.elevation_delta_lower_bound / disp_to_alt_ratio ) if self.elevation_delta_upper_bound is None: disp_lower_bound = -np.inf else: disp_lower_bound = ( -self.elevation_delta_upper_bound / disp_to_alt_ratio ) attributes = { "disp_lower_bound": disp_lower_bound, "disp_upper_bound": disp_upper_bound, "elevation_delta_lower_bound": self.elevation_delta_lower_bound, "elevation_delta_upper_bound": self.elevation_delta_upper_bound, } epipolar_disparity_map_left.attributes.update(attributes) # Get saving infos in order to save tiles when they are computed [saving_info_left] = self.orchestrator.get_saving_infos( [epipolar_disparity_map_left] ) # Update orchestrator out_json updating_infos = { application_constants.APPLICATION_TAG: { sm_cst.SPARSE_MATCHING_RUN_TAG: { pair_key: { sm_cst.DISP_LOWER_BOUND: disp_lower_bound, sm_cst.DISP_UPPER_BOUND: disp_upper_bound, }, } } } orchestrator.update_out_info(updating_infos) logging.info( "Generate disparity: Number tiles: {}".format( epipolar_disparity_map_left.shape[1] * epipolar_disparity_map_left.shape[0] ) ) # Add to replace list so tiles will be readable at the same time self.orchestrator.add_to_replace_lists( epipolar_disparity_map_left, cars_ds_name="epi_matches_left" ) # Generate disparity maps total_nb_band_sift = epipolar_disparity_map_left.shape[0] step = int(np.round(100 / self.decimation_factor)) if total_nb_band_sift in (1, 2): step = 1 elif total_nb_band_sift == 3: step = 2 for row in range(0, total_nb_band_sift, step): # initialize list of matches full_saving_info_left = ocht.update_saving_infos( saving_info_left, row=row, col=0 ) # Compute matches if type(None) not in ( type(epipolar_image_left[row, 0]), type(epipolar_image_right[row, 0]), ): ( epipolar_disparity_map_left[row, 0] ) = self.orchestrator.cluster.create_task( compute_matches_wrapper, nout=1 )( epipolar_image_left[row, 0], epipolar_image_right[row, 0], matching_threshold=self.sift_matching_threshold, n_octave=self.sift_n_octave, n_scale_per_octave=self.sift_n_scale_per_octave, peak_threshold=tmp_sift_peak_threshold, edge_threshold=self.sift_edge_threshold, magnification=self.sift_magnification, window_size=self.sift_window_size, backmatching=self.sift_back_matching, disp_lower_bound=disp_lower_bound, disp_upper_bound=disp_upper_bound, saving_info_left=full_saving_info_left, ) else: logging.error( "SparseMatching application doesn't " "support this input data format" ) return epipolar_disparity_map_left, None
[docs]def compute_matches_wrapper( left_image_object: xr.Dataset, right_image_object: xr.Dataset, matching_threshold=None, n_octave=None, n_scale_per_octave=None, peak_threshold=None, edge_threshold=None, magnification=None, window_size=None, backmatching=None, disp_lower_bound=None, disp_upper_bound=None, saving_info_left=None, ) -> Dict[str, Tuple[xr.Dataset, xr.Dataset]]: """ Compute matches from image objects. This function will be run as a delayed task. User must provide saving infos to save properly created datasets :param left_image_object: tiled Left image dataset with : - cst.EPI_IMAGE - cst.EPI_MSK (if given) - cst.EPI_COLOR (for left, if given) :type left_image_object: xr.Dataset with : - cst.EPI_IMAGE - cst.EPI_MSK (if given) - cst.EPI_COLOR (for left, if given) :param right_image_object: tiled Right image :type right_image_object: xr.Dataset :return: Left matches object, Right matches object (if exists) Returned objects are composed of : - dataframe (None for right object) with : - TODO """ # Create mask # TODO : remove overwriting of EPI_MSK saved_left_mask = np.copy(left_image_object[cst.EPI_MSK].values) saved_right_mask = np.copy(right_image_object[cst.EPI_MSK].values) # Compute matches matches = sparse_matching_tools.dataset_matching( left_image_object, right_image_object, matching_threshold=matching_threshold, n_octave=n_octave, n_scale_per_octave=n_scale_per_octave, peak_threshold=peak_threshold, edge_threshold=edge_threshold, magnification=magnification, window_size=window_size, backmatching=backmatching, disp_lower_bound=disp_lower_bound, disp_upper_bound=disp_upper_bound, ) # Filter matches outside disparity range if disp_lower_bound is not None and disp_upper_bound is not None: filtered_nb_matches = matches.shape[0] matches = matches[matches[:, 2] - matches[:, 0] >= disp_lower_bound] matches = matches[matches[:, 2] - matches[:, 0] <= disp_upper_bound] logging.debug( "{} matches discarded because they fall outside of disparity range " "defined by --elevation_delta_lower_bound and " "--elevation_delta_upper_bound: [{} pix., {} pix.]".format( filtered_nb_matches - matches.shape[0], disp_lower_bound, disp_upper_bound, ) ) else: logging.debug("Matches outside disparity range were not filtered") # convert to Dataframe left_matches_dataframe = pandas.DataFrame(matches) # recover initial mask data in input images # TODO remove with proper dataset creation left_image_object[cst.EPI_MSK].values = saved_left_mask right_image_object[cst.EPI_MSK].values = saved_right_mask cars_dataset.fill_dataframe( left_matches_dataframe, saving_info=saving_info_left, attributes=None ) return left_matches_dataframe