Source code for laborchestrator.process_reader

"""
The general interface for process readers. These are used to parse process descriptions from a workflow description
language into the laborchestrator's own structures. For an example implementation, see pythonlab_reader.py.
"""

from abc import ABC, abstractmethod
from typing import Any, List

from laborchestrator.structures import SMProcess


class ProcessReader(ABC):
    """
    Abstract interface for process readers.

    A concrete reader implements :meth:`read_process` to translate a process description written in some
    workflow description language into the orchestrator's own ``SMProcess`` structure.
    """

    def __init__(self):
        pass

    @abstractmethod
    def read_process(self, process: Any, **kwargs) -> SMProcess:
        """
        Reads a process written in some workflow description language into the orchestrator's own format.

        :param process: The process description in its native workflow description language. The accepted
            type is defined by the concrete reader.
        :param kwargs: Additional keyword arguments; their meaning is defined by the concrete reader.
        :return: The process converted into the orchestrator's ``SMProcess`` structure.
        """

    @staticmethod
    def adjust_opacities(p: SMProcess):
        """
        Utility function setting the opacities of all process steps that are unsure to be executed
        (depending on runtime decisions) to .5

        Starting from the names of all if-nodes, every node (step, if-node, variable or computation) that
        still has opacity 1 and lists an already affected node in its ``prior`` is set to opacity .5. This is
        repeated until no further node changes. Nodes whose opacity is not exactly 1 are left untouched and
        are not propagated through. The if-nodes themselves keep their opacity unless they depend on another
        affected node.

        :param p: The process whose nodes are updated in place.
        :return: None
        """
        # Names of the nodes affected in the previous pass. A set gives O(1) membership tests;
        # this assumes node names and the entries of ``prior`` are hashable (presumably strings).
        frontier = {if_node.name for if_node in p.if_nodes}
        while frontier:
            newly_changed = set()
            for node_list in (p.steps, p.if_nodes, p.variables, p.computations):
                for node in node_list:
                    # The opacity check also guarantees termination: every node is changed at most once.
                    if node.opacity == 1 and any(idx in frontier for idx in node.prior):
                        node.opacity = .5
                        newly_changed.add(node.name)
            frontier = newly_changed

    @staticmethod
    def get_available_processes(file_dir: str) -> List[Any]:
        """
        Searches a directory for available process descriptions.

        This base implementation does not inspect ``file_dir`` and always returns an empty list.

        :param file_dir: The directory to search.
        :return: A list containing all found process descriptions (in their native language).
        """
        return []