Source code for laborchestrator.engine.worker_observer
"""
This class observes running process steps. When delays or errors occur, or jobs get finished, the WFGManager,
WorkerInterface or ScheduleManager are notified accordingly.
"""
from laborchestrator.structures import SchedulingInstance, StepStatus, ProcessStep
from laborchestrator.engine import WFGManager, ScheduleManager, WorkerInterface
import time
from threading import Thread
from datetime import datetime
import traceback
from laborchestrator.logging_manager import StandardLogger as Logger
from typing import Tuple, Set
[docs]
class WorkerObserver:
    """
    Observes the process steps that are registered in ``worker.observation_handlers``.

    A master thread (:meth:`observe`) polls the handlers and starts one observer thread
    (:meth:`_observe_protocol`) per step that is not yet listed in ``observed_jobs``. Each observer thread
    mirrors the handler's status onto the corresponding step of the scheduling instance, invalidates the
    schedule on delays and errors and reports finished steps back to the worker.
    """
    worker: WorkerInterface
    wfg_manager: WFGManager
    schedule_manager: ScheduleManager
    jssp: SchedulingInstance
    # ids of all steps an observer thread has been started for; removing an id from this set makes the
    # corresponding observer thread stop (see _observe_protocol)
    observed_jobs: Set[str]

    def __init__(self, wfg_manager: WFGManager, schedule_manager: ScheduleManager, jssp: SchedulingInstance,
                 worker: WorkerInterface):
        """
        Stores the collaborators and immediately starts the master observation thread.

        :param wfg_manager: stored on the instance; not used by the methods of this class
        :param schedule_manager: gets notified when the schedule becomes invalid or suboptimal
        :param jssp: scheduling instance holding the observed steps (``step_by_id``) and the container infos
        :param worker: provides ``observation_handlers`` and gets notified about finished steps
        """
        self.jssp = jssp
        self.wfg_manager = wfg_manager
        self.schedule_manager = schedule_manager
        self.worker = worker
        # must exist BEFORE the thread is started, because observe() reads it
        self.observed_jobs = set()
        self.observe_thread = Thread(daemon=True, target=self.observe)
        self.observe_thread.start()

    def observe(self):
        """
        Master thread. Polls the worker's observation handlers every 0.2 seconds and starts a daemon thread
        running :meth:`_observe_protocol` for every step id that is not in ``observed_jobs`` yet.
        Exceptions are logged and the loop continues; this method never returns.

        :return:
        """
        while True:
            try:
                time.sleep(.2)
                # copy the keys, so the handlers can be changed from elsewhere while we iterate
                steps_to_observe = list(self.worker.observation_handlers.keys())
                for step_id in steps_to_observe:
                    # check for new started jobs
                    if step_id not in self.observed_jobs:
                        # start a thread to observe this job
                        Thread(daemon=True, target=self._observe_protocol, args=(step_id,)).start()
            except Exception as ex:
                # format_exc() returns the traceback as text (print_exc() returns None)
                Logger.warning(f"Worker got an exception: {ex}, {traceback.format_exc()}")

    def _observe_protocol(self, step_id: str):
        """
        Observes a single step until it finishes, fails or is removed from ``observed_jobs``.

        Every 0.5 seconds the status of the step's observation handler is evaluated:

        * RUNNING: the step is marked as running; if it is delayed, its expected duration is increased and the
          schedule is marked invalid.
        * ERROR: the step and its container are marked as erroneous, the schedule is marked invalid, the
          handler's response (or the retrieval error) is stored as result and the observation stops.
        * FINISHED: finish time and real duration are stored, the schedule is marked suboptimal if much time
          was saved, the result is stored, the worker is notified and the observation stops.

        Exceptions inside the loop are logged and the loop is retried after the regular pause.

        :param step_id: id of the step; must be a key of ``worker.observation_handlers`` and
            ``jssp.step_by_id``
        :return:
        """
        Logger.debug(f"start observing execution of {step_id}")
        self.observed_jobs.add(step_id)
        Logger.info(f"start observing execution of {step_id}. now: {self.worker.observation_handlers[step_id].status}")
        protocol_info = self.worker.observation_handlers[step_id]
        job = self.jssp.step_by_id[step_id]
        while True:
            try:
                if step_id not in self.observed_jobs:
                    # this happens after error recovery. In this case, we interrupt without doing anything
                    Logger.info(f"There seems to have happened a error recovery of step {step_id}. Stopping to observer it.")
                    return
                # this happens if the job's status gets manipulated from somewhere else (e.g. error-recovery)
                if job.status == StepStatus.FINISHED:
                    self.worker.process_step_finished(step_id, job.result)
                    break
                # get the current status from the Observable protocol_info
                status = StepStatus(protocol_info.status.value)
                if status == StepStatus.RUNNING:
                    job.status = status
                    # reschedule if the job takes longer than expected
                    delayed, duration_increase = self.is_delayed_significantly(job)
                    if delayed:
                        Logger.warning(f"job {job.name} is delayed by {(datetime.today() - job.start).total_seconds() - job.duration}")
                        job.duration += duration_increase
                        # jobs taking longer than expected can lead to serious problems if schedule is not adapted
                        self.schedule_manager.mark_schedule_invalid()
                elif status == StepStatus.ERROR:
                    job.status = status
                    self.jssp.container_info_by_name[job.cont].in_error_state = True
                    self.schedule_manager.mark_schedule_invalid()
                    # save the error message for the GUI
                    try:
                        job.result = protocol_info.get_responses()
                    except Exception as ex:
                        res = str(ex)
                        Logger.error(f"error retrieving response: {res}")
                        job.result = res
                    # stop observing this step
                    break
                elif status == StepStatus.FINISHED:
                    job.finish = datetime.today()
                    predicted_duration = job.duration
                    job.duration = (job.finish - job.start).total_seconds()
                    # reschedule if we saved significantly much time
                    if job.duration + 2 * self.schedule_manager.time_limit_short < predicted_duration:
                        self.schedule_manager.mark_schedule_suboptimal()
                    # handle job effect on container_info
                    try:
                        job.result = protocol_info.get_responses()
                    except Exception as ex:
                        # keep the message and the traceback text as result, so the failure stays visible
                        trace = traceback.format_exc()
                        job.result = f"{ex}\n{trace}"
                        Logger.error(f"{ex}\n{trace}")
                    self.worker.process_step_finished(step_id, job.result)
                    job.status = status  # do this AFTER the effect of the step was calculated,
                    # or other steps might get started with insufficient information
                    break
            except Exception as ex:
                Logger.error(f"{ex}\n{traceback.format_exc()}")
            # pause outside of the try block, so a persistently failing iteration can not turn into a busy loop
            time.sleep(.5)

    def is_delayed_significantly(self, job: ProcessStep) -> Tuple[bool, float]:
        """
        Checks whether a step runs longer than its expected duration.

        The delay is the time elapsed since ``job.start`` minus ``job.duration`` (in seconds). The step counts
        as delayed when this delay exceeds one second.

        :param job: the step to check; ``job.start`` and ``job.duration`` are read
        :return: tuple (delayed, duration_increase), where duration_increase is always
            ``2 * schedule_manager.time_limit_short``, independent of the actual delay
        """
        delay = (datetime.today() - job.start).total_seconds() - job.duration
        offset = self.schedule_manager.time_limit_short
        # NOTE(review): the threshold is a fixed second and does not depend on offset - confirm this is intended
        delayed = delay > 1
        duration_increase = 2 * offset
        return delayed, duration_increase