# Source code for laborchestrator.engine.wfg_manager
import time
from threading import Thread
import traceback
from laborchestrator.logging_manager import StandardLogger as Logger
from typing import Dict
import networkx as nx
from laborchestrator.structures import (
SchedulingInstance, MoveStep, StepStatus, Computation, Variable, IfNode, UsedDevice
)
from .schedule_manager import ScheduleManager
[docs]
class WFGManager:
    """
    Keeps the workflow graph (wfg) of a scheduling instance up to date.

    On construction a daemon thread is started which periodically processes all variables,
    computations and if-nodes whose predecessors are finished and afterwards completes the
    origin information of move steps.
    """
    jssp: SchedulingInstance

    def __init__(self, jssp: SchedulingInstance, schedule_manager: ScheduleManager):
        """
        :param jssp: The scheduling instance whose workflow graph is maintained.
        :param schedule_manager: Used to hold/continue rescheduling and to mark the schedule invalid.
        """
        self.jssp = jssp
        self.schedule_manager = schedule_manager
        # the thread running frequent checks on the wfg
        t = Thread(target=self._wfg_checker, daemon=True)
        t.start()

    def _wfg_checker(self):
        """
        Endless loop (target of the background thread): checks the wfg, then sleeps for 0.2 seconds.
        Rescheduling is held during each check and always continued afterwards.
        """
        while True:
            try:
                # stop rescheduling until changes in the wfg are complete
                self.schedule_manager.hold_rescheduling()
                self._check_wfg()
            except Exception as ex:
                # format_exc() returns the traceback as text (print_exc() only prints and returns None)
                Logger.error(f"{ex}\n{traceback.format_exc()}")
            finally:
                self.schedule_manager.continue_rescheduling()
            time.sleep(.2)

    @staticmethod
    def _ready_to_process(node, operable) -> bool:
        """
        Tells whether a node can be processed now.

        :param node: Node of the wfg having a status and a list of predecessor names (prior).
        :param operable: Mapping from node name to node.
        :return: True iff the node is waiting and all its predecessors are finished.
        """
        # check the status first, so predecessors are only looked up for nodes that still wait
        return node.status == StepStatus.WAITING and all(
            operable[name].status == StepStatus.FINISHED for name in node.prior
        )

    def _check_wfg(self):
        """
        Processes variables, computations and if-nodes that are ready until a full pass changes nothing.
        Afterwards the origins of move steps are (re)set.
        """
        changed_something = True
        while changed_something:
            changed_something = False
            try:
                for var in self.jssp.definite_var_by_id.values():
                    operable = self.jssp.operable
                    if self._ready_to_process(var, operable):
                        changed_something = True
                        self._set_variable(var, operable)
                for computation in self.jssp.definite_computation_by_id.values():
                    operable = self.jssp.operable
                    if self._ready_to_process(computation, operable):
                        changed_something = True
                        self._do_computation(computation, operable)
                for if_node in self.jssp.definite_if_node_by_id.values():
                    operable = self.jssp.operable
                    if self._ready_to_process(if_node, operable):
                        changed_something = True
                        self._eval_if(if_node, operable)
            except Exception:
                Logger.warning(f"workflow graph check failed: {traceback.format_exc()}")
        self.set_origins()

    def set_origins(self):
        """
        The origin device types of MoveJobs are convenient to have, but might change at runtime.
        This method sets all the definite ones.
        :return:
        """
        jobs = self.jssp.definite_step_by_id
        g = self.jssp.combined_wfg
        # position of each node in a topological order (dict lookup instead of a linear list.index search)
        topo_position = {node: pos for pos, node in enumerate(nx.topological_sort(g))}
        for idx, job in jobs.items():
            if not isinstance(job, MoveStep) or job.origin_device is not None:
                continue
            # all earlier movements of the same container
            past_move_indices = [
                n for n in nx.ancestors(g, idx)
                if n in jobs and isinstance(jobs[n], MoveStep) and job.cont == jobs[n].cont
            ]
            if past_move_indices:
                last_move = jobs[max(past_move_indices, key=topo_position.__getitem__)]
                # set the origin to where the container is after the last (at that point) movement
                origin = last_move.target_device
            else:
                origin = self.jssp.container_info_by_name[job.cont].start_device
            job.used_devices.append(UsedDevice(origin.device_type, tag='origin', preferred=origin.preferred))
        self.complete_preferences()

    def complete_preferences(self):
        """
        Writes the preferences for used devices into origin and destination of adjacent MoveSteps
        (if uniquely determined). This method is just for convenience.
        :return:
        """
        steps = self.jssp.definite_step_by_id
        for step in steps.values():
            step_is_move = isinstance(step, MoveStep)
            prior_jobs = [idx_o for idx_o in step.prior if idx_o in steps]
            if len(prior_jobs) == 1:
                # in this case it is clear, what the last step will have been
                step_o = steps[prior_jobs[0]]
                prior_is_move = isinstance(step_o, MoveStep)
                if step_is_move and not prior_is_move:
                    if step_o.main_device.preferred != step.main_device.preferred:  # avoid barcode_read steps
                        step.origin_device.preferred = step_o.main_device.preferred
                if not step_is_move and prior_is_move:
                    step_o.target_device.preferred = step.main_device.preferred
            if step.is_start and step_is_move:
                cont = self.jssp.container_info_by_name[step.cont]
                step.origin_device.preferred = cont.start_device.name

    def _eval_if(self, if_node: IfNode, operable: Dict[str, Variable]):
        """
        Evaluates an if-node and adapts the workflow graph to the decision.

        The results of all preceding variables/computations are passed (by their var_name) to the
        evaluation function of the node. If that fails, the decision is set to False. Afterwards the
        opacities of the successors are updated, nodes with opacity 0 are removed and the schedule is
        marked invalid. If anything else fails, the status of the node is set to ERROR.

        :param if_node: The node to evaluate.
        :param operable: Mapping from node name to node; used to look up the predecessors.
        """
        try:
            kwargs = {}
            for idx in if_node.prior:
                var = operable[idx]
                if isinstance(var, (Variable, Computation)):
                    kwargs[var.var_name] = var.result
            Logger.debug(f"input for evaluation of {if_node.name} is: {kwargs}")
            Logger.info(f"input for evaluation of {if_node.name} is: {kwargs}")
            # todo this is preliminary and will be done properly when variables are handled properly
            try:
                if_node.decision = bool(if_node.evaluation(**kwargs))
            except Exception as ex:
                Logger.error(f"evaluation of if_node {if_node.name} failed: {ex}\n{traceback.format_exc()}")
                Logger.warning("Setting decision to False (better than no decision at all)")
                # A wrong decision is better than no decision
                if_node.decision = False
            Logger.info(f"decision is {if_node.decision}")
            if_node.status = StepStatus.FINISHED
            # consider the effect on the subtrees in the wfg
            changed = []
            nodes = self.jssp.operable
            # set the opacities of the direct successor nodes to 0 or 1 according to the made decision
            for idx in if_node.false_tree:
                if if_node.decision:
                    # if there is no other connection to that node, it will never be visited
                    if nodes[idx].prior == [if_node.name]:
                        nodes[idx].opacity = 0
                        changed.append(idx)
                    # remove the precedence constraint to the if-node
                    # NOTE(review): applied to every node of the discarded branch - confirm intended nesting
                    nodes[idx].prior.remove(if_node.name)
                else:
                    nodes[idx].opacity = 1
                    if not isinstance(nodes[idx], IfNode):
                        changed.append(idx)
            for idx in if_node.true_tree:
                if if_node.decision:
                    nodes[idx].opacity = 1
                    changed.append(idx)
                else:
                    if nodes[idx].prior == [if_node.name]:
                        nodes[idx].opacity = 0
                        changed.append(idx)
                    nodes[idx].prior.remove(if_node.name)
            # set the opacities of the indirect successor nodes
            while len(changed) > 0:
                newly_changed = []
                for idx, node in nodes.items():
                    if node.opacity < 1:
                        priors_changed = any(i in changed for i in node.prior)
                        if priors_changed:
                            # calculate the maximum opacity of predecessors (if nodes count half)
                            max_prior = 0
                            for i in node.prior:
                                if isinstance(nodes[i], IfNode):
                                    max_prior = max(max_prior, nodes[i].opacity/2)
                                else:
                                    max_prior = max(max_prior, nodes[i].opacity)
                            # change the opacity if necessary
                            if max_prior != node.opacity:
                                Logger.warning(f"setting opacity of {node.name} to {max_prior}")
                                node.opacity = max_prior
                                # we must not set the opacity of steps after an if_node to 1
                                if not (isinstance(nodes[idx], IfNode) and max_prior == 1):
                                    newly_changed.append(idx)
                changed = newly_changed
            self.jssp.update_reagent_opacity()
            # clean up references to nodes that will not be operated
            # (iterate over a snapshot, since nodes get removed from the scheduling instance in the loop)
            for node in list(nodes.values()):
                # remove obsolete priors
                node.prior = [idx for idx in node.prior if nodes[idx].opacity > 0]
                # remove nodes that will never be operated
                if node.opacity == 0:
                    self.jssp.remove_operable(node.name)
            self.schedule_manager.mark_schedule_invalid()
        except Exception as ex:
            Logger.error(f"evaluation of if_node {if_node.name} failed: {ex}\n{traceback.format_exc()}")
            if_node.status = StepStatus.ERROR

    @staticmethod
    def _set_variable(var: Variable, operable):
        """
        Copies the result of the single predecessor into the variable and marks it as finished.
        On any failure the error is logged and the status of the variable is set to ERROR.

        :param var: The variable to set; must have exactly one predecessor.
        :param operable: Mapping from node name to node; used to look up the predecessor.
        """
        try:
            # explicit check instead of assert, which is stripped when python runs with -O
            if len(var.prior) != 1:
                raise ValueError(f"expected exactly one predecessor, got {len(var.prior)}")
            var.result = operable[var.prior[0]].result
            var.status = StepStatus.FINISHED
        except Exception as ex:
            Logger.error(f"setting of variable {var.name} failed {ex}\n{traceback.format_exc()}")
            var.status = StepStatus.ERROR

    @staticmethod
    def _do_computation(computation: Computation, operable):
        """
        Calls the evaluation function of the computation with the results of all predecessors
        (passed by their var_name) and marks the computation as finished.
        If the evaluation fails, the result is set to None and the status is still set to FINISHED.

        :param computation: The computation to evaluate.
        :param operable: Mapping from node name to node; used to look up the predecessors.
        """
        try:
            kwargs = {}
            for idx in computation.prior:
                var = operable[idx]
                kwargs[var.var_name] = var.result
            computation.result = computation.evaluation(**kwargs)
            computation.status = StepStatus.FINISHED
        except Exception as ex:
            Logger.error(f"evaluation of computation {computation.name} failed: {ex}\n{traceback.format_exc()}"
                         f"Setting result to None")
            # setting this to None and FINISHED is better than error, since such an error is hard to resolve
            computation.result = None
            computation.status = StepStatus.FINISHED