Source code for laborchestrator.orchestrator_interface
from abc import ABC, ABCMeta, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
from enum import Enum
# TODO: any other reader ?
[docs]
class ProcessExecutionState(Enum):
"""_State of a Process Execution_
TODO: check SiLA specification
:param Enum: _description_
:type Enum: _type_
"""
IDLE = 1
SCHEDULED = 5 # - do we need this ? stefan: might be useful
RUNNING = 2 # EXECUTING
PAUSED = 3
FINISHED = 4
[docs]
@dataclass
class ProcessDescription:
name: str
description: Optional[str] = None
file_path: Optional[str] = None
[docs]
class FormalOrchestratorConfigInterface(metaclass=ABCMeta):
""" Resources are all ... excluding labware"""
@classmethod
def __subclasshook__(cls, subclass):
# TODO: check logic
return (hasattr(subclass, 'add_lab_resources') and
callable(subclass.add_lab_resources) or
NotImplemented)
[docs]
@abstractmethod
def add_lab_resources_from_file(self, lab_env_filename: str):
"""Defines and configures the lab environment (types, names, functionalities, etc. of devices).
:param lab_env_filename: Name of the Lab Environment configuration file in YAML format,
according to the specification TODO:[@Mark: insert link]
:return:
"""
raise NotImplementedError
[docs]
@abstractmethod
def add_lab_resources_from_database(self, URI: str):
"""Defines and configures the lab environment (types, names, functionalities, etc. of devices).
:param URI: Name of the Lab Environment configuration file in YAML format,
according to the specification TODO:[@Mark: insert link]
:return:
"""
raise NotImplementedError
[docs]
class FormalProcessStepControllerInterface(metaclass=ABCMeta):
@classmethod
def __subclasshook__(cls, subclass):
# TODO: check logic
return (hasattr(subclass, 'add_lab_resources') and
callable(subclass.add_lab_resources) or
NotImplemented)
[docs]
@abstractmethod
def insert_process_step(self, process_step: Any,
parent_step_ids: List[str], child_step_ids: List[str], process_id: Optional[str] = None,
waiting_cost: Dict[str, float] = None, max_waiting_time: Dict[str, float] = None):
"""
Adds the given process step inbetween the given parents and children with the (optional) given time constraints.
If process_id is omitted, there is assumed to be at least one child or parent step. The new step is then
assigned to the same process
"""
raise NotImplementedError
[docs]
@abstractmethod
def interrupt_process_step(self, step_id: str):
"""
Stops the process step as soon as possible and considers it finished
"""
raise NotImplementedError
[docs]
@abstractmethod
def retry_process_step(self, step_id: str):
"""
TODO: add description
"""
raise NotImplementedError
[docs]
@abstractmethod
def remove_process_step(self, step_id: str):
"""
TODO: add description
"""
raise NotImplementedError
[docs]
@abstractmethod
def get_process_step_state(self, step_id: str):
"""
TODO: add description
"""
raise NotImplementedError
[docs]
@abstractmethod
def process_step_executed_externally(self, step_id: str, result):
"""
TODO: improve description
There are jobs that have to be done externally, e.g. by a human or external device.
Call this method to inform the scheduler a external process step has finished
the specified operation.
:param step_id: unique id of the step, a external entity was supposed to do
:param result: In case the job had some results, those should be given here
"""
raise NotImplementedError
[docs]
@abstractmethod
def reset_error_state(self, step_id: str, repeat_operation: bool = False):
"""TODO: improve description
In case an error need to be resolved manually, use this command afterwards to proceed with the process.
:param step_id: Unique identifier for an operation
:param repeat_operation: Boolean whether the operation shall be tried again
:return: nothing
"""
raise NotImplementedError
[docs]
class FormalProcessControllerInterface(metaclass=ABCMeta):
"""
A class for managing one or more processes in a laboratory. You can add processes (also while others are already
running), monitor their progress. The class incorporates an intelligent scheduler. It adapts the schedule to delays,
errors, manual interference and addition of processes. Before really adding a process, you can check what the new
schedule would look like. The progress can be observed via gantt charts or workflow graphs.
"""
# A class to read new processes. By default, it is a PythonLabReader
@classmethod
def __subclasshook__(cls, subclass):
# TODO: check logic
return (hasattr(subclass, 'process_reader') and
callable(subclass.process_reader) and
hasattr(subclass, 'add_process') and
callable(subclass.add_process) or
hasattr(subclass, 'processes') and
callable(subclass.processes) or
hasattr(subclass, 'start_process') and
callable(subclass.start_process) or
hasattr(subclass, 'start_all_processes') and
callable(subclass.start_all_processes) or
hasattr(subclass, 'stop_process') and
callable(subclass.stop_process) or
hasattr(subclass, 'stop_all_processes') and
callable(subclass.stop_all_processes) or
hasattr(subclass, 'pause_process') and
callable(subclass.pause_all_processes) or
hasattr(subclass, 'resume_process') and
callable(subclass.resume_process) or
hasattr(subclass, 'resume_all_processes') and
callable(subclass.resume_all_processes) or
hasattr(subclass, 'get_process_state') and
callable(subclass.get_process_state) or
hasattr(subclass, 'remove_process') and
callable(subclass.remove_process) or
hasattr(subclass, 'remove_all_process') and
callable(subclass.remove_all_process) or
hasattr(subclass, 'in_time') and
callable(subclass.in_time) or
hasattr(subclass, 'process_step_executed_externally') and
callable(subclass.process_step_executed_externally) or
hasattr(subclass, 'reset_error_state') and
callable(subclass.reset_error_state) or
hasattr(subclass, 'gantt_chart') and
callable(subclass.gantt_chart) or
hasattr(subclass, 'workflow_graph_visualisation') and
callable(subclass.workflow_graph_visualisation) or
NotImplemented)
[docs]
@abstractmethod
def select_process_reader(self, process_reader=ProcessReader.PYTHONLABREADER):
"""Dependency injection of the process reader
"""
[docs]
def get_parameter(self, param_name: str):
"""
Used as a flexible and easily extendable method to retrieve parameters of the orchestrator
"""
[docs]
def set_parameter(self, param_name: str, new_value):
"""
Used as a flexible and easily extendable method to set parameters of the orchestrator
"""
@property
@abstractmethod
def available_processes(self) -> List[ProcessDescription]:
"""
Checks the file-system(currently not implemented) or database for available saved processes.
All found processes are returned with the available information.
:return: A List of tuples [Name, description(if found), filepath(if found)] for each found process
"""
[docs]
@abstractmethod
def add_process(self, description: Optional[str] = None, file_path: Optional[str] = None,
name: Optional[str] = None) -> str:
"""Adds a process to be orchestrated, read by the selected process reader
You have to either specify a description (i.e. the file content) or a file location.
If no name is specified, the process will be named like P_2 (enumerated). If the given name is already taken,
it is added a suffix.
:return: process object
:raises ParsingError
"""
raise NotImplementedError
@property
@abstractmethod
def processes(self) -> List[ProcessInfo]:
"""_Lists all process info for all current processes_
:raises NotImplementedError: _description_
:return: _description_
:rtype: List[str]
"""
raise NotImplementedError
[docs]
@abstractmethod
def start_processes(self, process_names: List[str] = None) -> bool:
"""Starts the specified added process. This will cause an initial scheduling.
:return: bool if the process could be started
"""
raise NotImplementedError
[docs]
@abstractmethod
def stop_processes(self, process_names: List[str] = None) -> bool:
"""
Stops the specified process. All running operations will continue, but no follow up operation will be started.
:param process_names: Unique name of the process to stop
:return: bool, if process could be stopped ?
"""
raise NotImplementedError
[docs]
@abstractmethod
def pause_processes(self, process_names: List[str] = None) -> bool:
"""
Pausing a process of a given name
:return: bool, if process could be paused
"""
raise NotImplementedError
[docs]
@abstractmethod
def resume_processes(self, process_names: List[str] = None) -> bool:
"""
Resume form Pausing a process of a given name
:return: bool, if all process could be resumed
"""
raise NotImplementedError
[docs]
@abstractmethod
def restart_process_from_datetime(self, process_uri: str, start: datetime = None) -> bool:
"""
Restarts a process from a given point in time. There has to be a database interface implemented.
:param process_uri: unique uri for the database interface to find information on the process
:param start: Point in time from where to restart. The default start point is the last known state.
:return: bool, if the process could be restarted
"""
raise NotImplementedError
[docs]
@abstractmethod
def get_process_state(self, process_name: str) -> ProcessExecutionState:
"""
returns current state of the process
:param process_name:
:return: ProcessExecutionState
"""
raise NotImplementedError
[docs]
@abstractmethod
def set_process_priority(self, process_name: str, priority: int):
"""
Changes the priority of an existing process
"""
raise NotImplementedError
[docs]
@abstractmethod
def remove_processes(self, process_names: List[str],
return_labwares: bool = False, final_device: str = None):
"""
TODO: improve description
Removes all information of the specified process. If the flag is set, the scheduler will return all involved
labware to their original position. You can also specify a storage device, where all labware shall be
brought.
:param process_names: Unique name of the process to remove.
:param return_labwares: Shall the scheduler bring all involved labwares to some location? By default it is \
their starting position.
:param final_device: If *return_labware* is set, you can specify a device where all involved labware shall\
be brought.
:return: Nothing
"""
raise NotImplementedError
[docs]
@abstractmethod
def simulate_all_processes(self, speed: float) -> bool:
"""TODO: description 0-600x ?.
:return: bool if all processes could be started
"""
# > 600 -> SpeedTooHightError
raise NotImplementedError
@property
@abstractmethod
def simulation_speed(self) -> float:
raise NotImplementedError
@simulation_speed.setter
@abstractmethod
def simulation_speed(self, speed: float) -> bool:
"""TODO: description 0-600x ?.
:return: bool if all processes could be started
"""
# > 600 -> SpeedTooHightError
raise NotImplementedError
@property
@abstractmethod
def in_time(self) -> bool:
"""timing state of the orchestrator
:return: _True, if orchestrator is currently in time (=not delayed) _
:rtype: bool
"""
raise NotImplementedError
@property
@abstractmethod
def gantt_chart_scheduled_processes(self, processes: List[str] = None):
"""returns the gantt chart including that process.
future
:param processes: List of names of all processes to include in the gantt chart. By default, its all.
:return: The gantt chart as plotly figure.
"""
raise NotImplementedError
@property
@abstractmethod
def gantt_chart_executed_processes(self, processes: List[str] = None):
"""returns the gantt chart including that process.
past
:param processes: List of names of all processes to include in the gantt chart. By default, its all.
:return: The gantt chart as plotly figure.
"""
raise NotImplementedError
@property
@abstractmethod
def workflow_graph_scheduled_processes(self, processes: List[str] = None):
"""
control-flow or workflow ?
Creates a graphviz Digraph visualizing the progress of scheduled processes.
future
TODO: we could add different formats, defined by an WFGFormat(Enum)
:param processes: A list of process names that shall be visualized. By default all processes will be included.
:return: WorkFlowGraphViz
"""
raise NotImplementedError
@property
@abstractmethod
def workflow_graph_executed_processes(self, processes: List[str] = None):
"""
control-flow or workflow ?
Creates a graphviz Digraph visualizing the progress of scheduled processes.
future
TODO: we could add different formats, defined by an WFGFormat(Enum)
:param processes: A list of process names that shall be visualized. By default all processes will be included.
:return: WorkFlowGraphViz
"""
raise NotImplementedError
# !! I would write an extra module to validate and test a process (additional instance of the orchestrator)
# def validate_process(self, process):
[docs]
class FormalLabwareManagerInterface(metaclass=ABCMeta):
@classmethod
def __subclasshook__(cls, subclass):
# TODO: check logic
return (hasattr(subclass, 'add_lab_resources') and
callable(subclass.add_lab_resources) or
NotImplemented)
[docs]
def check_labware_presence(self, process_names: List[str] = None) -> Tuple[List[Any], List[Any]]:
"""
Checks whether in the database there is labware registered as required for the process. (In the process is
defined, where what labware is required).
:param process_names: By default, it is all processes, have been added but not started.
:return: A tuple [found, missing]. The first entry is a list of information on existing labware
and the second entry lists requirements of missing labware
"""
raise NotImplementedError
[docs]
@abstractmethod
def add_labware(self, labware: List[Any]):
"""
register one or several labware to orchestrator
"""
raise NotImplementedError
[docs]
@abstractmethod
def remove_labware(self, labware_ids: List[str]):
"""
Removes one ore several labware from the orchestrator
"""
raise NotImplementedError
[docs]
@abstractmethod
def create_labware_location_graph(self, labware_ids: List[str]):
"""
history of labware movements (esp. colocations)
current state labware distribution
"""
raise NotImplementedError
[docs]
class FormalSampleManagerInterface(metaclass=ABCMeta):
@classmethod
def __subclasshook__(cls, subclass):
# TODO: check logic
return (hasattr(subclass, 'add_lab_resources') and
callable(subclass.add_lab_resources) or
NotImplemented)
@property
@abstractmethod
def sample_location_graph(self, labware_id: str):
"""
TODO: add description
"""
raise NotImplementedError
[docs]
@abstractmethod
def get_sample_labware(self, labware_id: str, sample_id):
"""
TODO: add description
"""
raise NotImplementedError
[docs]
class FormalLoggingInterface(metaclass=ABCMeta):
@classmethod
def __subclasshook__(cls, subclass):
# TODO: check logic
return (hasattr(subclass, 'add_lab_resources') and
callable(subclass.add_lab_resources) or
NotImplemented)
@property
@abstractmethod
def logging_level(self):
"""
TODO: add description
"""
raise NotImplementedError
@logging_level.setter
@abstractmethod
def logging_level(self, level: int):
"""
TODO: add description
"""
raise NotImplementedError
[docs]
@abstractmethod
def get_log(self, start_datetime: datetime = None, stop_datetime: datetime = None, level: int = 0):
"""
default for stop: now
"""
[docs]
class OrchestratorInterface(
ABC,
FormalOrchestratorConfigInterface,
FormalProcessControllerInterface,
FormalLoggingInterface,
FormalLabwareManagerInterface,
FormalProcessStepControllerInterface,
#FormalSampleManagerInterface
):
pass