"""
Here we define the structures of a process, a scheduling instance etc.
"""
import traceback
from dataclasses import dataclass, field
from typing import List, Dict, Any, Union, Callable, Optional
from enum import IntEnum
from copy import deepcopy
from datetime import datetime, timedelta
from laborchestrator.orchestrator_interface import ProcessExecutionState
from laborchestrator.logging_manager import StandardLogger as Logger
import networkx as nx
import pandas as pd
import plotly.express as px
from graphviz import Digraph
[docs]
class StepStatus(IntEnum):
    """Execution state of a single workflow node (step, variable, if-node or computation)."""
    WAITING = 0   # not started yet
    RUNNING = 1   # currently executing
    FINISHED = 2  # completed successfully
    ERROR = 3     # failed during execution
[docs]
@dataclass
class UsedDevice:
    """Describes one device slot required by a process step."""
    device_type: Any
    name: Optional[str] = None  # is set when a specific device is assigned
    preferred: Optional[str] = None  # optional wish for a specific device
    tag: str = ""  # convenient field to add information like main, source, target, additional, etc.

    def __str__(self):
        return f"{self.tag}|{self.device_type}|{self.name}|{self.preferred}"
[docs]
@dataclass
class ContainerInfo:
    """Editable data collection for containers going through a lab process."""
    name: str  # unique name used to index the container node in the workflow graph
    current_device: str  # name of the device the container currently sits in
    current_pos: int  # slot/position number within the current device
    start_device: UsedDevice  # device the container starts the process in
    filled: bool = True  # this is safer, since filled container are handled more carefully
    content: str = ""  # free-text description of the contents
    labware_type: str = ""  # e.g. plate/tube type; NOTE(review): exact vocabulary not visible here
    barcode: Optional[str] = None  # physical barcode, once known
    finished: bool = False  # set when the container has completed its part of the process
    lidded: bool = False  # whether a lid is currently on the container
    lid_site: Any = None  # where the lid is parked while delidded
    in_error_state: bool = False  # excludes the container's steps from scheduling (see definite_step_by_id)
    is_reagent: bool = False  # reagent containers get special opacity handling (see update_reagent_opacity)
[docs]
@dataclass
class ProcessStep:
    """Editable data collection for a general operation in a lab process."""
    name: str  # important for referencing. Should be the same in the workflow graph.
    cont_names: List[str]  # the first one should be the main container (if there is such)
    function: str  # todo: change this to some kind of enum
    duration: float  # duration of the operation in seconds
    process_name: str = ""  # unique (intern, human-readable) name of the experiment this job belongs to
    status: StepStatus = StepStatus.WAITING
    start: Union[datetime, None] = None  # actual start time, set at execution
    finish: Union[datetime, None] = None  # actual finish time, set at execution
    prior: List[str] = field(default_factory=list)  # list of the names of operations prior to this
    used_devices: List[UsedDevice] = field(default_factory=list)  # containers to store information on used devices
    label: str = 'LABEL'  # use only for visualization
    wait_cost: Dict[str, float] = field(default_factory=dict)  # waiting costs after prior operations (linked by name)
    max_wait: Dict[str, int] = field(default_factory=dict)  # maximum waiting times after prior operations
    min_wait: Dict[str, int] = field(default_factory=dict)  # minimum waiting times after prior operations
    wait_to_start_costs: float = 0
    is_start: bool = False  # flag whether this job has no other jobs that have to be finished prior
    opacity: float = 1  # reduced if this job is behind an if-statement
    result: Any = None  # todo: should become a uuid referencing the database
    data: Dict = field(default_factory=dict)  # custom data like duration, speed, etc.

    @property
    def cont(self):
        """Name of the main container (first entry), or None if no container is involved."""
        if not self.cont_names:
            return None
        return self.cont_names[0]

    def priors_done(self, job_by_name: Dict) -> bool:
        """Checks whether all prerequisite operations are finished.

        :param job_by_name: mapping of step name to step; must contain every name in self.prior
        """
        # generator instead of an intermediate list: short-circuits on the first unfinished prior
        return all(job_by_name[name].status == StepStatus.FINISHED for name in self.prior)

    @property
    def main_device(self) -> Optional[UsedDevice]:
        """The used device tagged 'main', or None if no such device exists."""
        for used_device in self.used_devices:
            if used_device.tag == "main":
                return used_device
        return None
[docs]
@dataclass
class MoveStep(ProcessStep):
    """Operation resembling a movement of a container."""
    pref_dest_pos: Optional[int] = None  # optional preference for the destination slot number
    destination_pos: int = 0  # actual destination slot number (set at execution runtime)
    origin_pos: int = 0  # this should not be relevant for the execution but might be nice for logging

    @property
    def origin_device(self) -> Optional[UsedDevice]:
        """The used device tagged 'origin', or None if no such device exists."""
        return next((dev for dev in self.used_devices if dev.tag == "origin"), None)

    @property
    def target_device(self) -> Optional[UsedDevice]:
        """The used device tagged 'target', or None if no such device exists."""
        return next((dev for dev in self.used_devices if dev.tag == "target"), None)
[docs]
@dataclass
class Variable:
    """Representing a variable node of the workflow graph"""
    name: str  # to index the node in the implicit workflow graph
    var_name: str  # NOTE(review): presumably the key under which the value is passed to evaluations — confirm
    result: Any = None  # the value of the variable once determined
    prior: List[str] = field(default_factory=list)  # names of nodes that must finish before this variable is set
    status: StepStatus = StepStatus.WAITING
    opacity: float = 1  # reduced if this variable is behind an if-statement
[docs]
@dataclass
class IfNode:
    """Represents a conditional branch in the workflow graph"""
    name: str  # to index the node in the workflow graph
    evaluation: Callable[[Dict[str, Any]], bool]  # takes **kwargs and outputs the decision of the node
    decision: Union[bool, None] = None  # None until the node has been evaluated
    prior: List[str] = field(default_factory=list)  # indices of the variable nodes needed for a decision
    true_tree: List[str] = field(default_factory=list)  # indices of nodes to be executed (directly) after positive decision
    false_tree: List[str] = field(default_factory=list)  # indices of nodes not to be executed (directly) after positive decision
    status: StepStatus = StepStatus.WAITING
    opacity: float = 1  # reduced if this node is behind an if-statement
[docs]
@dataclass
class Computation:
    """A node that derives a value from prior variable nodes at runtime."""
    name: str  # to index the node in the workflow graph
    var_name: str  # NOTE(review): presumably the variable name the result is stored under — confirm
    evaluation: Callable[[Dict[str, Any]], Any]  # takes **kwargs and outputs the computation result
    prior: List[str] = field(default_factory=list)  # indices of the variable nodes needed for computation
    result: Any = None  # the computed value once evaluated
    status: StepStatus = StepStatus.WAITING
    opacity: float = 1  # reduced if this computation is behind an if-statement
[docs]
@dataclass
class ScheduledAssignment:
    """Scheduling decision for a single job: when it starts and which devices take part."""
    start: datetime  # scheduled start of the job
    # List of jobs scheduled on the same machine, that have to finish prior
    machine_prior: List[str] = field(default_factory=list)
    # other participating are assigned by their tag in used_devices. e.g.: 'target'->'Carousel'
    participating: Dict[str, str] = field(default_factory=dict)

    @property
    def device(self) -> Optional[str]:
        """
        :return: name of the device this job is scheduled on (the 'main' participant), or None
        """
        return self.participating.get('main')
# a schedule is an assignment of start and device to each job
[docs]
class Schedule(Dict[str, ScheduledAssignment]):
    """A schedule maps each job name to its ScheduledAssignment (start time and devices)."""

    def __str__(self):
        rows = [f"{idx} --> {assign}" for idx, assign in self.items()]
        return "\n".join(rows)
# all kinds of nodes except for container nodes
Operable = Union[ProcessStep, Variable, IfNode, Computation]
# node colors for graph visualization, keyed by the node's 'type' attribute in the wfg
node_col = dict(container='grey', variable='blue', operation='red', if_node='orange', computation="lightgrey", dummy="cyan")
# node colors for graph visualization, keyed by the node's Python class
node_col2 = {ContainerInfo: 'grey', Variable: 'blue', IfNode: 'orange', Computation: "lightgrey",
             ProcessStep: 'red', MoveStep: 'red'}
# colors indicating a job's execution status in the visualizations
job_col = {StepStatus.WAITING: 'red', StepStatus.RUNNING: 'yellow', StepStatus.FINISHED: 'green', StepStatus.ERROR: 'pink'}
# lightweight, human-readable description of a process (currently just its name)
ProcessInfo = str
[docs]
class SMProcess:
    """
    Encapsulates the information about an experimental process (as workflow graph) and provides utility functions.
    This structure is independent of the process description language.
    """
    name: str
    experiment_uuid: str
    # earliest possible starting time. Allows to schedule processes for a delayed start
    min_start: datetime | None
    _status: ProcessExecutionState
    steps: List[ProcessStep]
    containers: List[ContainerInfo]
    variables: List[Variable]
    if_nodes: List[IfNode]
    computations: List[Computation]

    def __init__(self, name: str, priority: int = 0):
        self.steps = []
        self.containers = []
        self.variables = []
        self.if_nodes = []
        self.computations = []
        self.priority = priority
        self.name = name
        # NOTE(review): the uuid defaults to the human-readable name — confirm this is intended
        self.experiment_uuid = name
        self._status = ProcessExecutionState.IDLE
        self.min_start = None

    def update_reagent_opacity(self):
        """
        Propagates opacity onto operations that only handle a reagent container:
        a reagent-only step inherits the maximum opacity of the steps that combine
        the reagent with other containers (or 0.5 if no such step exists).
        :return:
        """
        g = self.wfg
        step_by_id = {j.name: j for j in self.steps}
        for cont in self.containers:
            if cont.is_reagent:
                name = cont.name
                for buff, start_job in g.out_edges(name):
                    # get all descendants of the step that first uses the reagent
                    reachable = nx.descendants(g, start_job)
                    # filter the ones, that use this reagent but do also involve other containers
                    needed_for = [step_by_id[idx] for idx in reachable
                                  if idx in step_by_id and
                                  name in step_by_id[idx].cont_names and
                                  len(step_by_id[idx].cont_names) > 1]
                    max_opacity = .5 if len(needed_for) == 0 else max(job.opacity for job in needed_for)
                    reachable.add(start_job)
                    for reagent_usage in reachable:
                        if reagent_usage in step_by_id:
                            step = step_by_id[reagent_usage]
                            # we only adjust jobs whose main (only) container is this reagent
                            if step.cont == name:
                                step.opacity = max_opacity
                            # NOTE(review): this warns for reachable steps whose opacity differs but whose
                            # main container is NOT the reagent — confirm the warning is meant for that case
                            if not step.opacity == max_opacity:
                                Logger.warning(f"setting opacity of {step.name} to {max_opacity}")

    def get_info(self) -> ProcessInfo:
        """:return: a human-readable identifier of this process (currently its name)"""
        return self.name

    @property
    def status(self):
        """
        This automatically sets the status to FINISHED if all steps are finished
        :return:
        """
        # guard against the empty process: all() over an empty list is vacuously True,
        # which previously reported a freshly created (step-less) process as FINISHED
        if self.steps and all(step.status == StepStatus.FINISHED for step in self.steps):
            self._status = ProcessExecutionState.FINISHED
        return self._status

    @status.setter
    def status(self, status):
        self._status = status

    @property
    def wfg(self) -> nx.DiGraph:
        """
        Having the workflow as nx.DiGraph is very useful, but saving it in memory means a second 'source of truth'.
        So, we create it on the fly.
        :return: the whole workflow as nx.DiGraph
        """
        g = nx.DiGraph()
        operable = self.steps + self.if_nodes + self.variables + self.computations
        # sort out those with opacity 0 (pruned by runtime decisions); containers are always kept
        all_nodes = self.containers + [n for n in operable if n.opacity > 0]
        type_tag = {ProcessStep: 'operation', Computation: 'computation', IfNode: 'if_node', Variable: 'variable',
                    ContainerInfo: 'container', MoveStep: 'operation'}
        node_data = [(n.name, dict(type=type_tag[type(n)],
                                   opacity=n.opacity if hasattr(n, 'opacity') else 1))
                     for n in all_nodes]
        g.add_nodes_from(node_data)
        edge_data = []
        for n in all_nodes:
            # precedence edges: prior node -> dependent node
            if hasattr(n, 'prior'):
                for prior in n.prior:
                    edge_data.append((prior, n.name, dict()))
        for job in self.steps:
            # starting jobs are linked to their containers' nodes
            if job.is_start:
                for cont in job.cont_names:
                    edge_data.append((cont, job.name, dict()))
        g.add_edges_from(edge_data)
        return g

    @property
    def starting_nodes(self) -> list[ProcessStep]:
        """:return: all steps flagged as start nodes (no prerequisite jobs)"""
        starts = [step for step in self.steps if step.is_start]
        return starts
[docs]
class SchedulingInstance:
    """
    Aggregates all processes known to the scheduler and maintains flat name-indexed
    dictionaries over their steps, containers, variables, computations and if-nodes,
    together with the current schedule.
    """
    step_by_id: Dict[str, ProcessStep]
    container_info_by_name: Dict[str, ContainerInfo]
    if_node_by_id: Dict[str, IfNode]
    var_by_id: Dict[str, Variable]
    computation_by_id: Dict[str, Computation]
    # here all operations are saved, that got removed e.g. due to runtime decisions
    deleted_operable: Dict[str, Operable]
    process_by_name: Dict[str, SMProcess]
    # processes can be stopped and continued
    schedule: Schedule

    def __init__(self):
        self.container_info_by_name = {}
        self.process_by_name = {}
        self.step_by_id = {}
        self.if_node_by_id = {}
        self.var_by_id = {}
        self.computation_by_id = {}
        self.deleted_operable = {}
        self.schedule = Schedule()
        # node ids still ahead of execution; used for styling in visualize_wfg
        self.future = []

    def add_process(self, process: SMProcess):
        """
        Registers a process and indexes its steps, containers, variables,
        computations and if-nodes by name.
        :param process: the process to add; its name should be unique
        """
        if process.name in self.process_by_name:
            # fixed: previously called undefined 'logger.Error', which raised a NameError
            Logger.error(f"Error: process name {process.name} already exists")
        if not self.unique_job_names(process):
            Logger.error("Error: Some operation name occurs twice")
        # link process, operation and containers
        self.process_by_name[process.name] = process
        for job in process.steps:
            self.step_by_id[job.name] = job
            job.process_name = process.name
        for cont in process.containers:
            self.container_info_by_name[cont.name] = cont
        for var in process.variables:
            self.var_by_id[var.name] = var
        for comp in process.computations:
            self.computation_by_id[comp.name] = comp
        for if_node in process.if_nodes:
            self.if_node_by_id[if_node.name] = if_node

    def remove_process(self, process_id):
        """
        Purges the process and its content from all dictionaries
        :param process_id: name of the process to remove
        :return:
        """
        Logger.info(f"removing process {process_id}")
        if process_id not in self.process_by_name:
            Logger.error(f"There is no process with id {process_id}")
            return
        p = self.process_by_name[process_id]
        for job in p.steps:
            self.step_by_id.pop(job.name)
        for cont in p.containers:
            self.container_info_by_name.pop(cont.name)
        for var in p.variables:
            self.var_by_id.pop(var.name)
        for comp in p.computations:
            self.computation_by_id.pop(comp.name)
        for if_node in p.if_nodes:
            self.if_node_by_id.pop(if_node.name)
        self.process_by_name.pop(process_id)

    def set_schedule(self, schedule: Schedule):
        """
        saves the schedule and writes its information into all affected job info
        :param schedule: mapping of job name to its scheduled assignment
        :return:
        """
        J = self.step_by_id
        self.schedule = schedule
        for idx, assign in schedule.items():
            # NOTE(review): assumes every scheduled job has a 'main'-tagged device;
            # otherwise main_device is None and this raises AttributeError — confirm
            J[idx].main_device.name = assign.device
            # set the names of other participating devices
            for used_device in J[idx].used_devices:
                if used_device.tag in assign.participating:
                    used_device.name = assign.participating[used_device.tag]

    def start_process(self, process_id):
        """Sets the process status to RUNNING (logs an error if unknown)."""
        if process_id not in self.process_by_name:
            Logger.error(f"There is no process named {process_id}")
        else:
            self.process_by_name[process_id].status = ProcessExecutionState.RUNNING

    def stop_process(self, process_id):
        """Pauses a RUNNING process (logs an error if unknown; other states are left unchanged)."""
        if process_id not in self.process_by_name:
            Logger.error(f"There is no process named {process_id}")
        else:
            process = self.process_by_name[process_id]
            if process.status == ProcessExecutionState.RUNNING:
                process.status = ProcessExecutionState.PAUSED

    def unique_job_names(self, process: SMProcess):
        """Checks whether none of the processes operations names is already given to an operation."""
        # TODO: not implemented yet — always accepts
        return True

    def naming_consistent(self, process: SMProcess):
        """Check whether the names of the jobs correspond to the indices in the workflow graph."""
        # TODO: not implemented yet — always accepts
        return True

    def update_reagent_opacity(self):
        """
        This method sets the opacity of all operations of reagents
        :return:
        """
        for p in self.process_by_name.values():
            p.update_reagent_opacity()

    def visualize_wfg(self):
        """
        Renders the combined workflow graph of all processes as a graphviz Digraph.
        :return: the dot graph (png format), or None if building it failed
        """
        dot = Digraph(comment="Workflow")
        dot.attr(rankdir='LR')
        g = self.combined_wfg
        try:
            # some default values
            for n, data in g.nodes(data=True):
                data['name'] = n
                data['color'] = 'cyan'
                data['style'] = 'filled'
            for u, v, data in g.edges(data=True):
                data['c'] = 0
                data['w'] = ""
            # customize the labels and colors
            for idx, op in self.operable.items():
                g.nodes[idx]['style'] = 'filled' if idx in self.future else 'striped'
                g.nodes[idx]['color'] = node_col2[type(op)]
            for idx, job in self.step_by_id.items():
                g.nodes[idx]['color'] = job_col[job.status]
                g.nodes[idx]['name'] = job.function
                for idx_o in job.prior:
                    edge_data = g.edges[idx_o, idx]
                    edge_data['c'] = round(job.wait_cost[idx_o])
                    # annotate min/max waiting-time windows, e.g. "5≤w≤60"
                    if job.min_wait.get(idx_o, 0) > 0:
                        edge_data["w"] = f"{job.min_wait[idx_o]}\u2264w"
                    if job.max_wait.get(idx_o, float('inf')) < float('inf'):
                        edge_data["w"] += f"w\u2264{job.max_wait[idx_o]}"
                    # merge the 'ww' junction when both bounds are present (no-op otherwise)
                    edge_data['w'] = edge_data['w'].replace('ww', 'w')
            for idx, if_node in self.if_node_by_id.items():
                if if_node.status == StepStatus.FINISHED:
                    g.nodes[idx]['name'] += f"\nis {if_node.decision}"
                else:
                    for if_true in if_node.true_tree:
                        g.edges[idx, if_true]['label'] = "true "
                    for if_true in if_node.false_tree:
                        g.edges[idx, if_true]['label'] = "false "
            for idx, var in self.var_by_id.items():
                if var.status == StepStatus.FINISHED:
                    # truncate long results for readability
                    g.nodes[idx]['name'] += f"\n= {var.result}"[:15]
            for idx, cont in self.container_info_by_name.items():
                g.nodes[idx]['color'] = node_col2[ContainerInfo]
                g.nodes[idx]['name'] += f"\nBC={cont.barcode}"
            for idx, comp in self.computation_by_id.items():
                if comp.status == StepStatus.FINISHED:
                    g.nodes[idx]['name'] += f"\n= {comp.result}"[:15]
            # create the dot graph
            for n, data in g.nodes(data=True):
                dot.node(str(n), data['name'], color=data['color'], style=data['style'])
            for u, v, data in g.edges(data=True):
                dot.edge(str(u), str(v), f"c={data['c']}, {data['w']}")
        except Exception as ex:
            # fixed: previously logged traceback.print_exc()'s return value (always None);
            # format_exc() returns the traceback text so it actually ends up in the log
            Logger.debug(ex, traceback.format_exc())
            return None
        dot.format = "png"
        return dot

    def schedule_violated(self, tolerance: float = 10) -> bool:
        """
        Checks whether any still-relevant job missed its scheduled start by more than the tolerance.
        :param tolerance: allowed lateness in seconds
        :return: True if at least one waiting/running job is late
        """
        tolerance = timedelta(seconds=tolerance)
        for job_id, scheduled_start in self.schedule.items():
            # ignore removed jobs
            if job_id in self.step_by_id:
                job = self.step_by_id[job_id]
                if job.status == StepStatus.WAITING and scheduled_start.start + tolerance < datetime.today():
                    return True
                if job.status == StepStatus.RUNNING and scheduled_start.start + tolerance < job.start:
                    return True
        return False

    def gannt_chart(self):
        """
        Builds a plotly Gantt chart of the current schedule, one row per container
        (rows are labeled by barcode when known), colored by device.
        :return: a plotly Figure
        """
        data = []
        color_discrete_map = dict(Cytomat1550_1='red', Cytomat1550_2='crimson', VarioskanLUX="green",
                                  F5="blue", Rotanta_Transfer="violet", NOW="black")
        # collect containers with scheduled jobs
        relevant_cont_names = set()
        for idx, start in self.schedule.items():
            job = self.step_by_id[idx]
            # NOTE(review): mutates the job as a display fallback when no device was
            # assigned yet — confirm 'F5' is the intended default device label
            if job.main_device.name is None:
                job.main_device.name = "F5"
            for cont_name in job.cont_names:
                cont_info = self.container_info_by_name[cont_name]
                if cont_info.barcode:
                    name = f"BC_({cont_info.barcode})"
                else:
                    name = f"{cont_name}(BC unknown)"
                relevant_cont_names.add(name)
                data.append(dict(
                    name=name,
                    start=start.start,
                    finish=start.start + timedelta(seconds=job.duration),
                    device=job.main_device.name,
                    hover_name=idx,
                ))
            # there might be jobs without a container involved
            if not job.cont_names:
                relevant_cont_names.add("No Container")
                data.append(dict(
                    name="No Container",
                    start=start.start,
                    finish=start.start + timedelta(seconds=job.duration),
                    device=job.main_device.name,
                    hover_name=idx,
                ))
        # mark 'now' with a thin black bar on every row
        if self.running_processes_names or len(self.schedule) == 0:
            for name in relevant_cont_names:
                now = datetime.today()
                width = timedelta(seconds=1)
                data.append(dict(name=name, start=now, finish=now + width, device="NOW"))
        df = pd.DataFrame(data)
        fig = px.timeline(df, x_start="start", x_end="finish", y="name", color="device",
                          color_discrete_map=color_discrete_map, hover_name='hover_name')
        fig.update_yaxes(autorange="reversed")  # otherwise tasks are listed from the bottom up
        return fig

    def remove_operable(self, idx: str):
        """
        Deletes a process step, variable, computation or decision from the structure including all references.
        :param idx: name of the node to remove
        """
        if idx not in self.operable:
            Logger.error(f"Node {idx} is not active node in this JSSP")
            # fixed: previously fell through and raised a KeyError on the lookup below
            return
        op = self.operable[idx]
        self.deleted_operable[idx] = op
        # delete the reference from wherever it belonged to
        for d in [self.step_by_id, self.var_by_id, self.computation_by_id, self.if_node_by_id]:
            if idx in d:
                d.pop(idx)
        # remove it from the process as well
        for p in self.process_by_name.values():
            for bucket in [p.containers, p.if_nodes, p.steps, p.variables, p.computations]:
                if op in bucket:
                    bucket.remove(op)

    @property
    def running_processes_names(self) -> List[str]:
        """ :return a list of the names of all running processes"""
        running = [name for name, p in self.process_by_name.items() if
                   p.status == ProcessExecutionState.RUNNING]
        return running

    @property
    def process_stati_by_name(self) -> Dict[str, ProcessExecutionState]:
        """:return: mapping of process name to its current execution state"""
        return {name: p.status for name, p in self.process_by_name.items()}

    @property
    def combined_wfg(self) -> nx.DiGraph:
        """:return: the union of all processes' workflow graphs"""
        g = nx.DiGraph()
        for p in self.process_by_name.values():
            g = nx.compose(g, p.wfg)
        return g

    @property
    def definite_step_by_id(self) -> Dict[str, ProcessStep]:
        """:return: steps that will definitely run: full opacity and no container in error state"""
        # mark all jobs, that shall be excluded due to errors
        conflict = set()
        for cont_name, cont in self.container_info_by_name.items():
            if cont.in_error_state:
                for idx, step in self.step_by_id.items():
                    if cont_name in step.cont_names:
                        # adds any step that includes a container in error-state
                        conflict.add(idx)
        # todo make this a consistent wfg
        return {idx: job for idx, job in self.step_by_id.items() if job.opacity == 1 and idx not in conflict}

    @property
    def definite_if_node_by_id(self):
        """:return: if-nodes with full opacity (not pruned by runtime decisions)"""
        return {idx: node for idx, node in self.if_node_by_id.items() if node.opacity == 1}

    @property
    def definite_var_by_id(self):
        """:return: variables with full opacity (not pruned by runtime decisions)"""
        return {idx: var for idx, var in self.var_by_id.items() if var.opacity == 1}

    @property
    def definite_computation_by_id(self):
        """:return: computations with full opacity (not pruned by runtime decisions)"""
        return {idx: computation for idx, computation in self.computation_by_id.items() if computation.opacity == 1}

    @property
    def operable(self) -> Dict[str, Operable]:
        """:return: all active (non-container) nodes in one dictionary, keyed by name"""
        res = {}
        for d in [self.if_node_by_id, self.var_by_id, self.computation_by_id, self.step_by_id]:
            res.update(d)
        return res