# Source code for laborchestrator.structures

"""
Here we define the structures of a process, a scheduling instance etc.
"""
import traceback
from dataclasses import dataclass, field
from typing import List, Dict, Any, Union, Callable, Optional
from enum import IntEnum
from copy import deepcopy
from datetime import datetime, timedelta
from laborchestrator.orchestrator_interface import ProcessExecutionState
from laborchestrator.logging_manager import StandardLogger as Logger
import networkx as nx
import pandas as pd
import plotly.express as px
from graphviz import Digraph


class StepStatus(IntEnum):
    """Lifecycle state of a workflow node (step, variable, if-node, computation)."""

    WAITING = 0
    RUNNING = 1
    FINISHED = 2
    ERROR = 3
@dataclass
class UsedDevice:
    """Reference to a device taking part in a process step."""
    device_type: Any
    name: Optional[str] = None  # set when a specific device is assigned
    preferred: Optional[str] = None  # optional wish for a specific device
    tag: str = ""  # convenient field to add information like main, source, target, additional, etc.

    def __str__(self):
        parts = (self.tag, self.device_type, self.name, self.preferred)
        return "|".join(str(part) for part in parts)
@dataclass
class ContainerInfo:
    """Editable data collection for containers going through a lab process."""
    name: str  # unique name used to reference this container in the workflow graph
    current_device: str  # name of the device the container currently sits on
    current_pos: int  # position on the current device — presumably a slot index; TODO confirm
    start_device: UsedDevice  # device the container starts the process on
    filled: bool = True  # this is safer, since filled containers are handled more carefully
    content: str = ""
    labware_type: str = ""
    barcode: Optional[str] = None  # None while the barcode is unknown
    finished: bool = False
    lidded: bool = False
    lid_site: Any = None
    in_error_state: bool = False  # steps touching a container in error state are excluded from scheduling
    is_reagent: bool = False  # reagent containers get special opacity handling
@dataclass
class ProcessStep:
    """Editable data collection for a general operation in a lab process."""
    name: str  # important for referencing. Should be the same in the workflow graph.
    cont_names: List[str]  # the first one should be the main container (if there is such)
    function: str  # todo: change this to some kind of enum
    duration: float  # duration of the operation in seconds
    process_name: str = ""  # unique (intern, human-readable) name of the experiment this job belongs to
    status: StepStatus = StepStatus.WAITING
    start: Union[datetime, None] = None
    finish: Union[datetime, None] = None
    prior: List[str] = field(default_factory=list)  # names of the operations prior to this one
    used_devices: List[UsedDevice] = field(default_factory=list)  # information on the devices used
    label: str = 'LABEL'  # use only for visualization
    wait_cost: Dict[str, float] = field(default_factory=dict)  # waiting costs after prior operations (linked by name)
    max_wait: Dict[str, int] = field(default_factory=dict)  # maximum waiting times after prior operations
    min_wait: Dict[str, int] = field(default_factory=dict)  # minimum waiting times after prior operations
    wait_to_start_costs: float = 0
    is_start: bool = False  # flag whether this job has no other jobs that have to be finished prior
    opacity: float = 1  # reduced if this job is behind an if-statement
    result: Any = None  # todo: should become a uuid referencing the database
    data: Dict = field(default_factory=dict)  # custom data like duration, speed, etc.

    @property
    def cont(self):
        """Name of the main container (first entry of cont_names), or None if there is none."""
        return self.cont_names[0] if self.cont_names else None

    def priors_done(self, job_by_name: Dict):
        """Checks whether all prerequisite operations are finished."""
        return all(job_by_name[name].status == StepStatus.FINISHED for name in self.prior)

    @property
    def main_device(self) -> Optional[UsedDevice]:
        """The used device tagged 'main', or None if no such device exists."""
        return next((dev for dev in self.used_devices if dev.tag == "main"), None)
@dataclass
class MoveStep(ProcessStep):
    """Operation resembling a movement of a container."""
    pref_dest_pos: Optional[int] = None  # optional preference for the destination slot number
    destination_pos: int = 0  # actual destination slot number (set at execution runtime)
    origin_pos: int = 0  # not relevant for execution, but nice for logging

    @property
    def origin_device(self) -> Optional[UsedDevice]:
        """The used device tagged 'origin', or None if no such device exists."""
        return next((dev for dev in self.used_devices if dev.tag == "origin"), None)

    @property
    def target_device(self) -> Optional[UsedDevice]:
        """The used device tagged 'target', or None if no such device exists."""
        return next((dev for dev in self.used_devices if dev.tag == "target"), None)
@dataclass
class Variable:
    """Representing a variable node of the workflow graph"""
    name: str  # to index the node in the implicit workflow graph
    var_name: str  # name of the process variable this node represents — verify against consumers
    result: Any = None  # the variable's value once it has been produced
    prior: List[str] = field(default_factory=list)  # names of nodes that must finish before this one
    status: StepStatus = StepStatus.WAITING
    opacity: float = 1  # this reduces if this variable is behind an if-statement
@dataclass
class IfNode:
    """Represents a constraint in the workflow graph"""
    name: str  # unique node id in the workflow graph
    evaluation: Callable[[Dict[str, Any]], bool]  # takes **kwargs and outputs the decision of the node
    decision: Union[bool, None] = None  # result of evaluation; None while undecided
    prior: List[str] = field(default_factory=list)  # indices of the variable nodes needed for a decision
    true_tree: List[str] = field(default_factory=list)  # indices of nodes to be executed (directly) after positive decision
    false_tree: List[str] = field(default_factory=list)  # indices of nodes not to be executed (directly) after positive decision
    status: StepStatus = StepStatus.WAITING
    opacity: float = 1  # this reduces if this node is behind an if-statement
@dataclass
class Computation:
    """Computation node of the workflow graph: derives a result from prior variable nodes."""
    name: str  # unique node id in the workflow graph
    var_name: str  # name of the process variable the result is stored under — verify against consumers
    evaluation: Callable[[Dict[str, Any]], Any]  # takes **kwargs and outputs the computation result
    prior: List[str] = field(default_factory=list)  # indices of the variable nodes needed for computation
    result: Any = None  # output of evaluation once the node has run
    status: StepStatus = StepStatus.WAITING
    opacity: float = 1  # this reduces if this computation is behind an if-statement
@dataclass
class ScheduledAssignment:
    """Start time and device assignment of one scheduled job."""
    start: datetime  # scheduled start of the job
    # List of jobs scheduled on the same machine, that have to finish prior
    machine_prior: List[str] = field(default_factory=list)
    # other participating devices are assigned by their tag in used_devices. e.g.: 'target'->'Carousel'
    participating: Dict[str, str] = field(default_factory=dict)

    @property
    def device(self) -> Optional[str]:
        """
        :return: name of the device this job is scheduled on
        """
        return self.participating.get('main')
# a schedule is an assignment of start and device to each job
class Schedule(Dict[str, ScheduledAssignment]):
    """Maps each job name to its ScheduledAssignment."""

    def __str__(self):
        lines = [f"{idx} --> {assign}" for idx, assign in self.items()]
        return "\n".join(lines)
# all kinds of nodes except for container nodes Operable = Union[ProcessStep, Variable, IfNode, Computation] node_col = dict(container='grey', variable='blue', operation='red', if_node='orange', computation="lightgrey", dummy="cyan") node_col2 = {ContainerInfo: 'grey', Variable: 'blue', IfNode: 'orange', Computation: "lightgrey", ProcessStep: 'red', MoveStep: 'red'} job_col = {StepStatus.WAITING: 'red', StepStatus.RUNNING: 'yellow', StepStatus.FINISHED: 'green', StepStatus.ERROR: 'pink'} ProcessInfo = str
class SMProcess:
    """
    Encapsulates the information about an experimental process (as workflow graph) and provides
    utility functions. This structure is independent of the process description language.
    """
    name: str  # unique, human-readable process name
    experiment_uuid: str  # initialized to the name; presumably replaced by a real uuid elsewhere — TODO confirm
    # earliest possible starting time. Allows to schedule processes for a delayed start
    min_start: datetime | None
    _status: ProcessExecutionState  # backing field for the status property
    steps: List[ProcessStep]
    containers: List[ContainerInfo]
    variables: List[Variable]
    if_nodes: List[IfNode]
    computations: List[Computation]

    def __init__(self, name: str, priority: int = 0):
        """
        :param name: unique (human-readable) name of the process; also used as experiment_uuid
        :param priority: scheduling priority of the process
        """
        self.steps = []
        self.containers = []
        self.variables = []
        self.if_nodes = []
        self.computations = []
        self.priority = priority
        self.name = name
        self.experiment_uuid = name
        self._status = ProcessExecutionState.IDLE
        self.min_start = None

    def update_reagent_opacity(self):
        """
        This method sets the opacity of all operations of reagents.

        Steps that only handle a reagent inherit the maximum opacity of the downstream steps
        that combine the reagent with other containers (0.5 if there are none).
        :return:
        """
        g = self.wfg
        step_by_id = {j.name: j for j in self.steps}
        for cont in self.containers:
            if not cont.is_reagent:
                continue
            name = cont.name
            for _, start_job in g.out_edges(name):
                # get all descendants
                reachable = nx.descendants(g, start_job)
                # filter the ones, that use this reagent but do also involve other containers
                needed_for = [step_by_id[idx] for idx in reachable
                              if idx in step_by_id
                              and name in step_by_id[idx].cont_names
                              and len(step_by_id[idx].cont_names) > 1]
                max_opacity = .5 if len(needed_for) == 0 else max(job.opacity for job in needed_for)
                reachable.add(start_job)
                for reagent_usage in reachable:
                    if reagent_usage in step_by_id:
                        step = step_by_id[reagent_usage]
                        # we only touch jobs that exclusively use this reagent
                        if step.cont == name:
                            # fix: warn BEFORE overwriting — the original checked after the
                            # assignment, so the warning could never fire
                            if step.opacity != max_opacity:
                                Logger.warning(f"setting opacity of {step.name} to {max_opacity}")
                            step.opacity = max_opacity

    def get_info(self) -> ProcessInfo:
        """:return: a short printable description of the process (currently just its name)."""
        return self.name

    @property
    def status(self):
        """
        This automatically sets the status to FINISHED if all steps are finished.
        Note: a process with no steps counts as FINISHED (all() over an empty list is True).
        :return: the current ProcessExecutionState
        """
        if all(step.status == StepStatus.FINISHED for step in self.steps):
            self._status = ProcessExecutionState.FINISHED
        return self._status

    @status.setter
    def status(self, status):
        self._status = status

    @property
    def wfg(self) -> nx.DiGraph:
        """
        Having the workflow as nx.DiGraph is very useful, but saving it in memory means a second
        'source of truth'. So, we create it on the fly.
        :return: the whole workflow as nx.DiGraph
        """
        g = nx.DiGraph()
        operable = self.steps + self.if_nodes + self.variables + self.computations
        # sort out those with opacity 0
        all_nodes = self.containers + [n for n in operable if n.opacity > 0]
        type_tag = {ProcessStep: 'operation', Computation: 'computation', IfNode: 'if_node',
                    Variable: 'variable', ContainerInfo: 'container', MoveStep: 'operation'}
        node_data = [(n.name, dict(type=type_tag[type(n)],
                                   opacity=n.opacity if hasattr(n, 'opacity') else 1))
                     for n in all_nodes]
        g.add_nodes_from(node_data)
        edge_data = []
        for n in all_nodes:
            # containers have no 'prior' attribute
            if hasattr(n, 'prior'):
                for prior in n.prior:
                    edge_data.append((prior, n.name, dict()))
        # connect starting steps to the containers they consume
        for job in self.steps:
            if job.is_start:
                for cont in job.cont_names:
                    edge_data.append((cont, job.name, dict()))
        g.add_edges_from(edge_data)
        return g

    @property
    def starting_nodes(self) -> list[ProcessStep]:
        """:return: all steps flagged as entry points of the process."""
        return [step for step in self.steps if step.is_start]
class SchedulingInstance:
    """
    Aggregates all processes known to the scheduler and provides indexed access to their steps,
    containers, variables, computations and if-nodes, plus the current schedule.
    """
    step_by_id: Dict[str, ProcessStep]
    container_info_by_name: Dict[str, ContainerInfo]
    if_node_by_id: Dict[str, IfNode]
    var_by_id: Dict[str, Variable]
    computation_by_id: Dict[str, Computation]
    # here all operations are saved, that got removed f.e. due to runtime decisions
    deleted_operable: Dict[str, Operable]
    # processes can be stopped and continued
    process_by_name: Dict[str, SMProcess]
    schedule: Schedule

    def __init__(self):
        self.container_info_by_name = {}
        self.process_by_name = {}
        self.step_by_id = {}
        self.if_node_by_id = {}
        self.var_by_id = {}
        self.computation_by_id = {}
        self.deleted_operable = {}
        self.schedule = Schedule()
        # node ids still to be executed; used for styling in visualize_wfg
        self.future = []

    def add_process(self, process: SMProcess):
        """
        Registers a process and indexes its steps, containers, variables, computations and
        if-nodes by name.
        :param process: the process to register
        """
        if process.name in self.process_by_name:
            # fix: original called the undefined name 'logger.Error'; use the Logger facade
            Logger.error(f"Error: process names {process.name} already exists")
        if not self.unique_job_names(process):
            Logger.error(f"Error: Some operation name occurs twice")
        # link process, operation and containers
        self.process_by_name[process.name] = process
        for job in process.steps:
            self.step_by_id[job.name] = job
            job.process_name = process.name
        for cont in process.containers:
            self.container_info_by_name[cont.name] = cont
        for var in process.variables:
            self.var_by_id[var.name] = var
        for comp in process.computations:
            self.computation_by_id[comp.name] = comp
        for if_node in process.if_nodes:
            self.if_node_by_id[if_node.name] = if_node

    def remove_process(self, process_id):
        """
        Purges the process and its content from all dictionaries
        :param process_id: name of the process to remove
        :return:
        """
        Logger.info(f"removing process {process_id}")
        if process_id not in self.process_by_name:
            Logger.error(f"There is no process with id {process_id}")
            return
        p = self.process_by_name[process_id]
        for job in p.steps:
            self.step_by_id.pop(job.name)
        for cont in p.containers:
            self.container_info_by_name.pop(cont.name)
        for var in p.variables:
            self.var_by_id.pop(var.name)
        for comp in p.computations:
            self.computation_by_id.pop(comp.name)
        for if_node in p.if_nodes:
            self.if_node_by_id.pop(if_node.name)
        self.process_by_name.pop(process_id)

    def set_schedule(self, schedule: Schedule):
        """
        saves the schedule and writes its information into all affected job info
        :param schedule: the new schedule
        :return:
        """
        J = self.step_by_id
        self.schedule = schedule
        for idx, assign in schedule.items():
            # NOTE(review): assumes every scheduled job has a 'main' device — confirm,
            # otherwise main_device is None and this raises AttributeError
            J[idx].main_device.name = assign.device
            # set the names of other participating devices
            for used_device in J[idx].used_devices:
                if used_device.tag in assign.participating:
                    used_device.name = assign.participating[used_device.tag]

    def start_process(self, process_id):
        """Marks the process as RUNNING (logs an error if it is unknown)."""
        if process_id not in self.process_by_name:
            Logger.error(f"There is no process named {process_id}")
        else:
            self.process_by_name[process_id].status = ProcessExecutionState.RUNNING

    def stop_process(self, process_id):
        """Pauses a RUNNING process (logs an error if it is unknown)."""
        if process_id not in self.process_by_name:
            Logger.error(f"There is no process named {process_id}")
        else:
            process = self.process_by_name[process_id]
            if process.status == ProcessExecutionState.RUNNING:
                process.status = ProcessExecutionState.PAUSED

    def unique_job_names(self, process: SMProcess):
        """Checks whether none of the processes operations names is already given to an operation."""
        # TODO: implement the actual check; currently always passes
        return True

    def naming_consistent(self, process: SMProcess):
        """Check whether the names of the jobs correspond to the indices in the workflow graph."""
        # TODO: implement the actual check; currently always passes
        return True

    def update_reagent_opacity(self):
        """
        This method sets the opacity of all operations of reagents in every registered process.
        :return:
        """
        for p in self.process_by_name.values():
            p.update_reagent_opacity()

    def visualize_wfg(self):
        """
        Renders the combined workflow graph of all processes as a graphviz Digraph.
        :return: the Digraph (format 'png'), or None if rendering failed
        """
        dot = Digraph(comment="Workflow")
        dot.attr(rankdir='LR')
        g = self.combined_wfg
        try:
            # some default values
            for n, data in g.nodes(data=True):
                data['name'] = n
                data['color'] = 'cyan'
                data['style'] = 'filled'
            for u, v, data in g.edges(data=True):
                data['c'] = 0
                data['w'] = ""
            # customize the labels and colors
            for idx, op in self.operable.items():
                g.nodes[idx]['style'] = 'filled' if idx in self.future else 'striped'
                g.nodes[idx]['color'] = node_col2[type(op)]
            for idx, job in self.step_by_id.items():
                g.nodes[idx]['color'] = job_col[job.status]
                g.nodes[idx]['name'] = job.function
                for idx_o in job.prior:
                    edge_data = g.edges[idx_o, idx]
                    edge_data['c'] = round(job.wait_cost[idx_o])
                    if job.min_wait.get(idx_o, 0) > 0:
                        edge_data["w"] = f"{job.min_wait[idx_o]}\u2264w"
                    if job.max_wait.get(idx_o, float('inf')) < float('inf'):
                        edge_data["w"] += f"w\u2264{job.max_wait[idx_o]}"
                    # merge '...≤ww≤...' into '...≤w≤...' when both bounds are present
                    edge_data['w'] = edge_data['w'].replace('ww', 'w')
            for idx, if_node in self.if_node_by_id.items():
                if if_node.status == StepStatus.FINISHED:
                    g.nodes[idx]['name'] += f"\nis {if_node.decision}"
                else:
                    for if_true in if_node.true_tree:
                        g.edges[idx, if_true]['label'] = "true "
                    for if_true in if_node.false_tree:
                        g.edges[idx, if_true]['label'] = "false "
            for idx, var in self.var_by_id.items():
                if var.status == StepStatus.FINISHED:
                    g.nodes[idx]['name'] += f"\n= {var.result}"[:15]
            for idx, cont in self.container_info_by_name.items():
                g.nodes[idx]['color'] = node_col2[ContainerInfo]
                g.nodes[idx]['name'] += f"\nBC={cont.barcode}"
            for idx, comp in self.computation_by_id.items():
                if comp.status == StepStatus.FINISHED:
                    g.nodes[idx]['name'] += f"\n= {comp.result}"[:15]
            # create the dot graph
            for n, data in g.nodes(data=True):
                dot.node(str(n), data['name'], color=data['color'], style=data['style'])
            for u, v, data in g.edges(data=True):
                dot.edge(str(u), str(v), f"c={data['c']}, {data['w']}")
        except Exception as ex:
            Logger.debug(ex, traceback.print_exc())
            return None
        dot.format = "png"
        return dot

    def schedule_violated(self, tolerance: float = 10) -> bool:
        """
        Checks whether reality has drifted from the current schedule.
        :param tolerance: allowed delay in seconds before a job counts as violated
        :return: True if some scheduled job started (or should have started) too late
        """
        tolerance = timedelta(seconds=tolerance)
        for job_id, scheduled_start in self.schedule.items():
            # ignore removed jobs
            if job_id in self.step_by_id:
                job = self.step_by_id[job_id]
                if job.status == StepStatus.WAITING and scheduled_start.start + tolerance < datetime.today():
                    return True
                if job.status == StepStatus.RUNNING and scheduled_start.start + tolerance < job.start:
                    return True
        return False

    def gannt_chart(self):
        """
        Builds a plotly Gantt chart of the current schedule: one row per container,
        colored by device. (Method name 'gannt' kept for backward compatibility.)
        :return: the plotly figure
        """
        data = []
        color_discrete_map = dict(Cytomat1550_1='red', Cytomat1550_2='crimson', VarioskanLUX="green",
                                  F5="blue", Rotanta_Transfer="violet", NOW="black")
        # collect containers with scheduled jobs
        relevant_cont_names = set()
        for idx, start in self.schedule.items():
            job = self.step_by_id[idx]
            if job.main_device.name is None:
                job.main_device.name = "F5"
            for cont_name in job.cont_names:
                cont_info = self.container_info_by_name[cont_name]
                if cont_info.barcode:
                    name = f"BC_({cont_info.barcode})"
                else:
                    name = f"{cont_name}(BC unknown)"
                relevant_cont_names.add(name)
                data.append(dict(
                    name=name,
                    start=start.start,
                    finish=start.start + timedelta(seconds=job.duration),
                    device=job.main_device.name,
                    hover_name=idx,
                ))
            # there might be jobs without a container involved
            if not job.cont_names:
                relevant_cont_names.add("No Container")
                data.append(dict(
                    name="No Container",
                    start=start.start,
                    finish=start.start + timedelta(seconds=job.duration),
                    device=job.main_device.name,
                    hover_name=idx,
                ))
        if self.running_processes_names or len(self.schedule) == 0:
            # draw a thin 'NOW' marker on every row
            for name in relevant_cont_names:
                now = datetime.today()
                width = timedelta(seconds=1)
                data.append(dict(name=name, start=now, finish=now + width, device="NOW"))
        df = pd.DataFrame(data)
        fig = px.timeline(df, x_start="start", x_end="finish", y="name", color="device",
                          color_discrete_map=color_discrete_map, hover_name='hover_name')
        fig.update_yaxes(autorange="reversed")  # otherwise tasks are listed from the bottom up
        return fig

    def remove_operable(self, idx: str):
        """
        Deletes a process step, variable, computation or decision from the structure
        including all references.
        """
        if idx not in self.operable:
            Logger.error(f"Node {idx} is not active node in this JSSP")
            # fix: bail out — the original fell through and raised KeyError below
            return
        op = self.operable[idx]
        self.deleted_operable[idx] = op
        # delete the reference from wherever it belonged to
        for d in [self.step_by_id, self.var_by_id, self.computation_by_id, self.if_node_by_id]:
            if idx in d:
                d.pop(idx)
        # remove it from the process as well
        for p in self.process_by_name.values():
            for l in [p.containers, p.if_nodes, p.steps, p.variables, p.computations]:
                if op in l:
                    l.remove(op)

    @property
    def running_processes_names(self) -> List[str]:
        """:return: a list of the names of all running processes"""
        return [name for name, p in self.process_by_name.items()
                if p.status == ProcessExecutionState.RUNNING]

    @property
    def process_stati_by_name(self) -> Dict[str, ProcessExecutionState]:
        """:return: the execution state of every registered process, keyed by name"""
        return {name: p.status for name, p in self.process_by_name.items()}

    @property
    def combined_wfg(self) -> nx.DiGraph:
        """:return: the union of the workflow graphs of all registered processes"""
        g = nx.DiGraph()
        for p in self.process_by_name.values():
            g = nx.compose(g, p.wfg)
        return g

    @property
    def definite_step_by_id(self) -> Dict[str, ProcessStep]:
        """Steps that will definitely run: full opacity and no container in error state."""
        # mark all jobs, that shall be excluded due to errors
        conflict = set()
        for cont_name, cont in self.container_info_by_name.items():
            if cont.in_error_state:
                for idx, step in self.step_by_id.items():
                    if cont_name in step.cont_names:
                        # adds any step that includes a container in error-state
                        conflict.add(idx)
        # todo make this a consistent wfg
        return {idx: job for idx, job in self.step_by_id.items()
                if job.opacity == 1 and idx not in conflict}

    @property
    def definite_if_node_by_id(self):
        """If-nodes with full opacity (not themselves behind an undecided branch)."""
        return {idx: node for idx, node in self.if_node_by_id.items() if node.opacity == 1}

    @property
    def definite_var_by_id(self):
        """Variables with full opacity."""
        return {idx: var for idx, var in self.var_by_id.items() if var.opacity == 1}

    @property
    def definite_computation_by_id(self):
        """Computations with full opacity."""
        return {idx: computation for idx, computation in self.computation_by_id.items()
                if computation.opacity == 1}

    @property
    def operable(self) -> Dict[str, Operable]:
        """:return: all non-container nodes (steps, variables, computations, if-nodes) by id"""
        res = {}
        for d in [self.if_node_by_id, self.var_by_id, self.computation_by_id, self.step_by_id]:
            res.update(d)
        return res