Source code for cars.core.inputs

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Inputs module:
contains some CARS global shared general purpose inputs functions
"""

import logging

# Standard imports
import os
import warnings
from typing import Dict, Tuple

# Third party imports
import fiona
import numpy as np
import rasterio as rio
import xarray as xr
from affine import Affine
from json_checker import Checker
from pyproj import CRS
from rasterio.warp import Resampling, calculate_default_transform, reproject
from rasterio.windows import Window
from shapely.geometry import shape

# CARS imports


# Filter rasterio warning when image is not georeferenced
warnings.filterwarnings("ignore", category=rio.errors.NotGeoreferencedWarning)


[docs] def read_vector(path_to_file): """ Read vector file and returns the corresponding polygon :raise Exception when the input file is unreadable :param path_to_file: path to the file to open :type path_to_file: str :return: a shapely polygon :rtype: tuple (polygon, epsg) """ try: polys = [] with fiona.open(path_to_file) as vec_file: _, epsg = vec_file.crs["init"].split(":") for feat in vec_file: polys.append(shape(feat["geometry"])) except BaseException as base_except: raise FileNotFoundError( "Impossible to read {} file".format(path_to_file) ) from base_except if len(polys) == 1: return polys[0], int(epsg) if len(polys) > 1: logging.info( "Multi features files are not supported, " "the first feature of {} will be used".format(path_to_file) ) return polys[0], int(epsg) logging.info("No feature is present in the {} file".format(path_to_file)) return None
[docs] def rasterio_get_values(raster_file: str, x_list, y_list, proj_function): """ Get the z position of corresponding x and y as lon lat :param raster_file: Image file :param x_list: list of x position :type x_list: np array :param y_list: list of y position :type y_list: np array :param proj_function: projection function to use :return: The corresponding z position """ with rio.open(raster_file, "r") as descriptor: file_espg = descriptor.crs.to_epsg() nodata_value = descriptor.nodata # convert point to epsg cloud_in = np.stack([x_list, y_list], axis=1) cloud_out = proj_function(cloud_in, 4326, file_espg) # get the transform and inverse aff_tr = rasterio_get_transform(raster_file) np_tr = np.array( [ [aff_tr[0], aff_tr[1], aff_tr[2]], [aff_tr[3], aff_tr[4], aff_tr[5]], [0, 0, 1], ] ) inv_tr = np.linalg.inv(np_tr) # convert sensor to pixel coordinates pix_pos = np.hstack([cloud_out, np.ones((len(cloud_out), 1))]) pix_pos = inv_tr @ pix_pos.T pix_pos = pix_pos.T[:, [1, 0]].astype( int ) # convention (row, col) i.e. (y, x) # crop to dem bounds ul_corner = np.array([0, 0]) lr_corner = np.array([descriptor.height, descriptor.width]) pix_pos_clipped = np.clip(pix_pos, ul_corner, lr_corner) out_of_bounds_pix = np.any(pix_pos != pix_pos_clipped, axis=1) pix_pos = pix_pos_clipped # get the data needed min_pt = pix_pos.min(axis=0) max_pt = pix_pos.max(axis=0) height = max_pt[0] - min_pt[0] + 1 width = max_pt[1] - min_pt[1] + 1 window = Window(min_pt[1], min_pt[0], width, height) data = descriptor.read(1, window=window) if data.size == 0: return None # read the data for all points max_sampled_pos = np.array(data.shape)[:2] - 1 pix_pos -= min_pt pix_pos[:, 0] = np.clip(pix_pos[:, 0], 0, max_sampled_pos[0]) pix_pos[:, 1] = np.clip(pix_pos[:, 1], 0, max_sampled_pos[1]) z_list = data[pix_pos[:, 0], pix_pos[:, 1]].astype(float) if nodata_value is not None: z_list[z_list == nodata_value] = np.nan z_list[out_of_bounds_pix] = np.nan return z_list
[docs] def rasterio_get_nb_bands(raster_file: str) -> int: """ Get the number of bands in an image file :param raster_file: Image file :return: The number of bands """ with rio.open(raster_file, "r") as descriptor: return descriptor.count
[docs] def rasterio_get_classif_values(raster_file: str) -> int: """ Get the number of bands in an image file :param raster_file: Image file :return: The number of bands """ with rio.open(raster_file, "r") as descriptor: max_value = int(descriptor.stats()[0].max) if max_value <= 10: logging.info("Max value of classif is {}") values = list(range(max_value + 1)) logging.info("Classes are {}".format(values)) return values logging.warning( "Input classif has classes over 10" "Classification file will be read to determine exact values" ) array = descriptor.read() values = list(map(int, np.unique(array))) logging.info("Classes are {}".format(values)) return values
[docs] def rasterio_get_tags(raster_file: str) -> dict: """ Get the tags in an image file :param raster_file: Image file :return: The metadata """ with rio.open(raster_file, "r") as descriptor: return descriptor.tags()
[docs] def rasterio_get_image_type(raster_file: str) -> list: """ Get the image type :param raster_file: Image file :return: The image type """ image_types = None with rio.open(raster_file, "r") as descriptor: image_types = descriptor.dtypes # Check if each color bands have the same type image_type_set = set(image_types) if len(image_type_set) > 1: logging.warning("The image bands don't the same types.") image_type = image_types[0] return image_type
[docs] def rasterio_get_nbits(raster_file): """ Get the band nbits list :param raster_file: Image file :return: The band nbits list """ nbits = [] with rio.open(raster_file, "r") as descriptor: for bidx in range(1, descriptor.count + 1): img_structurre_band = descriptor.tags( ns="IMAGE_STRUCTURE", bidx=bidx ) if "NBITS" in img_structurre_band: nbits.append(int(img_structurre_band["NBITS"])) return nbits
[docs] def rasterio_get_size(raster_file: str) -> Tuple[int, int]: """ Get the size of an image (file) :param raster_file: Image file :return: The size (width, height) """ with rio.open(raster_file, "r") as descriptor: return (descriptor.width, descriptor.height)
[docs] def rasterio_get_nodata(raster_file: str) -> Tuple[int, int]: """ Get the no data value :param raster_file: Image file :return: the no data value """ with rio.open(raster_file, "r") as descriptor: return descriptor.nodata
[docs] def rasterio_get_dtype(raster_file: str) -> Tuple[int, int]: """ Get the dtype of an image (file) :param raster_file: Image file :return: The dtype """ with rio.open(raster_file, "r") as descriptor: return descriptor.dtypes[0]
[docs] def rasterio_get_pixel_points(raster_file: str, terrain_points) -> list: """ Get pixel point coordinates of terrain points :param raster_file: Image file :param terrain_points: points in terrain :return: pixel points """ pixel_points = [] for row in range(terrain_points.shape[0]): pixel_points.append( rio.transform.rowcol( rasterio_get_transform(raster_file), terrain_points[row, 0], terrain_points[row, 1], ) ) return np.array(pixel_points)
[docs] def rasterio_get_resolution(raster_file: str) -> Tuple[float, float]: """ Get the resolution of raster_file :param raster_file: Image file :return: The resolution (res_x, res_y) :rtype: tuple """ transform = list(rasterio_get_transform(raster_file)) res_x = transform[0] res_y = transform[4] return (abs(res_x), abs(res_y))
[docs] def rasterio_get_bounds( raster_file: str, apply_resolution_sign=False ) -> Tuple[int, int]: """ Get the bounds of an image (file) :param raster_file: Image file :return: The size (width, height) """ # get sign of resolution if apply_resolution_sign: transform = list(rasterio_get_transform(raster_file)) res_x = transform[0] res_y = transform[4] res_x /= abs(res_x) res_y /= abs(res_y) res_signs = np.array([res_x, res_y, res_x, res_y]) else: res_signs = np.array([1, 1, 1, 1]) with rio.open(raster_file, "r") as descriptor: return np.array(list(descriptor.bounds)) * res_signs
[docs] def rasterio_get_epsg_code( raster_file: str, ) -> Tuple[int, int]: """ Get the epsg code of an image (file) :param raster_file: Image file :return: epsg code """ with rio.open(raster_file, "r") as descriptor: epsg_code = descriptor.crs return epsg_code
[docs] def rasterio_get_list_min_max(raster_file: str) -> Tuple[int, int]: """ Get the stats of an image (file) :param raster_file: Image file :return: The list min max """ min_list = [] max_list = [] with rio.open(raster_file, "r") as descriptor: for k in range(1, descriptor.count + 1): stat = descriptor.statistics(k) min_list.append(stat.min) max_list.append(stat.max) return min_list, max_list
[docs] def rasterio_read_as_array( raster_file: str, window: rio.windows.Window = None ) -> Tuple[np.ndarray, dict]: """ Get the data of an image file, and its profile :param raster_file: Image file :param window: Window to get data from :return: The array, its profile """ with rio.open(raster_file, "r") as descriptor: if descriptor.count == 1: data = descriptor.read(1, window=window) else: data = descriptor.read(window=window) return data, descriptor.profile
[docs] def rasterio_get_profile(raster_file: str) -> Dict: """ Get the profile of an image file :param raster_file: Image file :return: The profile of the given image """ with rio.open(raster_file, "r") as descriptor: return descriptor.profile
[docs] def rasterio_get_transform(raster_file: str, convention: str = None) -> Dict: """ Get the transform of an image file :param raster_file: Image file :param convention: The convention to follow: None, "north" or "south" :return: The transform of the given image """ with rio.open(raster_file, "r") as dsc: src_tr = dsc.transform if convention == "north" and src_tr.e < 0: src_tr = Affine( src_tr.a, src_tr.b, src_tr.c, -src_tr.d, -src_tr.e, -src_tr.f ) elif convention == "south" and src_tr.e > 0: src_tr = Affine( src_tr.a, src_tr.b, src_tr.c, -src_tr.d, -src_tr.e, -src_tr.f ) return src_tr
[docs] def rasterio_get_epsg(raster_file: str) -> int: """ Get the epsg of an image file :param raster_file: Image file :return: The epsg of the given image """ epsg = None with rio.open(raster_file, "r") as descriptor: epsg = descriptor.crs.to_epsg() return epsg
[docs] def rasterio_get_crs(raster_file: str) -> CRS: """ Get the crs of an image file :param raster_file: Image file :return: The crs of the given image """ crs = None with rio.open(raster_file, "r") as descriptor: crs = descriptor.crs return crs
[docs] def rasterio_transform_epsg(file_name, new_epsg): """ Modify epsg of raster file :param file_name: Image file :param new_epsg: new epsg """ reprojected_file_name = file_name + "_reprojected.tif" # Create reprojected copy with rio.open(file_name) as src: transform, width, height = calculate_default_transform( src.crs, new_epsg, src.width, src.height, *src.bounds ) kwargs = src.meta.copy() kwargs.update( { "crs": new_epsg, "transform": transform, "width": width, "height": height, } ) with rio.open(reprojected_file_name, "w", **kwargs) as dst: for i in range(1, src.count + 1): reproject( source=rio.band(src, i), destination=rio.band(dst, i), src_transform=rasterio_get_transform(file_name), src_crs=src.crs, dst_transform=transform, dst_crs=new_epsg, resampling=Resampling.nearest, ) # Replace file with the reprojected one os.rename(reprojected_file_name, file_name)
[docs] def rasterio_can_open(raster_file: str) -> bool: """ Test if a file can be open by rasterio :param raster_file: File to test :return: True if rasterio can open file and False otherwise """ try: rio.open(raster_file) return True except Exception as read_error: logging.warning( "Impossible to read file {}: {}".format(raster_file, read_error) ) return False
[docs] def ncdf_can_open(file_path): """ Checks if the given file can be opened by NetCDF :param file_path: file path. :type file_path: str :return: True if it can be opened, False otherwise. :rtype: bool """ try: with xr.open_dataset(file_path) as _: return True except Exception as read_error: logging.warning( "Exception caught while trying to read file {}: {}".format( file_path, read_error ) ) return False
[docs] def check_json(conf, schema): """ Check a dictionary with respect to a schema :param conf: The dictionary to check :type conf: dict :param schema: The schema to use :type schema: dict :return: conf if check succeeds (else raises CheckerError) :rtype: dict """ schema_validator = Checker(schema) checked_conf = schema_validator.validate(conf) return checked_conf
[docs] def get_descriptions_bands(sensor) -> Dict: """ Get the descriptions bands of an image file :param raster_file: Image file :type sensor: str or dict :return: The descriptions list of the given image """ if isinstance(sensor, str): with rio.open(sensor, "r") as descriptor: return descriptor.descriptions elif isinstance(sensor, dict): if "values" in sensor: return list(map(str, sensor["values"])) raise RuntimeError("Sensor {} cannot be read".format(sensor)) raise TypeError("Sensor {} is not str or dict".format(sensor))