laborchestrator.engine.worker_interface module

An interface for a worker, that the worker server and its features communicate with.

class laborchestrator.engine.worker_interface.DummyHandler(duration: float = 0)[source]

Bases: ObservableProtocolHandler

A dummy implementation of ObservableProtocolHandler to simulate running real process steps

_abc_impl = <_abc._abc_data object>
_protocol(client, **kwargs)[source]

This is where protocols should be defined, that can not be written as a single observable command.

class laborchestrator.engine.worker_interface.ObservableProtocolHandler[source]

Bases: ABC

The class is supposed be an interface for protocols consisting of different SiLA commands or non-SilA commands, so they can be treated the same way as observable SiLA-commands.

_abc_impl = <_abc._abc_data object>
_protocol(client, **kwargs)[source]

This is where protocols should be defined, that can not be written as a single observable command.

_run_protocol(client, **kwargs)[source]
property done
get_remaining_time() timedelta[source]

provides the remaining time of protocol execution :return: remaining time in seconds (float)

get_responses()[source]
run_protocol(client: Any, **kwargs) ObservableProtocolHandler | ClientObservableCommandInstance[source]
Returns:

property status: CommandExecutionStatus

provides the current status of protocol execution :return: 0,1,2,3 for waiting, running, success, error

class laborchestrator.engine.worker_interface.WorkerInterface(jssp: SchedulingInstance, schedule_manager: ScheduleManager, db_client: StatusDBInterface)[source]

Bases: object

_work()[source]
check_prerequisites(process: SMProcess) Tuple[bool, str][source]

This method will be called when a process is started (Not when it is resumed) :param process: The process object, that just started :return: A report (as string) of problems found

db_client: StatusDBInterface
determine_destination_position(step: MoveStep) int | None[source]

The position in the destination device is set at runtime according to free space. By default, it is the next free one or a given preference (if that is free) :param step: :return: index of the position or None if none is available

execute_process_step(step_id: str, device: str, device_kwargs: Dict[str, Any]) ClientObservableCommandInstance | ObservableProtocolHandler[source]

Gets called, when the time for step has come, all prerequisites for the step are fulfilled and when the assigned device has capacity. Overwrite it, to make something happen. :param device_kwargs: arguments to be forwarded to the server :param step_id: :param device: :return:

job_is_due(step: ProcessStep, assignment: ScheduledAssignment)[source]

checks whether the given job should be stated now :param step: the job to investigate :param assignment: the job’s scheduled assignment to time and device :return:

observation_handlers: Dict[str, ClientObservableCommandInstance | ObservableProtocolHandler]
process_step_finished(step_id: str, result: NamedTuple | None)[source]

Gets called, when the corresponding step finished. Overwrite it, to make something happen. :param step_id: :param result: Might be None :return:

simulate_process_step(step_id: str, device: str, device_kwargs: Dict[str, Any]) ClientObservableCommandInstance | ObservableProtocolHandler[source]
simulation_mode: bool = False