"""
An interface for a worker, that the worker server and its features communicate with.
"""
from __future__ import annotations
import time
import traceback
from typing import Optional, Union, Dict, List, NamedTuple, Any, Tuple
from threading import Thread
from datetime import datetime
from abc import ABC
from datetime import timedelta
import inspect
import asyncio
from sila2.client import ClientObservableCommandInstance
from sila2.framework import CommandExecutionStatus
from laborchestrator.structures import (
SchedulingInstance, ProcessStep, ScheduledAssignment, StepStatus, MoveStep, SMProcess)
from laborchestrator.engine import ScheduleManager
from laborchestrator.logging_manager import StandardLogger as Logger
from laborchestrator.database_integration import StatusDBInterface
[docs]
class WorkerInterface:
# this serves as an easy point to process step stati and results
observation_handlers: Dict[str, Observable]
simulation_mode: bool = False
db_client: StatusDBInterface
def __init__(self, jssp: SchedulingInstance, schedule_manager: ScheduleManager, db_client: StatusDBInterface):
self.observation_handlers = {}
self.jssp = jssp
self.schedule_manager = schedule_manager
self.db_client = db_client
# start the dash app to control the worker
Thread(daemon=True, target=self._work).start()
[docs]
def _work(self):
while True:
try:
# it is safer to sort the assignments by start time. It might be, that due to some computational delays
# there are more jobs due than a device can handle. In that case, we should start the one that's
# scheduled first
if self.schedule_manager.schedule_executable():
schedule = self.jssp.schedule
sorted_schedule = sorted(schedule.keys(), key=lambda idx: schedule[idx].start)
for step_id in sorted_schedule:
if step_id not in self.jssp.step_by_id:
continue
step = self.jssp.step_by_id[step_id]
# check whether all prerequisites for the executing the step are fulfilled
if self.job_is_due(step, schedule[step_id]):
# mark the step as started now
step.status = StepStatus.RUNNING
step.start = datetime.today()
# for movement steps, this is the point where we decide the position in destination device
if isinstance(step, MoveStep):
labware = self.jssp.container_info_by_name[step.cont]
# the position is unknown in the beginning of the experiment
if not step.origin_device.name == labware.current_device:
Logger.warning(f"the predicted place ({step.origin_device}) and the labware "
f"current position({labware.current_device}) diverged."
f"using the labware position")
step.origin_device.name = labware.current_device
step.origin_pos = labware.current_pos
step.destination_pos = self.determine_destination_position(step)
# call the execution interface method for step execution
try:
if self.simulation_mode:
observable = self.simulate_process_step(step_id, schedule[step_id].device, step.data)
else:
observable = self.execute_process_step(step_id, schedule[step_id].device, step.data)
if step_id in self.observation_handlers:
Logger.warning(f"There is already a observable handler for step {step_id}.")
self.observation_handlers[step_id] = observable
except Exception as ex:
Logger.error(f"failed to start execution of step {step_id}:{ex}"
f"\n{traceback.print_exc()}")
except Exception as ex:
Logger.error(ex, traceback.print_exc())
time.sleep(.5)
[docs]
def check_prerequisites(self, process: SMProcess) -> Tuple[bool, str]:
"""
This method will be called when a process is started (Not when it is resumed)
:param process: The process object, that just started
:return: A report (as string) of problems found
"""
message = f"No problems observed for {process.name}. The process can start"
return True, message
[docs]
def execute_process_step(self, step_id: str, device: str, device_kwargs: Dict[str, Any]) -> Observable:
"""
Gets called, when the time for step has come, all prerequisites for the step are fulfilled and when the
assigned device has capacity. Overwrite it, to make something happen.
:param device_kwargs: arguments to be forwarded to the server
:param step_id:
:param device:
:return:
"""
Logger.info(f"It is time to start process step {step_id} on {device} ")
# todo inherit from this method to actually make something happen (other than simulation)
return self.simulate_process_step(step_id, device, device_kwargs)
[docs]
def simulate_process_step(self, step_id: str, device: str, device_kwargs: Dict[str, Any]) -> Observable:
observable = DummyHandler(duration=max(self.jssp.definite_step_by_id[step_id].duration / 10 - .5, 1))
observable.run_protocol(None)
return observable
[docs]
def process_step_finished(self, step_id: str, result: Optional[NamedTuple]):
"""
Gets called, when the corresponding step finished. Overwrite it, to make something happen.
:param step_id:
:param result: Might be None
:return:
"""
Logger.info(f"step {step_id} has finished")
if step_id not in self.jssp.step_by_id:
Logger.error(f"Step {step_id} seems to have been removed from the orchestrator")
return
# save the relocation to the database and the container_info
step = self.jssp.step_by_id[step_id]
experiment_uuid = self.jssp.process_by_name[step.process_name].experiment_uuid
for cont_name in step.cont_names:
container = self.jssp.container_info_by_name[cont_name]
self.db_client.safe_step_to_db(step, container, experiment_uuid)
if isinstance(step, MoveStep):
container.current_device = step.target_device.name
container.current_pos = step.destination_pos
cont_in_db = self.db_client.get_cont_info_by_barcode(container.barcode)
if cont_in_db:
origin_device, origin_pos = cont_in_db.current_device, cont_in_db.current_pos
if not (origin_device, origin_pos) == (step.origin_device.name, step.origin_pos):
Logger.warning(f"database and wfg have diverged. db: {origin_device, origin_pos}, wfg: {step}")
else:
origin_device, origin_pos = step.origin_device.name, step.origin_pos
Logger.debug("tell the database, that we move a container")
self.db_client.moved_container(origin_device, origin_pos,
step.target_device.name, step.destination_pos,
barcode=container.barcode)
[docs]
def job_is_due(self, step: ProcessStep, assignment: ScheduledAssignment):
"""
checks whether the given job should be stated now
:param step: the job to investigate
:param assignment: the job's scheduled assignment to time and device
:return:
"""
# check whether the job is still waiting
if not step.status == StepStatus.WAITING:
return False
# check whether the schedule says, that it's time to start
if assignment.start >= datetime.today():
return False
# check whether the corresponding process is running or was removed
if step.process_name not in self.jssp.running_processes_names:
return False
# check whether all necessary prior jobs are done (container and machine precedences)
operable = self.jssp.operable
prior = step.prior + assignment.machine_prior
if not all(operable[idx_o].status == StepStatus.FINISHED for idx_o in prior):
return False
return True
[docs]
def determine_destination_position(self, step: MoveStep) -> Optional[int]:
"""
The position in the destination device is set at runtime according to free space.
By default, it is the next free one or a given preference (if that is free)
:param step:
:return: index of the position or None if none is available
"""
device_name = step.target_device.name
# get all slots in the device
device_slots = self.db_client.get_all_positions(device_name)
# if there is a preference and that slot is empty, take that
if step.pref_dest_pos:
if self.db_client.position_empty(device_name, step.pref_dest_pos):
return step.pref_dest_pos
else:
Logger.warning(f"Container {step.cont} should go to slot {step.pref_dest_pos}, but there is"
f"{self.db_client.get_container_at_position(device_name, step.pref_dest_pos)}")
for slot in device_slots:
# take the first empty slot
if self.db_client.position_empty(device_name, slot):
return slot
# return None, if there is no empty slot
return None
[docs]
class ObservableProtocolHandler(ABC):
"""
The class is supposed be an interface for protocols consisting of different SiLA commands or non-SilA commands,
so they can be treated the same way as observable SiLA-commands.
"""
def __init__(self):
self._status = CommandExecutionStatus.waiting
self.response = None
[docs]
def run_protocol(self, client: Any, **kwargs)\
-> Union[ObservableProtocolHandler, ClientObservableCommandInstance]:
"""
:return:
"""
Thread(daemon=True, target=self._run_protocol, args=[client], kwargs=kwargs).start()
return self
[docs]
def _run_protocol(self, client, **kwargs):
self._status = CommandExecutionStatus.running
try:
if inspect.iscoroutinefunction(self._protocol):
Logger.debug("running protocol in async fashion")
asyncio.run(self._protocol(client, **kwargs))
else:
self._protocol(client, **kwargs)
except Exception as ex:
self._status = CommandExecutionStatus.finishedWithError
Logger.error(f"protocol error: {ex}\n", traceback.print_exc())
self.response = f"protocol error: {ex}\n", traceback.print_exc()
return
self._status = CommandExecutionStatus.finishedSuccessfully
[docs]
def _protocol(self, client, **kwargs):
""" This is where protocols should be defined, that can not be written as a single observable command. """
pass
@property
def status(self) -> CommandExecutionStatus:
"""
provides the current status of protocol execution
:return: 0,1,2,3 for waiting, running, success, error
"""
if isinstance(self._status, CommandExecutionStatus):
return self._status
return CommandExecutionStatus(self._status)
[docs]
def get_remaining_time(self) -> timedelta:
"""
provides the remaining time of protocol execution
:return: remaining time in seconds (float)
"""
return timedelta(seconds=0)
[docs]
def get_responses(self):
return self.response
@property
def done(self):
return self._status in [CommandExecutionStatus.finishedSuccessfully, CommandExecutionStatus.finishedWithError]
# this comes from either a direct observable SiLA command or a wrapper with the same interface
# made for more complicated executions
Observable = Union[ClientObservableCommandInstance, ObservableProtocolHandler]
[docs]
class DummyHandler(ObservableProtocolHandler):
"""
A dummy implementation of ObservableProtocolHandler to simulate running real process steps
"""
def __init__(self, duration: float = 0):
super(DummyHandler, self).__init__()
self.duration = duration
[docs]
def _protocol(self, client, **kwargs):
time.sleep(self.duration)