#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Grids module:
contains functions used for epipolar grid correction
"""
# Standard imports
from __future__ import absolute_import
import logging
import os
# Third party imports
import numpy as np
import pandas
from scipy.interpolate import LinearNDInterpolator
from scipy.spatial import Delaunay # pylint: disable=E0611
import cars.orchestrator.orchestrator as ocht
from cars.applications import application_constants
from cars.applications.grid_generation import grid_constants, grids
from cars.core.utils import safe_makedirs
# CARS imports
from cars.data_structures import cars_dataset
from cars.orchestrator.cluster.log_wrapper import cars_profile
@cars_profile(name="Correct grid from 1d")
def correct_grid_from_1d(
grid, grid_correction_coef, save_grid=False, pair_folder=None
):
"""
Correct grid from correction given in 1d
:param grid: grid to correct
:type grid: CarsDataset
:param grid_correction_coef: grid correction to apply
:type grid_correction_coef: list(float), size 6
:param save_grid: if True grids are saved in root or pair_folder
:type save_grid: bool
:param pair_folder: directory where grids are saved
:type pair_folder: str
"""
coefs_x = grid_correction_coef[:3]
coefs_x.append(0.0)
coefs_y = grid_correction_coef[3:6]
coefs_y.append(0.0)
grid_correction_coef = (
np.array(coefs_x).reshape((2, 2)),
np.array(coefs_y).reshape((2, 2)),
)
# Correct grid right with provided epipolar a priori
corrected_grid_right = correct_grid(
grid, grid_correction_coef, save_grid, pair_folder
)
return corrected_grid_right
@cars_profile(name="Correct grid")
def correct_grid(grid, grid_correction, save_grid=None, pair_folder=None):
"""
Correct grid
:param grid: grid to correct
:type grid: CarsDataset
:param grid_correction: grid correction to apply
:type grid_correction: Tuple(np.ndarray, np.ndarray)
(coefsx_2d, coefsy_2d) , each of size (2,2)
:param save_grid: if True grids are saved in root or pair_folder
:type save_grid: bool
:param pair_folder: directory where grids are saved
:type pair_folder: str
"""
coefsx_2d, coefsy_2d = grid_correction
right_grid = np.copy(grid[0, 0])
origin = grid.attributes["grid_origin"]
spacing = grid.attributes["grid_spacing"]
# Form 3D array with grid positions
x_values_1d = np.linspace(
origin[0],
origin[0] + right_grid.shape[0] * spacing[0],
right_grid.shape[0],
)
y_values_1d = np.linspace(
origin[1],
origin[1] + right_grid.shape[1] * spacing[1],
right_grid.shape[1],
)
x_values_2d, y_values_2d = np.meshgrid(y_values_1d, x_values_1d)
# Compute corresponding point in sensor geometry (grid encodes (x_sensor -
# x_epi,y_sensor - y__epi)
source_points = right_grid
# Interpolate the regression model at grid position
correction_grid_x = np.polynomial.polynomial.polyval2d(
x_values_2d, y_values_2d, coefsx_2d
)
correction_grid_y = np.polynomial.polynomial.polyval2d(
x_values_2d, y_values_2d, coefsy_2d
)
# Compute corrected grid
corrected_grid_x = source_points[:, :, 0] - correction_grid_x
corrected_grid_y = source_points[:, :, 1] - correction_grid_y
corrected_right_grid = np.stack(
(corrected_grid_x, corrected_grid_y), axis=2
)
# create new cars ds
corrected_grid_right = cars_dataset.CarsDataset("arrays")
corrected_grid_right.attributes = grid.attributes.copy()
corrected_grid_right.tiling_grid = grid.tiling_grid
corrected_grid_right[0, 0] = corrected_right_grid
# Dump corrected grid
grid_origin = grid.attributes["grid_origin"]
grid_spacing = grid.attributes["grid_spacing"]
# Get save folder (permanent or temporay according to save_grid parameter)
if None in (pair_folder, save_grid):
# Set path to None
corrected_grid_right.attributes["path"] = None
else:
if save_grid:
safe_makedirs(pair_folder)
save_folder = os.path.join(
pair_folder, "corrected_right_epi_grid.tif"
)
else:
safe_makedirs(os.path.join(pair_folder, "tmp"))
save_folder = os.path.join(
pair_folder, "tmp", "corrected_right_epi_grid.tif"
)
grids.write_grid(
corrected_grid_right[0, 0], save_folder, grid_origin, grid_spacing
)
corrected_grid_right.attributes["path"] = save_folder
return corrected_grid_right
@cars_profile(name="Grid correction estimation")
def estimate_right_grid_correction(
matches,
grid_right,
initial_cars_ds=None,
save_matches=False,
minimum_nb_matches=100,
pair_folder="",
pair_key="pair_0",
orchestrator=None,
):
"""
Estimates grid correction, and correct matches
:param matches: matches
:type matches: np.ndarray
:param grid_right: grid to correct
:type grid_right: CarsDataset
:param save_matches: true is matches needs to be saved
:type save_matches: bool
:param minimum_nb_matches: minimum number of matches required
:type minimum_nb_matches: int
:param pair_folder: folder used for current pair
:type pair_folder: str
:return: grid_correction to apply, corrected_matches, info before,
info after
:rtype: Tuple(np.ndarray, np.ndarray) , np.ndarray, dict, dict
grid_correction is : (coefsx_2d, coefsy_2d) , each of size (2,2)
"""
# Default orchestrator
if orchestrator is None:
# Create default sequential orchestrator for current application
# be awere, no out_json will be shared between orchestrators
# No files saved
cars_orchestrator = ocht.Orchestrator(
orchestrator_conf={"mode": "sequential"}
)
else:
cars_orchestrator = orchestrator
if matches.shape[0] < minimum_nb_matches:
logging.error(
"Insufficient amount of matches found"
", can not safely estimate epipolar error correction"
)
raise ValueError(
f"Insufficient amount of matches found (< {minimum_nb_matches})"
", can not safely estimate epipolar error correction"
)
# Get grids attributes
right_grid = np.copy(grid_right[0, 0])
origin = grid_right.attributes["grid_origin"]
spacing = grid_right.attributes["grid_spacing"]
# Form 3D array with grid positions
x_values_1d = np.arange(
origin[0],
origin[0] + right_grid.shape[0] * spacing[0],
spacing[0],
)
y_values_1d = np.arange(
origin[1],
origin[1] + right_grid.shape[1] * spacing[1],
spacing[1],
)
x_values_2d, y_values_2d = np.meshgrid(y_values_1d, x_values_1d)
# Compute corresponding point in sensor geometry (grid encodes (x_sensor -
# x_epi,y_sensor - y__epi)
source_points = right_grid
# Extract matches for convenience
matches_y1 = matches[:, 1]
matches_x2 = matches[:, 2]
matches_y2 = matches[:, 3]
# Map real matches to sensor geometry
points = np.column_stack((np.ravel(x_values_2d), np.ravel(y_values_2d)))
triangulation = Delaunay(points)
values = np.ravel(source_points[:, :, 0])
interpolator = LinearNDInterpolator(triangulation, values)
sensor_matches_raw_x = interpolator(matches_x2, matches_y2)
# Simulate matches that have no epipolar error (i.e. y2 == y1) and map
# them to sensor geometry
sensor_matches_perfect_x = interpolator(matches_x2, matches_y1)
values = np.ravel(source_points[:, :, 1])
interpolator = LinearNDInterpolator(triangulation, values)
sensor_matches_raw_y = interpolator(matches_x2, matches_y2)
sensor_matches_perfect_y = interpolator(matches_x2, matches_y1)
# Compute epipolar error in sensor geometry in both direction
epipolar_error_x = sensor_matches_perfect_x - sensor_matches_raw_x
epipolar_error_y = sensor_matches_perfect_y - sensor_matches_raw_y
# Output epipolar error stats for monitoring
mean_epipolar_error = [np.mean(epipolar_error_x), np.mean(epipolar_error_y)]
median_epipolar_error = [
np.median(epipolar_error_x),
np.median(epipolar_error_y),
]
std_epipolar_error = [np.std(epipolar_error_x), np.std(epipolar_error_y)]
rms_epipolar_error = np.mean(
np.sqrt(
epipolar_error_x * epipolar_error_x
+ epipolar_error_y * epipolar_error_y
)
)
rmsd_epipolar_error = np.std(
np.sqrt(
epipolar_error_x * epipolar_error_x
+ epipolar_error_y * epipolar_error_y
)
)
in_stats = {
"mean_epipolar_error": mean_epipolar_error,
"median_epipolar_error": median_epipolar_error,
"std_epipolar_error": std_epipolar_error,
"rms_epipolar_error": rms_epipolar_error,
"rmsd_epipolar_error": rmsd_epipolar_error,
}
logging.debug(
"Epipolar error before correction: \n"
"x = {:.3f} +/- {:.3f} pixels \n"
"y = {:.3f} +/- {:.3f} pixels \n"
"rmse = {:.3f} +/- {:.3f} pixels \n"
"medianx = {:.3f} pixels \n"
"mediany = {:.3f} pixels".format(
mean_epipolar_error[0],
std_epipolar_error[0],
mean_epipolar_error[1],
std_epipolar_error[1],
rms_epipolar_error,
rmsd_epipolar_error,
median_epipolar_error[0],
median_epipolar_error[1],
)
)
# Perform bilinear regression for both component of epipolar error
nan_mask = np.logical_and(
~np.isnan(epipolar_error_x), ~np.isnan(epipolar_error_y)
)
lstsq_input = np.array(
[
matches_x2[nan_mask] * 0 + 1,
matches_x2[nan_mask],
matches_y2[nan_mask],
]
).T
coefsx, residx, __, __ = np.linalg.lstsq(
lstsq_input, epipolar_error_x[nan_mask], rcond=None
)
coefsy, residy, __, __ = np.linalg.lstsq(
lstsq_input, epipolar_error_y[nan_mask], rcond=None
)
# Normalize residuals by number of matches
rmsex = np.sqrt(residx / matches.shape[0])
rmsey = np.sqrt(residy / matches.shape[1])
logging.debug(
"Root Mean Square Error of correction estimation:"
"rmsex={} pixels, rmsey={} pixels".format(rmsex, rmsey)
)
# Reshape coefs to 2D (expected by np.polynomial.polyval2d)
coefsx_2d = np.ndarray((2, 2))
coefsx_2d[0, 0] = coefsx[0]
coefsx_2d[1, 0] = coefsx[1]
coefsx_2d[0, 1] = coefsx[2]
coefsx_2d[1, 1] = 0.0
coefsy_2d = np.ndarray((2, 2))
coefsy_2d[0, 0] = coefsy[0]
coefsy_2d[1, 0] = coefsy[1]
coefsy_2d[0, 1] = coefsy[2]
coefsy_2d[1, 1] = 0.0
grid_correction = (coefsx_2d, coefsy_2d)
# Map corrected matches to sensor geometry
sensor_matches_corrected_x = (
sensor_matches_raw_x
+ np.polynomial.polynomial.polyval2d(matches_x2, matches_y2, coefsx_2d)
)
sensor_matches_corrected_y = (
sensor_matches_raw_y
+ np.polynomial.polynomial.polyval2d(matches_x2, matches_y2, coefsy_2d)
)
# Map corrected matches to epipolar geometry
points = np.column_stack(
(np.ravel(source_points[:, :, 0]), np.ravel(source_points[:, :, 1]))
)
triangulation = Delaunay(points)
values = np.ravel(x_values_2d)
interpolator = LinearNDInterpolator(triangulation, values)
epipolar_matches_corrected_x = interpolator(
sensor_matches_corrected_x, sensor_matches_corrected_y
)
values = np.ravel(y_values_2d)
interpolator = LinearNDInterpolator(triangulation, values)
epipolar_matches_corrected_y = interpolator(
sensor_matches_corrected_x, sensor_matches_corrected_y
)
corrected_matches = np.copy(matches)
corrected_matches[:, 2] = epipolar_matches_corrected_x
corrected_matches[:, 3] = epipolar_matches_corrected_y
# Compute epipolar error in sensor geometry in both direction after
# correction
corrected_epipolar_error_x = (
sensor_matches_perfect_x - sensor_matches_corrected_x
)
corrected_epipolar_error_y = (
sensor_matches_perfect_y - sensor_matches_corrected_y
)
# Output corrected epipolar error stats for monitoring
mean_corrected_epipolar_error = [
np.mean(corrected_epipolar_error_x),
np.mean(corrected_epipolar_error_y),
]
median_corrected_epipolar_error = [
np.median(corrected_epipolar_error_x),
np.median(corrected_epipolar_error_y),
]
std_corrected_epipolar_error = [
np.std(corrected_epipolar_error_x),
np.std(corrected_epipolar_error_y),
]
rms_corrected_epipolar_error = np.mean(
np.sqrt(
corrected_epipolar_error_x * corrected_epipolar_error_x
+ corrected_epipolar_error_y * corrected_epipolar_error_y
)
)
rmsd_corrected_epipolar_error = np.std(
np.sqrt(
corrected_epipolar_error_x * corrected_epipolar_error_x
+ corrected_epipolar_error_y * corrected_epipolar_error_y
)
)
out_stats = {
"mean_epipolar_error": mean_corrected_epipolar_error,
"median_epipolar_error": median_corrected_epipolar_error,
"std_epipolar_error": std_corrected_epipolar_error,
"rms_epipolar_error": rms_corrected_epipolar_error,
"rmsd_epipolar_error": rmsd_corrected_epipolar_error,
}
logging.debug(
"Epipolar error after correction: \n"
"x = {:.3f} +/- {:.3f} pixels \n"
"y = {:.3f} +/- {:.3f} pixels \n"
"rmse = {:.3f} +/- {:.3f} pixels \n"
"medianx = {:.3f} pixels \n"
"mediany = {:.3f} pixels".format(
mean_corrected_epipolar_error[0],
std_corrected_epipolar_error[0],
mean_corrected_epipolar_error[1],
std_corrected_epipolar_error[1],
rms_corrected_epipolar_error,
rmsd_corrected_epipolar_error,
median_corrected_epipolar_error[0],
median_corrected_epipolar_error[1],
)
)
corrected_epipolar_error = corrected_matches[:, 1] - corrected_matches[:, 3]
logging.info(
"Epipolar error after correction: mean = {:.3f} pix., "
"standard deviation = {:.3f} pix., max = {:.3f} pix.".format(
np.mean(corrected_epipolar_error),
np.std(corrected_epipolar_error),
np.max(np.fabs(corrected_epipolar_error)),
)
)
# Export filtered matches
matches_array_path = None
current_out_dir = None
if save_matches:
logging.info("Writing matches file")
if pair_folder is None:
logging.error("Pair folder not provided")
else:
safe_makedirs(pair_folder)
current_out_dir = pair_folder
matches_array_path = os.path.join(
current_out_dir, "corrected_filtered_matches.npy"
)
np.save(matches_array_path, corrected_matches)
# Create CarsDataset containing corrected matches, with same tiling as input
corrected_matches_cars_ds = None
if initial_cars_ds is not None:
corrected_matches_cars_ds = create_matches_cars_ds(
corrected_matches, initial_cars_ds
)
# Update orchestrator out_json
corrected_matches_infos = {
application_constants.APPLICATION_TAG: {
grid_constants.GRID_CORRECTION_TAG: {pair_key: {}}
}
}
cars_orchestrator.update_out_info(corrected_matches_infos)
return (
grid_correction,
corrected_matches,
corrected_matches_cars_ds,
in_stats,
out_stats,
)
[docs]def create_matches_cars_ds(corrected_matches, initial_cars_ds):
"""
Create CarsDataset representing matches, from numpy matches.
Matches are split into tiles, stored in pandas DataFrames
Right CarsDataset is filled with Nones
:param corrected_matches: matches
:type corrected_matches: numpy array
:param initial_cars_ds: cars dataset to use tiling from
:type initial_cars_ds: CarsDataset
:return new_matches_cars_ds
:rtype: CarsDataset
"""
# initialize CarsDataset
new_matches_cars_ds = cars_dataset.CarsDataset("points")
new_matches_cars_ds.create_empty_copy(initial_cars_ds)
new_matches_cars_ds.attributes = initial_cars_ds.attributes
for row in range(new_matches_cars_ds.shape[0]):
for col in range(new_matches_cars_ds.shape[1]):
[
row_min,
row_max,
col_min,
col_max,
] = new_matches_cars_ds.tiling_grid[row, col, :]
# Get corresponding matches
tile_matches = corrected_matches[corrected_matches[:, 1] > row_min]
tile_matches = tile_matches[tile_matches[:, 1] < row_max]
tile_matches = tile_matches[tile_matches[:, 0] > col_min]
tile_matches = tile_matches[tile_matches[:, 0] < col_max]
# Create pandas DataFrame
new_matches_cars_ds[row, col] = pandas.DataFrame(tile_matches)
return new_matches_cars_ds