Source code for cars.orchestrator.cluster.mp_cluster.mp_factorizer

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2020 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of CARS
# (see https://github.com/CNES/cars).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Contains functions needed to factorize delayed
"""

# CARS imports
from cars.orchestrator.cluster.mp_cluster.mp_objects import (
    FactorizedObject,
    MpDelayedTask,
)


[docs] def factorize_delayed(task_list): """ Factorize Task list Only factorize tasks that depends on a single task :param task_list: list of delayed :type task_list: list(MpDelayed) """ # Compute graph usage graph_usages = compute_graph_delayed_usages(task_list) # Factorize delayed already_seen_delayed_tasks = [] for delayed in task_list: factorize_delayed_rec(delayed, graph_usages, already_seen_delayed_tasks)
[docs] def factorize_delayed_rec(delayed, graph_usages, already_seen_delayed_tasks): """ Factorize Task list Only factorize tasks that depends on a single task :param delayed: delayed to factorize :type delayed: MpDelayed :param graph_usages: number of usages of delayed :type graph_usages: dict example: {delayed1: 3} :param already_seen_delayed_tasks: list of MpDelayedTask already seen """ # check if current delayed can be factorized depending_delayed = delayed.get_depending_delayed() number_depending_task = compute_nb_depending_task(depending_delayed) max_nb_of_usages = 0 if len(depending_delayed) > 0: max_nb_of_usages = max( number_of_usage(deld, graph_usages) for deld in depending_delayed ) current_task = delayed.delayed_task if current_task not in already_seen_delayed_tasks: if number_depending_task == 1 and max_nb_of_usages == 1: previous_task = depending_delayed[0].delayed_task factorized_object = FactorizedObject(current_task, previous_task) # Create new task and assign it to current delay new_task = MpDelayedTask(factorized_fun, [factorized_object], {}) new_task.associated_objects = current_task.associated_objects delayed.delayed_task = new_task # Factorize again with current factorize_delayed_rec( delayed, graph_usages, already_seen_delayed_tasks ) else: # Only set to seen when task is completly factorized already_seen_delayed_tasks.append(current_task) # Get new dependances and factorize it depending_delayed = delayed.get_depending_delayed() for new_delayed in depending_delayed: factorize_delayed_rec( new_delayed, graph_usages, already_seen_delayed_tasks )
[docs] def compute_graph_delayed_usages(task_list): """ Compute the number of times every delayed is used in graph :param task_list: list of delayed :type task_list: list(MpDelayed) :return: number of usages of delayed :rtype: dict example: {delayed1: 3} """ graph_usages = {} already_seen_tasks = [] for delayed in task_list: get_delayed_usage_rec(delayed, graph_usages, already_seen_tasks) return graph_usages
[docs] def get_delayed_usage_rec(delayed, graph_usages, already_seen_tasks): """ Get number of time input delayed is used :param delayed: delayed to factorize :type delayed: MpDelayed :param graph_usages: number of usages of delayed :type graph_usages: dict example: {delayed1: 3} :param already_seen_tasks: list of seen delayed task :type already_seen_tasks: list[MpDelayedTask] """ # update graph_usages if delayed in graph_usages: graph_usages[delayed] += 1 else: graph_usages[delayed] = 1 # get usage of task inputs, if task was not already seen delayed_task = delayed.delayed_task if delayed_task not in already_seen_tasks: # add task to seen already_seen_tasks.append(delayed_task) # get usage for all inputs depending_delayed = delayed.get_depending_delayed() for input_delayed in depending_delayed: get_delayed_usage_rec( input_delayed, graph_usages, already_seen_tasks )
[docs] def number_of_usage(delayed, graph_usages): """ Compute the number of times a delayed is used :param delayed: delayed to factorize :type delayed: MpDelayed :param graph_usages: number of usages of delayed :type graph_usages: dict example: {delayed1: 3} :return: number of usages of delayed :rtype: int """ nb_usage = graph_usages[delayed] return nb_usage
[docs] def compute_nb_depending_task(depending_delayed_list): """ Compute the number of different delayed task in list of delayed :param depending_delayed_list: list of delayed :type depending_delayed_list: list[MpDelayed] :return: number of depending task :rtype: int """ list_delayed_task = [] for delayed in depending_delayed_list: delayed_task = delayed.delayed_task if delayed_task not in list_delayed_task: list_delayed_task.append(delayed_task) return len(list_delayed_task)
# Factorized function and its generator
[docs] def factorized_fun(factorized_object): """ This function unpack multiple functions with their arguments, and run them sequentialy until task list is empty :param factorized_object: Object that contains a list of tasks :type factorized_object: mp_objects.FactorizedObject """ res = None while factorized_object.tasks: # Run next task with output of previous task res = factorized_object.pop_next_task(previous_result=res) return res