#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Inputs module:
contains some CARS global shared general purpose inputs functions
"""
import logging
# Standard imports
import os
import warnings
from typing import Dict, Tuple
# Third party imports
import fiona
import numpy as np
import rasterio as rio
import xarray as xr
from affine import Affine
from json_checker import Checker
from pyproj import CRS
from rasterio.warp import Resampling, calculate_default_transform, reproject
from rasterio.windows import Window
from shapely.geometry import shape
# CARS imports
# Filter rasterio warning when image is not georeferenced
warnings.filterwarnings("ignore", category=rio.errors.NotGeoreferencedWarning)
[docs]
def read_vector(path_to_file):
"""
Read vector file and returns the corresponding polygon
:raise Exception when the input file is unreadable
:param path_to_file: path to the file to open
:type path_to_file: str
:return: a shapely polygon
:rtype: tuple (polygon, epsg)
"""
try:
polys = []
with fiona.open(path_to_file) as vec_file:
_, epsg = vec_file.crs["init"].split(":")
for feat in vec_file:
polys.append(shape(feat["geometry"]))
except BaseException as base_except:
raise FileNotFoundError(
"Impossible to read {} file".format(path_to_file)
) from base_except
if len(polys) == 1:
return polys[0], int(epsg)
if len(polys) > 1:
logging.info(
"Multi features files are not supported, "
"the first feature of {} will be used".format(path_to_file)
)
return polys[0], int(epsg)
logging.info("No feature is present in the {} file".format(path_to_file))
return None
[docs]
def rasterio_get_values(raster_file: str, x_list, y_list, proj_function):
"""
Get the z position of corresponding x and y as lon lat
:param raster_file: Image file
:param x_list: list of x position
:type x_list: np array
:param y_list: list of y position
:type y_list: np array
:param proj_function: projection function to use
:return: The corresponding z position
"""
with rio.open(raster_file, "r") as descriptor:
file_espg = descriptor.crs.to_epsg()
nodata_value = descriptor.nodata
# convert point to epsg
cloud_in = np.stack([x_list, y_list], axis=1)
cloud_out = proj_function(cloud_in, 4326, file_espg)
# get the transform and inverse
aff_tr = rasterio_get_transform(raster_file)
np_tr = np.array(
[
[aff_tr[0], aff_tr[1], aff_tr[2]],
[aff_tr[3], aff_tr[4], aff_tr[5]],
[0, 0, 1],
]
)
inv_tr = np.linalg.inv(np_tr)
# convert sensor to pixel coordinates
pix_pos = np.hstack([cloud_out, np.ones((len(cloud_out), 1))])
pix_pos = inv_tr @ pix_pos.T
pix_pos = pix_pos.T[:, [1, 0]].astype(
int
) # convention (row, col) i.e. (y, x)
# crop to dem bounds
ul_corner = np.array([0, 0])
lr_corner = np.array([descriptor.height, descriptor.width])
pix_pos_clipped = np.clip(pix_pos, ul_corner, lr_corner)
out_of_bounds_pix = np.any(pix_pos != pix_pos_clipped, axis=1)
pix_pos = pix_pos_clipped
# get the data needed
min_pt = pix_pos.min(axis=0)
max_pt = pix_pos.max(axis=0)
height = max_pt[0] - min_pt[0] + 1
width = max_pt[1] - min_pt[1] + 1
window = Window(min_pt[1], min_pt[0], width, height)
data = descriptor.read(1, window=window)
if data.size == 0:
return None
# read the data for all points
max_sampled_pos = np.array(data.shape)[:2] - 1
pix_pos -= min_pt
pix_pos[:, 0] = np.clip(pix_pos[:, 0], 0, max_sampled_pos[0])
pix_pos[:, 1] = np.clip(pix_pos[:, 1], 0, max_sampled_pos[1])
z_list = data[pix_pos[:, 0], pix_pos[:, 1]].astype(float)
if nodata_value is not None:
z_list[z_list == nodata_value] = np.nan
z_list[out_of_bounds_pix] = np.nan
return z_list
[docs]
def rasterio_get_nb_bands(raster_file: str) -> int:
"""
Get the number of bands in an image file
:param raster_file: Image file
:return: The number of bands
"""
with rio.open(raster_file, "r") as descriptor:
return descriptor.count
[docs]
def rasterio_get_classif_values(raster_file: str) -> int:
"""
Get the number of bands in an image file
:param raster_file: Image file
:return: The number of bands
"""
with rio.open(raster_file, "r") as descriptor:
max_value = int(descriptor.stats()[0].max)
if max_value <= 10:
logging.info("Max value of classif is {}")
values = list(range(max_value + 1))
logging.info("Classes are {}".format(values))
return values
logging.warning(
"Input classif has classes over 10"
"Classification file will be read to determine exact values"
)
array = descriptor.read()
values = list(map(int, np.unique(array)))
logging.info("Classes are {}".format(values))
return values
[docs]
def rasterio_get_image_type(raster_file: str) -> list:
"""
Get the image type
:param raster_file: Image file
:return: The image type
"""
image_types = None
with rio.open(raster_file, "r") as descriptor:
image_types = descriptor.dtypes
# Check if each color bands have the same type
image_type_set = set(image_types)
if len(image_type_set) > 1:
logging.warning("The image bands don't the same types.")
image_type = image_types[0]
return image_type
[docs]
def rasterio_get_nbits(raster_file):
"""
Get the band nbits list
:param raster_file: Image file
:return: The band nbits list
"""
nbits = []
with rio.open(raster_file, "r") as descriptor:
for bidx in range(1, descriptor.count + 1):
img_structurre_band = descriptor.tags(
ns="IMAGE_STRUCTURE", bidx=bidx
)
if "NBITS" in img_structurre_band:
nbits.append(int(img_structurre_band["NBITS"]))
return nbits
[docs]
def rasterio_get_size(raster_file: str) -> Tuple[int, int]:
"""
Get the size of an image (file)
:param raster_file: Image file
:return: The size (width, height)
"""
with rio.open(raster_file, "r") as descriptor:
return (descriptor.width, descriptor.height)
[docs]
def rasterio_get_nodata(raster_file: str) -> Tuple[int, int]:
"""
Get the no data value
:param raster_file: Image file
:return: the no data value
"""
with rio.open(raster_file, "r") as descriptor:
return descriptor.nodata
[docs]
def rasterio_get_dtype(raster_file: str) -> Tuple[int, int]:
"""
Get the dtype of an image (file)
:param raster_file: Image file
:return: The dtype
"""
with rio.open(raster_file, "r") as descriptor:
return descriptor.dtypes[0]
[docs]
def rasterio_get_pixel_points(raster_file: str, terrain_points) -> list:
"""
Get pixel point coordinates of terrain points
:param raster_file: Image file
:param terrain_points: points in terrain
:return: pixel points
"""
pixel_points = []
for row in range(terrain_points.shape[0]):
pixel_points.append(
rio.transform.rowcol(
rasterio_get_transform(raster_file),
terrain_points[row, 0],
terrain_points[row, 1],
)
)
return np.array(pixel_points)
[docs]
def rasterio_get_resolution(raster_file: str) -> Tuple[float, float]:
"""
Get the resolution of raster_file
:param raster_file: Image file
:return: The resolution (res_x, res_y)
:rtype: tuple
"""
transform = list(rasterio_get_transform(raster_file))
res_x = transform[0]
res_y = transform[4]
return (abs(res_x), abs(res_y))
[docs]
def rasterio_get_bounds(
raster_file: str, apply_resolution_sign=False
) -> Tuple[int, int]:
"""
Get the bounds of an image (file)
:param raster_file: Image file
:return: The size (width, height)
"""
# get sign of resolution
if apply_resolution_sign:
transform = list(rasterio_get_transform(raster_file))
res_x = transform[0]
res_y = transform[4]
res_x /= abs(res_x)
res_y /= abs(res_y)
res_signs = np.array([res_x, res_y, res_x, res_y])
else:
res_signs = np.array([1, 1, 1, 1])
with rio.open(raster_file, "r") as descriptor:
return np.array(list(descriptor.bounds)) * res_signs
[docs]
def rasterio_get_epsg_code(
raster_file: str,
) -> Tuple[int, int]:
"""
Get the epsg code of an image (file)
:param raster_file: Image file
:return: epsg code
"""
with rio.open(raster_file, "r") as descriptor:
epsg_code = descriptor.crs
return epsg_code
[docs]
def rasterio_get_list_min_max(raster_file: str) -> Tuple[int, int]:
"""
Get the stats of an image (file)
:param raster_file: Image file
:return: The list min max
"""
min_list = []
max_list = []
with rio.open(raster_file, "r") as descriptor:
for k in range(1, descriptor.count + 1):
stat = descriptor.statistics(k)
min_list.append(stat.min)
max_list.append(stat.max)
return min_list, max_list
[docs]
def rasterio_read_as_array(
raster_file: str, window: rio.windows.Window = None
) -> Tuple[np.ndarray, dict]:
"""
Get the data of an image file, and its profile
:param raster_file: Image file
:param window: Window to get data from
:return: The array, its profile
"""
with rio.open(raster_file, "r") as descriptor:
if descriptor.count == 1:
data = descriptor.read(1, window=window)
else:
data = descriptor.read(window=window)
return data, descriptor.profile
[docs]
def rasterio_get_profile(raster_file: str) -> Dict:
"""
Get the profile of an image file
:param raster_file: Image file
:return: The profile of the given image
"""
with rio.open(raster_file, "r") as descriptor:
return descriptor.profile
[docs]
def rasterio_get_epsg(raster_file: str) -> int:
"""
Get the epsg of an image file
:param raster_file: Image file
:return: The epsg of the given image
"""
epsg = None
with rio.open(raster_file, "r") as descriptor:
epsg = descriptor.crs.to_epsg()
return epsg
[docs]
def rasterio_get_crs(raster_file: str) -> CRS:
"""
Get the crs of an image file
:param raster_file: Image file
:return: The crs of the given image
"""
crs = None
with rio.open(raster_file, "r") as descriptor:
crs = descriptor.crs
return crs
[docs]
def rasterio_can_open(raster_file: str) -> bool:
"""
Test if a file can be open by rasterio
:param raster_file: File to test
:return: True if rasterio can open file and False otherwise
"""
try:
rio.open(raster_file)
return True
except Exception as read_error:
logging.warning(
"Impossible to read file {}: {}".format(raster_file, read_error)
)
return False
[docs]
def ncdf_can_open(file_path):
"""
Checks if the given file can be opened by NetCDF
:param file_path: file path.
:type file_path: str
:return: True if it can be opened, False otherwise.
:rtype: bool
"""
try:
with xr.open_dataset(file_path) as _:
return True
except Exception as read_error:
logging.warning(
"Exception caught while trying to read file {}: {}".format(
file_path, read_error
)
)
return False
[docs]
def check_json(conf, schema):
"""
Check a dictionary with respect to a schema
:param conf: The dictionary to check
:type conf: dict
:param schema: The schema to use
:type schema: dict
:return: conf if check succeeds (else raises CheckerError)
:rtype: dict
"""
schema_validator = Checker(schema)
checked_conf = schema_validator.validate(conf)
return checked_conf
[docs]
def get_descriptions_bands(sensor) -> Dict:
"""
Get the descriptions bands of an image file
:param raster_file: Image file
:type sensor: str or dict
:return: The descriptions list of the given image
"""
if isinstance(sensor, str):
with rio.open(sensor, "r") as descriptor:
return descriptor.descriptions
elif isinstance(sensor, dict):
if "values" in sensor:
return list(map(str, sensor["values"]))
raise RuntimeError("Sensor {} cannot be read".format(sensor))
raise TypeError("Sensor {} is not str or dict".format(sensor))