# Source code for laborchestrator.workflowgraph

"""
This is a wrapper for a networkx workflow graph.
It adds some convenience functionality.
"""
import traceback
import networkx as nx
from laborchestrator.logging_manager import StandardLogger as Logger
from laborchestrator.structures import Schedule, ScheduledAssignment, ProcessStep, SMProcess
from datetime import datetime
from typing import Optional, List, Dict, NamedTuple, Iterable


class Resource(NamedTuple):
    """A single device requirement of a workflow node.

    Example: ``Resource('StorageResource', 'origin', 'Carousel')``.
    """

    # Name of the required device type, e.g. 'StorageResource'.
    Type: str
    # Role the device plays within the step, e.g. 'origin', 'main' or 'target'.
    Tag: str
    # Name of the device that should preferably be used, e.g. 'Carousel'.
    Preferred: str
class Node(NamedTuple):
    """One step (job) of the workflow graph."""

    # Identifier of the node; edges refer to nodes by this value.
    Idx: str
    # Duration of the step (presumably in seconds -- TODO confirm unit).
    Duration: float
    # Devices needed to execute the step,
    # f.e. [('StorageResource', 'origin', 'Carousel'),
    #       ('MoverResource', 'main', 'Mover'),
    #       ('IncubationResource', 'target', 'Incubator2')]
    RequiredResources: List[Resource]
    # Start and finish time as strings; the literal "None" is used when unset.
    StartTime: str
    Finish: str
    # Optional cost for delaying the start of this step.
    WaitToStartCost: Optional[float] = None
class Edge(NamedTuple):
    """A precedence constraint between two nodes of the workflow graph."""

    # Idx of the dependent node, i.e. the step that has to wait.
    Head: str
    # Idx of the node that has to come first.
    Tail: str
    # Optional cost of waiting between the two steps.
    WaitCost: Optional[float] = None
    # Optional upper/lower bound on the waiting time between the two steps.
    MaxWaitingTime: Optional[float] = None
    MinWaitingTime: Optional[float] = None
class Graph(NamedTuple):
    """Plain container holding the nodes and edges of a workflow graph.

    The tuple itself is immutable, but both lists are filled in place by the
    code that builds the graph.
    """

    Nodes: List[Node]
    Edges: List[Edge]
class Assignment:
    """Describes when a step starts and which devices it is assigned to.

    The class only declares annotated attributes; it defines no constructor
    and no default values.
    """

    # Point in time at which the step starts.
    start: datetime
    # Maps a resource tag to a device name,
    # f.e. {origin: Carousel, main: Mover, target: Incubator1}
    machine_assignments: Dict[str, str]
    # additional precedence constraints
    machine_prior: List[str]
class WorkFlowGraph:
    """Static helpers converting between process steps and the scheduler's graph/schedule structures."""

    @staticmethod
    def create_sila_structure_from_jobs(jobs: Iterable[ProcessStep], wfg: nx.DiGraph):
        """Build a :class:`Graph` of nodes and edges from the given process steps.

        :param jobs: The steps to convert. Each step becomes one :class:`Node`.
        :param wfg: The workflow graph containing the steps and possibly further
            nodes that are not steps. It is used to find precedences between
            steps that are only connected via such non-step nodes.
        :return: A :class:`Graph` with one node per step and one edge per
            (direct or induced) precedence between two of the given steps.
        """
        # `jobs` is iterated twice below; materialise it so that a generator
        # argument is not silently exhausted after building the lookup table.
        jobs = list(jobs)
        g = Graph([], [])
        job_by_name = {j.name: j for j in jobs}
        for j in jobs:
            requirements = []
            for d in j.used_devices:
                try:
                    type_str = d.device_type.__name__
                except Exception:
                    # device_type is not a class (or not accessible that way)
                    type_str = str(d.device_type)
                # if the step is finished no other device should be scheduled on it
                if j.start:
                    preference = d.name
                else:
                    preference = d.preferred
                requirements.append(
                    Resource(Type=type_str, Tag=d.tag, Preferred=str(preference))
                )
            g.Nodes.append(
                Node(j.name, j.duration, requirements, str(j.start), str(j.finish), j.wait_to_start_costs)
            )
            connected = set()  # steps already linked to j; avoids double edges
            visited = set()  # non-step nodes already expanded; avoids repeated work and endless loops
            for prior in j.prior:
                priors = [prior]
                # we also add all edges of precedences that are induced by non-step-nodes
                # therefore search for all steps that are connected that way
                while priors:
                    p = priors.pop()
                    if p in job_by_name:
                        if p not in connected:
                            min_wait = j.min_wait[prior] if prior in j.min_wait else None
                            g.Edges.append(
                                Edge(j.name, p, j.wait_cost[prior], j.max_wait[prior], min_wait)
                            )
                            connected.add(p)
                    elif p not in visited:
                        visited.add(p)
                        # being connected through steps that might never happen does not count
                        priors.extend(
                            idx for idx in wfg.predecessors(p)
                            if not wfg.nodes[idx]['opacity'] < 1
                        )
        return g

    @staticmethod
    def add_waiting_dummies(g: Graph, processes: list[SMProcess]):
        """Add dummy nodes to ``g`` that delay processes which must not start yet.

        For every process whose ``min_start`` lies in the future, a dummy node
        is appended to ``g.Nodes`` and every starting node of the process gets
        an edge to that dummy with the remaining waiting time (in seconds) as
        minimum waiting time. ``g`` is modified in place; nothing is returned.

        :param g: The graph to extend.
        :param processes: The processes to check for a minimum start time.
        """
        dummy_machine = Resource("Dummy", "main", "DummyDump")
        # Take one timestamp so that the "is it in the future" check and the
        # computed waiting time refer to the same instant.
        now = datetime.now()
        for process in processes:
            if not process.min_start or process.min_start < now:
                continue
            waiting_left = process.min_start - now
            # create a dummy_task
            dummy = Node(
                Idx=f"dummy_{process.name}",
                Duration=10,
                RequiredResources=[dummy_machine],
                StartTime="None",
                Finish="None",
            )
            g.Nodes.append(dummy)
            # add waiting constraints to this dummy task
            for start_step in process.starting_nodes:
                dummy_edge = Edge(
                    Head=start_step.name,
                    Tail=dummy.Idx,
                    MinWaitingTime=waiting_left.total_seconds(),
                    MaxWaitingTime=float('inf'),
                )
                g.Edges.append(dummy_edge)
            Logger.info(f"Process {process.name} has {waiting_left} to wait")

    @staticmethod
    def create_schedule_from_sila_struct(schedule_response) -> Optional[Schedule]:
        """Convert a scheduler response into a :class:`Schedule`.

        Entries whose ``ProcessStepId`` contains ``"dummy"`` are left out of
        the result.

        :param schedule_response: Iterable of response elements providing
            ``ProcessStepId``, ``StartTime``, ``AssignedMachines`` and
            ``MachinePrecedences``.
        :return: The schedule, or ``None`` if the response could not be
            converted (the error is logged).
        """
        try:
            schedule = Schedule()
            for elem in schedule_response:
                # NOTE(review): this is a substring match, so a real step whose id
                # contains "dummy" is dropped as well.
                if "dummy" in elem.ProcessStepId:
                    Logger.debug(f"removing {elem.ProcessStepId} from schedule")
                    continue
                schedule[elem.ProcessStepId] = ScheduledAssignment(
                    # remove the timezone info. it is not needed for anything, except for the sila standard
                    start=elem.StartTime.replace(tzinfo=None),
                    participating={m.Tag: m.MachineName for m in elem.AssignedMachines},
                    machine_prior=elem.MachinePrecedences,
                )
            return schedule
        except Exception as ex:
            # traceback.print_exc() returns None, so the trace never reached the
            # logger; format_exc() returns it as a string instead.
            Logger.error(f"Could not retrieve schedule from response: {ex}\n{traceback.format_exc()}")
            return None