Source code for cars.applications.hole_detection.cloud_to_bbox

# !/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
CARS hole detection module: contains the CloudToBbox application
(``cloud_to_bbox`` method) and its per-tile bounding-box wrapper.
"""

import logging

# Standard imports
import os

# Third party imports
from json_checker import Checker, OptionalKey

import cars.orchestrator.orchestrator as ocht
from cars.applications import application_constants
from cars.applications.hole_detection import hole_detection_tools
from cars.applications.hole_detection.hole_detection import HoleDetection

# CARS imports
from cars.core.utils import safe_makedirs
from cars.data_structures import cars_dataset, cars_dict


class CloudToBbox(
    HoleDetection, short_name="cloud_to_bbox"
):  # pylint: disable=R0903
    """
    Hole detection application registered under the short name
    "cloud_to_bbox".

    For every tile of a pair of epipolar images it schedules the computation
    of bounding boxes around the masked areas belonging to the requested
    classification classes.
    """

    # pylint: disable=too-many-instance-attributes

    def __init__(self, conf=None):
        """
        Init function of CloudToBbox

        :param conf: configuration for hole detection
        :type conf: dict
        """

        # NOTE(review): self.used_config is presumably populated by the
        # parent constructor from check_conf(conf) -- confirm in HoleDetection
        super().__init__(conf=conf)

        # Method name kept from the validated configuration
        self.used_method = self.used_config["method"]

        # The orchestrator is only known once run() is called
        self.orchestrator = None

    def check_conf(self, conf):
        """
        Check configuration and fill in default values.

        :param conf: configuration to check (None is treated as empty)
        :type conf: dict

        :return: overloaded configuration
        :rtype: dict
        """

        # Work on a shallow copy so the caller's dict is not modified
        if conf is not None:
            overloaded_conf = conf.copy()
        else:
            conf = {}
            overloaded_conf = {}

        # Default method is this application's short name
        overloaded_conf["method"] = conf.get("method", "cloud_to_bbox")

        hole_detection_schema = {
            "method": str,
            OptionalKey(application_constants.SAVE_INTERMEDIATE_DATA): bool,
        }

        # Validate the overloaded configuration against the schema
        checker = Checker(hole_detection_schema)
        checker.validate(overloaded_conf)

        return overloaded_conf

    def run(
        self,
        epipolar_images_left,
        epipolar_images_right,
        classification=None,
        margin=0,
        orchestrator=None,
        pair_folder=None,
        pair_key="PAIR_0",
    ):
        """
        Run the CloudToBbox application: compute, tile by tile, the bounding
        boxes of masked areas in the left and right epipolar images.

        Tiles are only scheduled when ``classification`` is neither None nor
        an empty list; otherwise the returned datasets are left empty.

        :param epipolar_images_left: left epipolar image
        :type epipolar_images_left: CarsDataset
        :param epipolar_images_right: right epipolar image
        :type epipolar_images_right: CarsDataset
        :param classification: mask classes to use
        :type classification: list(str)
        :param margin: margin to use, forwarded to the per-tile computation
        :type margin: int
        :param orchestrator: orchestrator used. If None, a sequential
            orchestrator is created for this application.
        :type orchestrator: Orchestrator
        :param pair_folder: folder used for current pair
        :type pair_folder: str
        :param pair_key: pair id
        :type pair_key: str

        :return: left holes, right holes. (None, None) is returned, after
            an error has been logged, when the input dataset type is not
            "arrays".
        :rtype: Tuple(CarsDataset, CarsDataset)

        """

        # Default orchestrator
        if orchestrator is None:
            # Create default sequential orchestrator for current application
            # be aware, no out_json will be shared between orchestrators
            # No files saved
            self.orchestrator = ocht.Orchestrator(
                orchestrator_conf={"mode": "sequential"}
            )
        else:
            self.orchestrator = orchestrator

        if pair_folder is None:
            pair_folder = os.path.join(self.orchestrator.out_dir, "tmp")
            safe_makedirs(pair_folder)

        # Defined up front so the final return is valid even when the input
        # format is not supported (previously raised UnboundLocalError)
        left_bbox_cars_ds = None
        right_bbox_cars_ds = None

        if epipolar_images_left.dataset_type == "arrays":
            # Create the output CarsDatasets (one dict per tile), sharing
            # the tiling of the inputs but without overlaps
            left_bbox_cars_ds = cars_dataset.CarsDataset(
                "dict", name="cloud_to_bbox_left_" + pair_key
            )
            left_bbox_cars_ds.create_empty_copy(epipolar_images_left)
            left_bbox_cars_ds.overlaps *= 0

            right_bbox_cars_ds = cars_dataset.CarsDataset(
                "dict", name="cloud_to_bbox_right_" + pair_key
            )
            right_bbox_cars_ds.create_empty_copy(epipolar_images_right)
            right_bbox_cars_ds.overlaps *= 0

            # Add infos to orchestrator.out_json
            updating_dict = {
                application_constants.APPLICATION_TAG: {pair_key: {}}
            }
            self.orchestrator.update_out_info(updating_dict)
            logging.info(
                "Compute bbox: number tiles: %s",
                epipolar_images_left.shape[1] * epipolar_images_left.shape[0],
            )

            if classification not in (None, []):
                # Get saving infos in order to save tiles when they are
                # computed
                [
                    saving_info_left,
                    saving_info_right,
                ] = self.orchestrator.get_saving_infos(
                    [left_bbox_cars_ds, right_bbox_cars_ds]
                )

                # Add to replace list so tiles will be readable at the
                # same time
                self.orchestrator.add_to_replace_lists(
                    left_bbox_cars_ds, cars_ds_name="epi_msk_bbox_left"
                )
                self.orchestrator.add_to_replace_lists(
                    right_bbox_cars_ds, cars_ds_name="epi_msk_bbox_right"
                )

                # Schedule one bounding-box task per non-empty tile
                for col in range(epipolar_images_left.shape[1]):
                    for row in range(epipolar_images_left.shape[0]):
                        if (epipolar_images_left[row, col] is not None) or (
                            epipolar_images_right[row, col] is not None
                        ):
                            # update saving_info with row and col needed for
                            # replacement
                            full_saving_info_left = ocht.update_saving_infos(
                                saving_info_left, row=row, col=col
                            )
                            full_saving_info_right = ocht.update_saving_infos(
                                saving_info_right, row=row, col=col
                            )

                            # get window and overlaps
                            window_left = epipolar_images_left.tiling_grid[
                                row, col, :
                            ]
                            window_right = epipolar_images_right.tiling_grid[
                                row, col, :
                            ]
                            overlap_left = epipolar_images_left.overlaps[
                                row, col, :
                            ]
                            overlap_right = epipolar_images_right.overlaps[
                                row, col, :
                            ]

                            # Compute bbox
                            (
                                left_bbox_cars_ds[row, col],
                                right_bbox_cars_ds[row, col],
                            ) = self.orchestrator.cluster.create_task(
                                compute_mask_bboxes_wrapper, nout=2
                            )(
                                epipolar_images_left[row, col],
                                epipolar_images_right[row, col],
                                window_left,
                                window_right,
                                overlap_left,
                                overlap_right,
                                classification,
                                # Forward the caller's margin; it used to be
                                # silently dropped in favour of the wrapper
                                # default
                                margin=margin,
                                saving_info_left=full_saving_info_left,
                                saving_info_right=full_saving_info_right,
                            )

        else:
            logging.error(
                "CloudToBbox application doesn't "
                "support this input data format"
            )

        return left_bbox_cars_ds, right_bbox_cars_ds
def compute_mask_bboxes_wrapper(
    left_image_dataset,
    right_image_dataset,
    window_left,
    window_right,
    overlap_left,
    overlap_right,
    classification,
    margin=20,
    saving_info_left=None,
    saving_info_right=None,
):
    """
    Compute the bounding boxes of masked areas for one left/right tile pair.

    Each non-None image tile is handed to
    ``hole_detection_tools.localize_masked_areas`` together with the row and
    column offsets derived from its window and overlap. The result is stored
    under the ``"list_bbox"`` key of a CarsDict. A tile that is None yields
    an empty dict under that key.

    :param left_image_dataset: tiled left image (may be None)
    :type left_image_dataset: xr.Dataset
    :param right_image_dataset: tiled right image (may be None)
    :type right_image_dataset: xr.Dataset
    :param window_left: left window; only indices 0 and 2 are read
    :param window_right: right window; only indices 0 and 2 are read
    :param overlap_left: left overlaps; only indices 0 and 2 are read
    :param overlap_right: right overlaps; only indices 0 and 2 are read
    :param classification: mask classes to use
    :type classification: list(str)
    :param margin: margin to use
    :type margin: int
    :param saving_info_left: saving infos left
    :type saving_info_left: dict
    :param saving_info_right: saving infos right
    :type saving_info_right: dict

    :return: left bounding boxes, right bounding boxes
    :rtype: Tuple(CarsDict, CarsDict)
    """

    # Offsets of each tile: window entry minus overlap entry at indices 0
    # and 2 (presumably row start and column start -- confirm against the
    # tiling grid layout)
    left_row_shift = window_left[0] - overlap_left[0]
    left_col_shift = window_left[2] - overlap_left[2]
    right_row_shift = window_right[0] - overlap_right[0]
    right_col_shift = window_right[2] - overlap_right[2]

    # Left tile
    if left_image_dataset is None:
        left_boxes = {}
    else:
        left_boxes = hole_detection_tools.localize_masked_areas(
            left_image_dataset,
            classification,
            row_offset=left_row_shift,
            col_offset=left_col_shift,
            margin=margin,
        )

    # Right tile
    if right_image_dataset is None:
        right_boxes = {}
    else:
        right_boxes = hole_detection_tools.localize_masked_areas(
            right_image_dataset,
            classification,
            row_offset=right_row_shift,
            col_offset=right_col_shift,
            margin=margin,
        )

    # Wrap the results and attach the saving infos
    left_result = cars_dict.CarsDict({"list_bbox": left_boxes})
    cars_dataset.fill_dict(left_result, saving_info=saving_info_left)

    right_result = cars_dict.CarsDict({"list_bbox": right_boxes})
    cars_dataset.fill_dict(right_result, saving_info=saving_info_right)

    return left_result, right_result