Source code for laborchestrator.process_reader
"""
The general interface for process readers. These are used to parse process descriptions from a workflow description
language into the laborchestrator's own structures. For an example of its use, see pythonlab_reader.py.
"""
from abc import ABC, abstractmethod
from typing import Any, List
from laborchestrator.structures import SMProcess
[docs]
class ProcessReader(ABC):
    """
    Abstract interface for process readers.

    A concrete reader implements :meth:`read_process` to turn a process description into an ``SMProcess``.
    The static helpers defined here do not depend on any instance state.
    """

    def __init__(self):
        """The base class keeps no state, so there is nothing to initialise."""

    @abstractmethod
    def read_process(self, process: Any, **kwargs) -> SMProcess:
        """
        Reads a process written in some workflow description language into the orchestrator's own format.

        :param process: The process description in its native representation.
        :param kwargs: Additional options; their meaning is defined by the concrete reader.
        :return: The process converted to an SMProcess.
        """

    @staticmethod
    def adjust_opacities(p: SMProcess):
        """
        Utility function setting the opacities of all process steps that are unsure to be executed
        (depending on runtime decisions) to .5

        Starting from the names of all if-nodes, every node in ``p.steps``, ``p.if_nodes``, ``p.variables``
        and ``p.computations`` that lists an already affected name in its ``prior`` and still has opacity 1
        is set to .5. This repeats until a pass changes nothing. Nodes whose opacity is not 1 are left
        untouched and do not pass the change on to their successors.

        :param p: The process whose nodes are modified in place.
        :return: None
        """
        # Names whose successors still have to be checked. A set makes the membership test below O(1).
        frontier = {n.name for n in p.if_nodes}
        while frontier:
            newly_changed = set()
            for nodes in (p.steps, p.if_nodes, p.variables, p.computations):
                for node in nodes:
                    # The opacity check also guarantees that each node is changed at most once,
                    # so the loop terminates even if the ``prior`` relation contains cycles.
                    if node.opacity == 1 and any(idx in frontier for idx in node.prior):
                        node.opacity = .5
                        newly_changed.add(node.name)
            # Only nodes changed in this pass can affect further nodes in the next pass.
            frontier = newly_changed

    @staticmethod
    def get_available_processes(file_dir: str) -> List[Any]:
        """
        Searches a directory for available process descriptions.

        The implementation in this base class does not inspect ``file_dir`` and always returns an empty list.

        :param file_dir: The directory to search.
        :return: A list containing all found process descriptions (in their native language).
        """
        return []