Source code for laborchestrator.engine.worker_interface

"""
An interface for a worker that the worker server and its features communicate with.
"""
from __future__ import annotations
import time
import traceback
from typing import Optional, Union, Dict, List, NamedTuple, Any, Tuple
from threading import Thread
from datetime import datetime
from abc import ABC
from datetime import timedelta
import inspect
import asyncio

from sila2.client import ClientObservableCommandInstance
from sila2.framework import CommandExecutionStatus
from laborchestrator.structures import (
    SchedulingInstance, ProcessStep, ScheduledAssignment, StepStatus, MoveStep, SMProcess)
from laborchestrator.engine import ScheduleManager
from laborchestrator.logging_manager import StandardLogger as Logger
from laborchestrator.database_integration import StatusDBInterface


class WorkerInterface:
    """
    Background worker that starts scheduled process steps once they are due.

    On construction a daemon thread is started that polls the schedule twice a
    second. Every step that is due (see :meth:`job_is_due`) is marked as running
    and handed to :meth:`execute_process_step` (or :meth:`simulate_process_step`
    when ``simulation_mode`` is set). The returned observable is stored in
    ``observation_handlers`` under the step id.
    """
    # this serves as an easy point to process step stati and results
    observation_handlers: Dict[str, Observable]
    simulation_mode: bool = False
    db_client: StatusDBInterface

    def __init__(self, jssp: SchedulingInstance, schedule_manager: ScheduleManager, db_client: StatusDBInterface):
        """
        :param jssp: the scheduling instance holding steps, schedule, containers and processes
        :param schedule_manager: asked on every poll whether the current schedule is executable
        :param db_client: interface to the status database (positions, container info, step results)
        """
        self.observation_handlers = {}
        self.jssp = jssp
        self.schedule_manager = schedule_manager
        self.db_client = db_client
        # start the polling loop in the background; as a daemon it does not block interpreter exit
        Thread(daemon=True, target=self._work).start()

    def _work(self):
        """
        Endless polling loop: every 0.5 seconds start all steps that are due.
        Any exception of one iteration is logged and the loop continues.
        """
        while True:
            try:
                if self.schedule_manager.schedule_executable():
                    self._start_due_steps()
            except Exception as ex:
                # format_exc() returns the traceback as text (print_exc() returns None)
                Logger.error(f"{ex}\n{traceback.format_exc()}")
            time.sleep(.5)

    def _start_due_steps(self):
        """Walk through the schedule in order of start time and start every step that is due."""
        schedule = self.jssp.schedule
        # it is safer to sort the assignments by start time. It might be, that due to some computational delays
        # there are more jobs due than a device can handle. In that case, we should start the one that's
        # scheduled first
        sorted_schedule = sorted(schedule.keys(), key=lambda idx: schedule[idx].start)
        for step_id in sorted_schedule:
            # scheduled steps that are not (or no longer) known to the scheduling instance are skipped
            if step_id not in self.jssp.step_by_id:
                continue
            step = self.jssp.step_by_id[step_id]
            # check whether all prerequisites for the executing the step are fulfilled
            if self.job_is_due(step, schedule[step_id]):
                self._start_step(step_id, step, schedule[step_id])

    def _start_step(self, step_id: str, step: ProcessStep, assignment: ScheduledAssignment):
        """Mark a due step as running, fix positions of movements and hand it to the execution method."""
        # mark the step as started now
        step.status = StepStatus.RUNNING
        step.start = datetime.today()
        # for movement steps, this is the point where we decide the position in destination device
        if isinstance(step, MoveStep):
            labware = self.jssp.container_info_by_name[step.cont]
            # the position is unknown in the beginning of the experiment
            if not step.origin_device.name == labware.current_device:
                Logger.warning(f"the predicted place ({step.origin_device}) and the labware "
                               f"current position({labware.current_device}) diverged. "
                               f"using the labware position")
                step.origin_device.name = labware.current_device
            # NOTE(review): the nesting of the next line was reconstructed from collapsed text; it is
            # assumed to run for every move, not only on divergence -- confirm against the repository
            step.origin_pos = labware.current_pos
            step.destination_pos = self.determine_destination_position(step)
        # call the execution interface method for step execution
        try:
            if self.simulation_mode:
                observable = self.simulate_process_step(step_id, assignment.device, step.data)
            else:
                observable = self.execute_process_step(step_id, assignment.device, step.data)
            if step_id in self.observation_handlers:
                Logger.warning(f"There is already a observable handler for step {step_id}.")
            self.observation_handlers[step_id] = observable
        except Exception as ex:
            # NOTE(review): the step keeps the status RUNNING although nothing was started -- confirm
            # that this is handled elsewhere
            Logger.error(f"failed to start execution of step {step_id}:{ex}"
                         f"\n{traceback.format_exc()}")

    def check_prerequisites(self, process: SMProcess) -> Tuple[bool, str]:
        """
        This method will be called when a process is started (Not when it is resumed).
        This default implementation never reports a problem.

        :param process: The process object, that just started
        :return: A flag whether the process can start and a report (as string) of problems found
        """
        message = f"No problems observed for {process.name}. The process can start"
        return True, message

    def execute_process_step(self, step_id: str, device: str, device_kwargs: Dict[str, Any]) -> Observable:
        """
        Gets called, when the time for step has come, all prerequisites for the step are fulfilled and when the
        assigned device has capacity. Overwrite it, to make something happen. This default implementation
        only simulates the step.

        :param step_id: id of the step to start
        :param device: the device the step is scheduled on
        :param device_kwargs: arguments to be forwarded to the server
        :return: an observable to follow the execution
        """
        Logger.info(f"It is time to start process step {step_id} on {device} ")
        # todo inherit from this method to actually make something happen (other than simulation)
        return self.simulate_process_step(step_id, device, device_kwargs)

    def simulate_process_step(self, step_id: str, device: str, device_kwargs: Dict[str, Any]) -> Observable:
        """
        Starts a DummyHandler instead of a real execution.

        :param step_id: id of the step to simulate
        :param device: unused in the simulation
        :param device_kwargs: unused in the simulation
        :return: the already started dummy handler
        """
        # a tenth of the planned duration minus 0.5, but at least 1
        planned_duration = self.jssp.definite_step_by_id[step_id].duration
        observable = DummyHandler(duration=max(planned_duration / 10 - .5, 1))
        observable.run_protocol(None)
        return observable

    def process_step_finished(self, step_id: str, result: Optional[NamedTuple]):
        """
        Gets called, when the corresponding step finished. Overwrite it, to make something happen.
        This default implementation saves the step to the database for every involved container and,
        for movements, updates the container position in the container info and in the database.

        :param step_id: id of the finished step
        :param result: Might be None (unused in this default implementation)
        :return:
        """
        Logger.info(f"step {step_id} has finished")
        if step_id not in self.jssp.step_by_id:
            Logger.error(f"Step {step_id} seems to have been removed from the orchestrator")
            return
        # save the relocation to the database and the container_info
        step = self.jssp.step_by_id[step_id]
        experiment_uuid = self.jssp.process_by_name[step.process_name].experiment_uuid
        for cont_name in step.cont_names:
            container = self.jssp.container_info_by_name[cont_name]
            self.db_client.safe_step_to_db(step, container, experiment_uuid)
            if isinstance(step, MoveStep):
                container.current_device = step.target_device.name
                container.current_pos = step.destination_pos
                cont_in_db = self.db_client.get_cont_info_by_barcode(container.barcode)
                if cont_in_db:
                    # the position stored in the database wins over the one stored in the step
                    origin_device, origin_pos = cont_in_db.current_device, cont_in_db.current_pos
                    if not (origin_device, origin_pos) == (step.origin_device.name, step.origin_pos):
                        Logger.warning(f"database and wfg have diverged. db: {origin_device, origin_pos}, wfg: {step}")
                else:
                    origin_device, origin_pos = step.origin_device.name, step.origin_pos
                Logger.debug("tell the database, that we move a container")
                self.db_client.moved_container(origin_device, origin_pos, step.target_device.name,
                                               step.destination_pos, barcode=container.barcode)

    def job_is_due(self, step: ProcessStep, assignment: ScheduledAssignment):
        """
        checks whether the given job should be started now

        :param step: the job to investigate
        :param assignment: the job's scheduled assignment to time and device
        :return: True if the job is waiting, its start time has passed, its process is running and all
            prior jobs are finished
        """
        # check whether the job is still waiting
        if not step.status == StepStatus.WAITING:
            return False
        # check whether the schedule says, that it's time to start
        if assignment.start >= datetime.today():
            return False
        # check whether the corresponding process is running or was removed
        if step.process_name not in self.jssp.running_processes_names:
            return False
        # check whether all necessary prior jobs are done (container and machine precedences)
        operable = self.jssp.operable
        prior = step.prior + assignment.machine_prior
        return all(operable[idx_o].status == StepStatus.FINISHED for idx_o in prior)

    def determine_destination_position(self, step: MoveStep) -> Optional[int]:
        """
        The position in the destination device is set at runtime according to free space. By default, it is the
        next free one or a given preference (if that is free)

        :param step: the movement to find a destination slot for
        :return: index of the position or None if none is available
        """
        device_name = step.target_device.name
        # get all slots in the device
        device_slots = self.db_client.get_all_positions(device_name)
        # if there is a preference and that slot is empty, take that
        # NOTE(review): a preferred slot of 0 is falsy and gets ignored here -- confirm whether 0 is a valid slot
        if step.pref_dest_pos:
            if self.db_client.position_empty(device_name, step.pref_dest_pos):
                return step.pref_dest_pos
            else:
                Logger.warning(f"Container {step.cont} should go to slot {step.pref_dest_pos}, but there is "
                               f"{self.db_client.get_container_at_position(device_name, step.pref_dest_pos)}")
        for slot in device_slots:
            # take the first empty slot
            if self.db_client.position_empty(device_name, slot):
                return slot
        # return None, if there is no empty slot
        return None
class ObservableProtocolHandler(ABC):
    """
    The class is supposed be an interface for protocols consisting of different SiLA commands or non-SilA commands,
    so they can be treated the same way as observable SiLA-commands.

    Subclasses implement :meth:`_protocol` (as a normal function or as a coroutine function).
    :meth:`run_protocol` executes it in a background thread and keeps ``status``, ``done`` and
    ``response`` up to date.
    """
    def __init__(self):
        self._status = CommandExecutionStatus.waiting
        # stays None on success; on failure it holds the tuple (error message, formatted traceback)
        self.response = None

    def run_protocol(self, client: Any, **kwargs)\
            -> Union[ObservableProtocolHandler, ClientObservableCommandInstance]:
        """
        Starts the protocol in a daemon thread and returns immediately.

        :param client: forwarded to :meth:`_protocol` as first argument
        :param kwargs: forwarded to :meth:`_protocol` as keyword arguments
        :return: this handler, so it can be observed
        """
        Thread(daemon=True, target=self._run_protocol, args=[client], kwargs=kwargs).start()
        return self

    def _run_protocol(self, client, **kwargs):
        """
        Runs :meth:`_protocol` and tracks the execution status. Exceptions of the protocol are
        logged and stored in ``response`` instead of being raised.
        """
        self._status = CommandExecutionStatus.running
        try:
            if inspect.iscoroutinefunction(self._protocol):
                Logger.debug("running protocol in async fashion")
                # this runs in its own thread, so the coroutine gets a fresh event loop
                asyncio.run(self._protocol(client, **kwargs))
            else:
                self._protocol(client, **kwargs)
        except Exception as ex:
            # format_exc() returns the traceback as text (print_exc() returns None)
            trace = traceback.format_exc()
            Logger.error(f"protocol error: {ex}\n{trace}")
            # store the response before changing the status, so observers polling `done`
            # never read an unset response
            self.response = f"protocol error: {ex}\n", trace
            self._status = CommandExecutionStatus.finishedWithError
            return
        self._status = CommandExecutionStatus.finishedSuccessfully

    def _protocol(self, client, **kwargs):
        """
        This is where protocols should be defined, that can not be written as a single observable command.
        The default does nothing.
        """
        pass

    @property
    def status(self) -> CommandExecutionStatus:
        """
        provides the current status of protocol execution

        :return: 0,1,2,3 for waiting, running, success, error
        """
        if isinstance(self._status, CommandExecutionStatus):
            return self._status
        # subclasses might have stored a raw value; convert it to the enum
        return CommandExecutionStatus(self._status)

    def get_remaining_time(self) -> timedelta:
        """
        provides the remaining time of protocol execution

        :return: remaining time as timedelta (this default implementation always reports zero)
        """
        return timedelta(seconds=0)

    def get_responses(self):
        """
        :return: the stored response (None unless the protocol failed or a subclass set it)
        """
        return self.response

    @property
    def done(self):
        """
        :return: True if the protocol finished (successfully or with an error)
        """
        return self._status in [CommandExecutionStatus.finishedSuccessfully,
                                CommandExecutionStatus.finishedWithError]
# Anything the worker can observe while a step is running: either a plain observable
# SiLA command instance, or a protocol handler that offers the same interface for more
# complicated executions.
Observable = Union[ClientObservableCommandInstance, ObservableProtocolHandler]
class DummyHandler(ObservableProtocolHandler):
    """
    Stand-in for a real process step: an ObservableProtocolHandler whose protocol
    does nothing but wait for the configured duration.
    """

    def __init__(self, duration: float = 0):
        """
        :param duration: how long the simulated protocol sleeps (passed to time.sleep)
        """
        super().__init__()
        self.duration = duration

    def _protocol(self, client, **kwargs):
        """Block for ``self.duration``; the client and all keyword arguments are ignored."""
        time.sleep(self.duration)