Source code for laborchestrator.engine.wfg_manager

import time
from threading import Thread
import traceback
from laborchestrator.logging_manager import StandardLogger as Logger
from typing import Dict
import networkx as nx
from laborchestrator.structures import (
    SchedulingInstance, MoveStep, StepStatus, Computation, Variable, IfNode, UsedDevice
)
from .schedule_manager import ScheduleManager


[docs] class WFGManager: jssp: SchedulingInstance def __init__(self, jssp: SchedulingInstance, schedule_manager: ScheduleManager): self.jssp = jssp self.schedule_manager = schedule_manager # the thread running frequent checks on the wfg t = Thread(target=self._wfg_checker, daemon=True) t.start()
[docs] def _wfg_checker(self): while True: try: # stop rescheduling until changes in the wfg are complete self.schedule_manager.hold_rescheduling() self._check_wfg() except Exception as ex: Logger.error(ex, traceback.print_exc()) finally: self.schedule_manager.continue_rescheduling() time.sleep(.2)
[docs] def _check_wfg(self): changed_something = True while changed_something: changed_something = False try: for idx, var in self.jssp.definite_var_by_id.items(): vars_ready = all([self.jssp.operable[name].status == StepStatus.FINISHED for name in var.prior]) waits = var.status == StepStatus.WAITING if waits and vars_ready: changed_something = True self._set_variable(var, self.jssp.operable) for idx, computation in self.jssp.definite_computation_by_id.items(): vars_ready = all( [self.jssp.operable[name].status == StepStatus.FINISHED for name in computation.prior]) waits = computation.status == StepStatus.WAITING if waits and vars_ready: changed_something = True self._do_computation(computation, self.jssp.operable) for idx, if_node in self.jssp.definite_if_node_by_id.items(): vars_ready = all( [self.jssp.operable[name].status == StepStatus.FINISHED for name in if_node.prior]) waits = if_node.status == StepStatus.WAITING if waits and vars_ready: changed_something = True self._eval_if(if_node, self.jssp.operable) except Exception as ex: Logger.warning(f"workflow graph check failed: {traceback.print_exc()}") self.set_origins()
[docs] def set_origins(self): """ The origin device types of MoveJobs are convenient to have, but might change at runtime. This method sets all the definite ones. :return: """ jobs = self.jssp.definite_step_by_id g = self.jssp.combined_wfg topo = list(nx.topological_sort(g)) for idx, job in jobs.items(): if isinstance(job, MoveStep): if job.origin_device is None: ancestors = nx.ancestors(g, idx) past_move_indices = [n for n in ancestors if n in jobs and isinstance(jobs[n], MoveStep) and job.cont == jobs[n].cont] if past_move_indices: sorted_anc = sorted(past_move_indices, key=lambda u: topo.index(u)) last_move = jobs[sorted_anc[-1]] # set the origin to where the container is after the last (at that point) movement origin = last_move.target_device else: origin = self.jssp.container_info_by_name[job.cont].start_device job.used_devices.append(UsedDevice(origin.device_type, tag='origin', preferred=origin.preferred)) self.complete_preferences()
[docs] def complete_preferences(self): """ Writes the preferences of for used devices into origin and destination of adjacent MoveSteps (if uniquely determined). This method is just for convenience. :return: """ J = self.jssp.definite_step_by_id for step in J.values(): prior_jobs = [idx_o for idx_o in step.prior if idx_o in J] if len(prior_jobs) == 1: # in this it is clear, what the last step will have been step_o = J[prior_jobs[0]] if isinstance(step, MoveStep) and not isinstance(step_o, MoveStep): if step_o.main_device.preferred != step.main_device.preferred: # avoid barcode_read steps step.origin_device.preferred = step_o.main_device.preferred if not isinstance(step, MoveStep) and isinstance(step_o, MoveStep): step_o.target_device.preferred = step.main_device.preferred if step.is_start and isinstance(step, MoveStep): cont = self.jssp.container_info_by_name[step.cont] step.origin_device.preferred = cont.start_device.name
[docs] def _eval_if(self, if_node: IfNode, operable: Dict[str, Variable]): try: kwargs = {} for idx in if_node.prior: var = operable[idx] if isinstance(var, Variable) or isinstance(var, Computation): kwargs[var.var_name] = var.result Logger.debug(f"input for evaluation of {if_node.name} is: {kwargs}") Logger.info(f"input for evaluation of {if_node.name} is: {kwargs}") # todo this is preliminary and will be done properly when variables are handled properly try: if_node.decision = bool(if_node.evaluation(**kwargs)) except Exception as ex: Logger.error(f"evaluation of if_node {if_node.name} failed: {ex}\n{traceback.print_exc()}") Logger.warning("Setting decision to False (better than no decision at all)") # A wrong decision is better than no decision if_node.decision = False Logger.info(f"decision is {if_node.decision}") if_node.status = StepStatus.FINISHED # consider the effect on the subtrees in the wfg changed = [] nodes = self.jssp.operable # set the opacities of the direct successor nodes to 0 or 1 according to the made decision for idx in if_node.false_tree: if if_node.decision: # if there is no other connection to that node, it will never be visited if nodes[idx].prior == [if_node.name]: nodes[idx].opacity = 0 changed.append(idx) # remove the precedence constraint to the if-node nodes[idx].prior.remove(if_node.name) else: nodes[idx].opacity = 1 if not isinstance(nodes[idx], IfNode): changed.append(idx) for idx in if_node.true_tree: if if_node.decision: nodes[idx].opacity = 1 changed.append(idx) else: if nodes[idx].prior == [if_node.name]: nodes[idx].opacity = 0 changed.append(idx) nodes[idx].prior.remove(if_node.name) # set the opacities of the indirect successor nodes while len(changed) > 0: newly_changed = [] for idx, node in nodes.items(): if node.opacity < 1: priors_changed = any(i in changed for i in node.prior) if priors_changed: # calculate the maximum opacity of predecessors (if nodes count half) max_prior = 0 for i in node.prior: if isinstance(nodes[i], IfNode): max_prior = max(max_prior, nodes[i].opacity/2) else: max_prior = max(max_prior, nodes[i].opacity) # change the opacity if necessary if not max_prior == node.opacity: Logger.warning(f"setting opacity of {node.name} to {max_prior}") node.opacity = max_prior # we must not set the opacity of steps after an if_node to 1 if not (isinstance(nodes[idx], IfNode) and max_prior == 1): newly_changed.append(idx) changed = newly_changed self.jssp.update_reagent_opacity() # clean up references to nodes that will not be operated for node in nodes.values(): # remove obsolete priors node.prior = [idx for idx in node.prior if nodes[idx].opacity > 0] # remove nodes that will never be operated if node.opacity == 0: self.jssp.remove_operable(node.name) self.schedule_manager.mark_schedule_invalid() except Exception as ex: Logger.error(f"evaluation of if_node {if_node.name} failed: {ex}\n{traceback.print_exc()}") if_node.status = StepStatus.ERROR
[docs] @staticmethod def _set_variable(var: Variable, operable): try: assert len(var.prior) == 1 var.result = operable[var.prior[0]].result var.status = StepStatus.FINISHED except Exception as ex: Logger.error(f"setting of variable {var.name} failed {ex}\n{traceback.print_exc()}") var.status = StepStatus.ERROR
[docs] @staticmethod def _do_computation(computation: Computation, operable): try: kwargs = {} for idx in computation.prior: var = operable[idx] kwargs[var.var_name] = var.result computation.result = computation.evaluation(**kwargs) computation.status = StepStatus.FINISHED except Exception as ex: Logger.error(f"evaluation of computation {computation.name} failed: {ex}\n{traceback.print_exc()}" f"Setting result to None") # setting this to None and FINISHED is better than error, since such an error is hard to resolve computation.result = None computation.status = StepStatus.FINISHED