#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
this module contains the abstract matching application class.
"""
import logging
import math
import os
from abc import ABCMeta, abstractmethod
from typing import Dict
import numpy as np
import xarray as xr
from shareloc.geofunctions.rectification_grid import RectificationGrid
import cars.applications.sparse_matching.sparse_matching_constants as sm_cst
import cars.orchestrator.orchestrator as ocht
from cars.applications import application_constants
from cars.applications.application import Application
from cars.applications.application_template import ApplicationTemplate
from cars.core import constants as cst
from cars.core.utils import safe_makedirs
[docs]@Application.register("sparse_matching")
class SparseMatching(ApplicationTemplate, metaclass=ABCMeta):
"""
SparseMatching
"""
available_applications: Dict = {}
default_application = "sift"
def __new__(cls, conf=None): # pylint: disable=W0613
"""
Return the required application
:raises:
- KeyError when the required application is not registered
:param conf: configuration for matching
:return: a application_to_use object
"""
matching_method = cls.default_application
if bool(conf) is False or "method" not in conf:
logging.info(
"Sparse Matching method not specified, default "
" {} is used".format(matching_method)
)
else:
matching_method = conf.get("method", cls.default_application)
if matching_method not in cls.available_applications:
logging.error(
"No matching application named {} registered".format(
matching_method
)
)
raise KeyError(
"No matching application named {} registered".format(
matching_method
)
)
logging.info(
"The SparseMatching({}) application will be used".format(
matching_method
)
)
return super(SparseMatching, cls).__new__(
cls.available_applications[matching_method]
)
[docs] def __init_subclass__(cls, short_name, **kwargs): # pylint: disable=E0302
super().__init_subclass__(**kwargs)
for name in short_name:
cls.available_applications[name] = cls
def __init__(self, conf=None):
"""
Init function of SparseMatching
:param conf: configuration
:return: an application_to_use object
"""
super().__init__(conf=conf)
[docs] @abstractmethod
def get_disparity_margin(self):
"""
Get disparity margin corresponding to sparse matches
:return: margin in percent
"""
[docs] @abstractmethod
def get_strip_margin(self):
"""
Get strip margin corresponding to sparse matches
:return: margin
"""
[docs] @abstractmethod
def get_epipolar_error_upper_bound(self):
"""
Get epipolar error upper bound corresponding to sparse matches
:return: margin
"""
[docs] @abstractmethod
def get_epipolar_error_maximum_bias(self):
"""
Get epipolar error lower bound corresponding to sparse matches
:return: margin
"""
[docs] @abstractmethod
def get_match_filter_constant(self):
"""
Get get_match_filter_constant :
constant in the formula to compute threshold of outliers
:return: match_filter_constant
"""
[docs] @abstractmethod
def get_match_filter_mean_factor(self):
"""
Get match_filter_mean_factor :
factor of mean in the formula
to compute threshold of outliers
:return: match_filter_mean_factor
"""
[docs] @abstractmethod
def get_match_filter_knn(self):
"""
Get match_filter_knn :
number of neighboors used to measure isolation of matches
:return: match_filter_knn
"""
[docs] @abstractmethod
def get_match_filter_dev_factor(self):
"""
Get match_filter_dev_factor :
factor of deviation in the formula
to compute threshold of outliers
:return: match_filter_dev_factor
"""
[docs] @abstractmethod
def get_minimum_nb_matches(self):
"""
Get minimum_nb_matches :
get the minimum number of matches
:return: minimum_nb_matches
"""
[docs] def get_margins_fun(self, disp_min=None, disp_max=None, method="sift"):
"""
Get margins function to use in resampling
:param disp_min: disp min for info
:param disp_max: disp max for info
:param method: method for the margins
:return: margins function
:rtype: function generating xr.Dataset
"""
# Compute margins
corner = ["left", "up", "right", "down"]
data = np.zeros(len(corner))
col = np.arange(len(corner))
margins = xr.Dataset(
{"left_margin": (["col"], data)}, coords={"col": col}
)
margins["right_margin"] = xr.DataArray(data, dims=["col"])
left_margin = self.get_strip_margin()
if method == "sift":
right_margin = self.get_strip_margin() + int(
math.floor(
self.get_epipolar_error_upper_bound()
+ self.get_epipolar_error_maximum_bias()
)
)
else:
right_margin = left_margin
# Compute margins for left region
margins["left_margin"].data = [0, left_margin, 0, left_margin]
# Compute margins for right region
margins["right_margin"].data = [0, right_margin, 0, right_margin]
# add disp range info
margins.attrs["disp_min"] = disp_min
margins.attrs["disp_max"] = disp_max
logging.info(
"Margins added to left region for matching: {}".format(
margins["left_margin"].data
)
)
logging.info(
"Margins added to right region for matching: {}".format(
margins["right_margin"].data
)
)
def margins_wrapper( # pylint: disable=unused-argument
row_min, row_max, col_min, col_max
):
"""
Generates margins Dataset used in resampling
:param row_min: row min
:param row_max: row max
:param col_min: col min
:param col_max: col max
:return: margins
:rtype: xr.Dataset
"""
# Constant margins for all tiles
return margins
return margins_wrapper
[docs] def filter_matches(
self,
epipolar_matches_left,
grid_left,
grid_right,
geom_plugin,
orchestrator=None,
pair_key="pair_0",
pair_folder=None,
save_matches=False,
):
"""
Transform matches CarsDataset to numpy matches, and filters matches
:param cars_orchestrator: orchestrator
:param epipolar_matches_left: matches. CarsDataset contains:
- N x M Delayed tiles \
Each tile will be a future pandas DataFrame containing:
- data : (L, 4) shape matches
- attributes containing "disp_lower_bound", "disp_upper_bound", \
"elevation_delta_lower_bound","elevation_delta_upper_bound"
:type epipolar_matches_left: CarsDataset
:param grid_left: left epipolar grid
:type grid_left: CarsDataset
:param grid_right: right epipolar grid
:type grid_right: CarsDataset
:param save_matches: true is matches needs to be saved
:type save_matches: bool
:return filtered matches
:rtype: np.ndarray
"""
# Default orchestrator
if orchestrator is None:
# Create default sequential orchestrator for current application
# be awere, no out_json will be shared between orchestrators
# No files saved
cars_orchestrator = ocht.Orchestrator(
orchestrator_conf={"mode": "sequential"}
)
else:
cars_orchestrator = orchestrator
if pair_folder is None:
pair_folder = os.path.join(cars_orchestrator.out_dir, "tmp")
epipolar_error_upper_bound = self.get_epipolar_error_upper_bound()
epipolar_error_maximum_bias = self.get_epipolar_error_maximum_bias()
grid_left = RectificationGrid(
grid_left.attributes["path"],
interpolator=geom_plugin.interpolator,
)
grid_right = RectificationGrid(
grid_right.attributes["path"],
interpolator=geom_plugin.interpolator,
)
# Concatenated matches
list_matches = []
for row in range(epipolar_matches_left.shape[0]):
for col in range(epipolar_matches_left.shape[1]):
# CarsDataset containing Pandas DataFrame, not Delayed anymore
if epipolar_matches_left[row, col] is not None:
epipolar_matches = epipolar_matches_left[
row, col
].to_numpy()
sensor_matches = geom_plugin.matches_to_sensor_coords(
grid_left,
grid_right,
epipolar_matches,
cst.MATCHES_MODE,
)
sensor_matches = np.concatenate(sensor_matches, axis=1)
matches = np.concatenate(
[
epipolar_matches,
sensor_matches,
],
axis=1,
)
list_matches.append(matches)
matches = np.concatenate(list_matches)
raw_nb_matches = matches.shape[0]
logging.info(
"Raw number of matches found: {} matches".format(raw_nb_matches)
)
# Export matches
raw_matches_array_path = None
if save_matches:
safe_makedirs(pair_folder)
logging.info("Writing raw matches file")
raw_matches_array_path = os.path.join(
pair_folder, "raw_matches.npy"
)
np.save(raw_matches_array_path, matches)
# Filter matches that are out of margin
if epipolar_error_maximum_bias == 0:
epipolar_median_shift = 0
else:
epipolar_median_shift = np.median(matches[:, 3] - matches[:, 1])
matches = matches[
((matches[:, 3] - matches[:, 1]) - epipolar_median_shift)
>= -epipolar_error_upper_bound
]
matches = matches[
((matches[:, 3] - matches[:, 1]) - epipolar_median_shift)
<= epipolar_error_upper_bound
]
matches_discarded_message = (
"{} matches discarded because their epipolar error "
"is greater than --epipolar_error_upper_bound = {} pix"
).format(raw_nb_matches - matches.shape[0], epipolar_error_upper_bound)
if epipolar_error_maximum_bias != 0:
matches_discarded_message += (
" considering a shift of {} pix".format(epipolar_median_shift)
)
logging.info(matches_discarded_message)
filtered_matches_array_path = None
if save_matches:
logging.info("Writing filtered matches file")
filtered_matches_array_path = os.path.join(
pair_folder, "filtered_matches.npy"
)
np.save(filtered_matches_array_path, matches)
# Retrieve number of matches
nb_matches = matches.shape[0]
# Check if we have enough matches
# TODO: we could also make it a warning and continue
# with uncorrected grid
# and default disparity range
if nb_matches < self.get_minimum_nb_matches():
error_message_matches = (
"Insufficient amount of matches found ({} < {}), "
"can not safely estimate epipolar error correction "
" and disparity range".format(
nb_matches, self.get_minimum_nb_matches()
)
)
logging.error(error_message_matches)
raise ValueError(error_message_matches)
logging.info(
"Number of matches kept for epipolar "
"error correction: {} matches".format(nb_matches)
)
# Compute epipolar error
epipolar_error = matches[:, 1] - matches[:, 3]
epi_error_mean = np.mean(epipolar_error)
epi_error_std = np.std(epipolar_error)
epi_error_max = np.max(np.fabs(epipolar_error))
logging.info(
"Epipolar error before correction: mean = {:.3f} pix., "
"standard deviation = {:.3f} pix., max = {:.3f} pix.".format(
epi_error_mean,
epi_error_std,
epi_error_max,
)
)
# Update orchestrator out_json
raw_matches_infos = {
application_constants.APPLICATION_TAG: {
sm_cst.MATCH_FILTERING_TAG: {
pair_key: {
sm_cst.NUMBER_MATCHES_TAG: nb_matches,
sm_cst.RAW_NUMBER_MATCHES_TAG: raw_nb_matches,
sm_cst.BEFORE_CORRECTION_EPI_ERROR_MEAN: epi_error_mean,
sm_cst.BEFORE_CORRECTION_EPI_ERROR_STD: epi_error_std,
sm_cst.BEFORE_CORRECTION_EPI_ERROR_MAX: epi_error_max,
}
}
}
}
cars_orchestrator.update_out_info(raw_matches_infos)
return matches
[docs] @abstractmethod
def get_save_matches(self):
"""
Get save_matches parameter
:return: true is save_matches activated
:rtype: bool
"""
[docs] @abstractmethod
def run(self, epipolar_image_left, epipolar_image_right, **kwargs):
"""
Run Matching application.
Create left and right CarsDataset filled with pandas.DataFrame ,
corresponding to epipolar 2D disparities, on the same geometry
that epipolar_images_left and epipolar_images_right.
:param epipolar_image_left: tiled left epipolar
:type epipolar_image_left: CarsDataset
:param epipolar_image_right: tiled right epipolar
:type epipolar_image_right: CarsDataset
:param disp_to_alt_ratio: disp to alti ratio
:type disp_to_alt_ratio: float
:param orchestrator: orchestrator used
:param pair_folder: folder used for current pair
:type pair_folder: str
:param pair_key: pair key id
:type pair_key: str
:param mask1_ignored_by_sift: values used in left mask to ignore
in correlation
:type mask1_ignored_by_sift: list
:param mask2_ignored_by_sift: values used in right mask to ignore
in correlation
:type mask2_ignored_by_sift: list
:return left matches, right matches
:rtype: Tuple(CarsDataset, CarsDataset)
"""