"""
This is a wrapper for a networkx workflow graph.
It adds some convenient functionalities
"""
import traceback
import networkx as nx
from laborchestrator.logging_manager import StandardLogger as Logger
from laborchestrator.structures import Schedule, ScheduledAssignment, ProcessStep, SMProcess
from datetime import datetime
from typing import Optional, List, Dict, NamedTuple, Iterable
[docs]
class Resource(NamedTuple):
Type: str
Tag: str
Preferred: str
[docs]
class Node(NamedTuple):
Idx: str
Duration: float
RequiredResources: List[Resource] # f.e. [('StorageResource', 'origin', 'Carousel'),
# ('MoverResource', 'main', 'Mover'),
# ('IncubationResource', 'target', 'Incubator2')]
StartTime: str
Finish: str
WaitToStartCost: Optional[float] = None
[docs]
class Edge(NamedTuple):
Head: str
Tail: str
WaitCost: Optional[float] = None
MaxWaitingTime: Optional[float] = None
MinWaitingTime: Optional[float] = None
[docs]
class Graph(NamedTuple):
Nodes: List[Node]
Edges: List[Edge]
[docs]
class Assignment:
start: datetime
machine_assignments: Dict[str, str] # f.e. {origin: Carousel, main: Mover, target: Incubator1}
machine_prior: List[str] # additional precedence constraints
[docs]
class WorkFlowGraph:
[docs]
@staticmethod
def create_sila_structure_from_jobs(jobs: Iterable[ProcessStep], wfg: nx.DiGraph):
g = Graph([], [])
job_by_name = {j.name: j for j in jobs}
for j in jobs:
requirements = []
for d in j.used_devices:
try:
type_str = d.device_type.__name__
except Exception as ex:
type_str = str(d.device_type)
# if the step is finished no other device should be scheduled on it
if j.start:
preference = d.name
else:
preference = d.preferred
requirements.append(
Resource(Type=type_str, Tag=d.tag, Preferred=str(preference))
)
g.Nodes.append(Node(j.name, j.duration, requirements, str(j.start), str(j.finish), j.wait_to_start_costs))
connected = set() # avoid double edges
for prior in j.prior:
priors = [prior]
# we also add all edges of precedences that are induced by non-step-nodes
# therefore search for all steps that are connect that way
while priors:
p = priors.pop()
if p in job_by_name:
if p not in connected:
min_wait = j.min_wait[prior] if prior in j.min_wait else None
g.Edges.append(Edge(j.name, p, j.wait_cost[prior], j.max_wait[prior], min_wait))
connected.add(p)
else:
priors.extend([idx for idx in wfg.predecessors(p)
# being connected through steps that might never happen does not count
# if not (idx in job_by_name and job_by_name[idx].opacity < 1)])
if not wfg.nodes[idx]['opacity'] < 1])
return g
[docs]
@staticmethod
def add_waiting_dummies(g: Graph, processes: list[SMProcess]):
dummy_machine = Resource("Dummy", "main", "DummyDump")
for process in processes:
if not process.min_start or process.min_start < datetime.now():
continue
waiting_left = process.min_start - datetime.now()
# create a dummy_task
dummy = Node(Idx=f"dummy_{process.name}", Duration=10, RequiredResources=[dummy_machine],
StartTime="None", Finish="None")
g.Nodes.append(dummy)
# add waiting constraints to this dummy task
for start_step in process.starting_nodes:
dummy_edge = Edge(Head=start_step.name, Tail=dummy.Idx, MinWaitingTime=waiting_left.total_seconds(),
MaxWaitingTime=float('inf'))
g.Edges.append(dummy_edge)
Logger.info(f"Process {process.name} has {waiting_left} to wait")
[docs]
@staticmethod
def create_schedule_from_sila_struct(schedule_response) -> Schedule:
try:
schedule = Schedule()
for elem in schedule_response:
assign = ScheduledAssignment(
# remove the timezone info. it is not needed for anything, except for the sila standard
start=elem.StartTime.replace(tzinfo=None),
participating={m.Tag: m.MachineName for m in elem.AssignedMachines},
machine_prior=elem.MachinePrecedences,
)
if "dummy" in elem.ProcessStepId:
Logger.debug(f"removing {elem.ProcessStepId} from schedule")
continue
schedule[elem.ProcessStepId] = assign
return schedule
except Exception as ex:
Logger.error(f"Could not retrieve schedule from response: {ex}", traceback.print_exc())