Source code for laborchestrator.engine.worker_observer

"""
This class observes running process steps. When delays or errors occur or jobs get finished, the WFGManager,
WorkerInterface or ScheduleManager are notified accordingly.
"""
from laborchestrator.structures import SchedulingInstance, StepStatus, ProcessStep
from laborchestrator.engine import WFGManager, ScheduleManager, WorkerInterface
import time
from threading import Thread
from datetime import datetime
import traceback
from laborchestrator.logging_manager import StandardLogger as Logger
from typing import Tuple, Set


class WorkerObserver:
    """
    Watches the process steps the worker is currently executing.

    A master thread (:meth:`observe`) polls ``worker.observation_handlers`` and spawns one
    daemon thread (:meth:`_observe_protocol`) per step id that is not yet being observed.
    Each of those threads mirrors the handler's status onto the matching step in the
    scheduling instance and notifies the schedule manager / worker about delays, errors
    and finished steps.
    """
    worker: WorkerInterface
    wfg_manager: WFGManager
    schedule_manager: ScheduleManager
    jssp: SchedulingInstance
    # ids of the steps that currently have an observer thread
    observed_jobs: Set[str]

    def __init__(self, wfg_manager: WFGManager, schedule_manager: ScheduleManager, jssp: SchedulingInstance,
                 worker: WorkerInterface):
        """
        Store the collaborators and start the master observation thread.

        :param wfg_manager: stored on the instance
        :param schedule_manager: notified when the schedule becomes invalid or suboptimal
        :param jssp: scheduling instance holding the observed steps and container infos
        :param worker: provides ``observation_handlers`` and ``process_step_finished``
        """
        self.jssp = jssp
        self.wfg_manager = wfg_manager
        self.schedule_manager = schedule_manager
        self.worker = worker
        # must exist before the thread starts, because observe() reads it
        self.observed_jobs = set()
        self.observe_thread = Thread(daemon=True, target=self.observe)
        self.observe_thread.start()

    def observe(self):
        """
        Master thread: every 0.2 s look for step ids in ``worker.observation_handlers``
        that are not yet observed and start a daemon thread observing each of them.

        Never returns; exceptions are logged and the loop continues.
        :return:
        """
        while True:
            try:
                time.sleep(.2)
                # copy the keys, so the dict may change while we iterate
                steps_to_observe = list(self.worker.observation_handlers.keys())
                for step_id in steps_to_observe:
                    # check for new started jobs
                    if step_id not in self.observed_jobs:
                        # start a thread to observe this job
                        Thread(daemon=True, target=self._observe_protocol, args=[step_id]).start()
            except Exception as ex:
                # format_exc() returns the traceback as text (print_exc() returns None)
                Logger.warning(f"Worker got an exception: {ex}, {traceback.format_exc()}")

    def _observe_protocol(self, step_id: str):
        """
        Observe a single step until it finishes, fails or is taken over by someone else.

        Polls the step's observation handler every 0.5 s and
        - while RUNNING: extends the expected duration and invalidates the schedule on delays,
        - on ERROR: marks the step's container as being in error state, invalidates the
          schedule, stores the response (or the retrieval error) as result and stops,
        - on FINISHED: stores finish time, real duration and result, notifies the worker
          and stops.

        :param step_id: id of the step; key in ``worker.observation_handlers`` and ``jssp.step_by_id``
        """
        Logger.debug(f"start observing execution of {step_id}")
        self.observed_jobs.add(step_id)
        Logger.info(f"start observing execution of {step_id}. now: {self.worker.observation_handlers[step_id].status}")
        protocol_info = self.worker.observation_handlers[step_id]
        job = self.jssp.step_by_id[step_id]
        while True:
            try:
                if step_id not in self.observed_jobs:
                    # this happens after error recovery. In this case, we interrupt without doing anything
                    Logger.info(f"There seems to have happened a error recovery of step {step_id}. Stopping to observer it.")
                    return
                # this happens if the job's status gets manipulated from somewhere else (e.g. error-recovery)
                if job.status == StepStatus.FINISHED:
                    self.worker.process_step_finished(step_id, job.result)
                    break
                # get the current status from the Observable protocol_info
                status = StepStatus(protocol_info.status.value)
                if status == StepStatus.RUNNING:
                    job.status = status
                    # reschedule if the job takes longer than expected
                    delayed, duration_increase = self.is_delayed_significantly(job)
                    if delayed:
                        Logger.warning(f"job {job.name} is delayed by {(datetime.today() - job.start).total_seconds() - job.duration}")
                        job.duration += duration_increase
                        # jobs taking longer than expected can lead to serious problems if schedule is not adapted
                        self.schedule_manager.mark_schedule_invalid()
                if status == StepStatus.ERROR:
                    job.status = status
                    self.jssp.container_info_by_name[job.cont].in_error_state = True
                    self.schedule_manager.mark_schedule_invalid()
                    # save the error message for the GUI
                    try:
                        job.result = protocol_info.get_responses()
                    except Exception as ex:
                        res = str(ex)
                        Logger.error(f"error retrieving response: {res}")
                        job.result = res
                    # stop observing this step
                    break
                if status == StepStatus.FINISHED:
                    job.finish = datetime.today()
                    predicted_duration = job.duration
                    job.duration = (job.finish - job.start).total_seconds()
                    # reschedule if we saved significantly much time
                    if job.duration + 2 * self.schedule_manager.time_limit_short < predicted_duration:
                        self.schedule_manager.mark_schedule_suboptimal()
                    # handle job effect on container_info
                    try:
                        job.result = protocol_info.get_responses()
                    except Exception as ex:
                        trace = traceback.format_exc()
                        job.result = str(ex) + trace
                        Logger.error(f"{ex}\n{trace}")
                    self.worker.process_step_finished(step_id, job.result)
                    # do this AFTER the effect of the step was calculated,
                    # or other steps might get started with insufficient information
                    job.status = status
                    break
            except Exception as ex:
                Logger.error(f"{ex}\n{traceback.format_exc()}")
            # sleep outside the try block, so a repeating exception can not turn this into a busy loop
            time.sleep(.5)

    def is_delayed_significantly(self, job: ProcessStep) -> Tuple[bool, float]:
        """
        Check whether a running step exceeds its expected duration.

        :param job: step providing ``start`` (datetime) and ``duration`` (seconds)
        :return: tuple (delayed, duration_increase): delayed is True if the time elapsed since
            ``job.start`` exceeds ``job.duration`` by more than one second; duration_increase is
            always twice ``schedule_manager.time_limit_short``
        """
        delay = (datetime.today() - job.start).total_seconds() - job.duration
        offset = self.schedule_manager.time_limit_short
        delayed = delay > 1
        duration_increase = 2 * offset
        return delayed, duration_increase