Source code for laborchestrator.orchestrator_interface

from abc import ABC, ABCMeta, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple

from enum import Enum


class ProcessReader(Enum):
    """Enumeration of the process reader kinds that can be selected."""

    PYTHONLABREADER = 1
    # TODO: any other reader ?
class ProcessExecutionState(Enum):
    """State of a process execution.

    TODO: check SiLA specification
    """

    # Declaration order is kept as-is because it defines the iteration order.
    IDLE = 1
    SCHEDULED = 5  # - do we need this ? stefan: might be useful
    RUNNING = 2  # EXECUTING
    PAUSED = 3
    FINISHED = 4
@dataclass
class ProcessDescription:
    """Record describing a process: a mandatory name plus optional details."""

    # Name of the process.
    name: str
    # Optional description text; None when not provided.
    description: Optional[str] = None
    # Optional file path; None when not provided.
    file_path: Optional[str] = None
@dataclass
class ProcessInfo:
    """Record bundling the name, priority and execution state of a process."""

    # Name of the process.
    name: str
    # Priority value of the process (ordering semantics are not defined in this file).
    priority: int
    # Current execution state of the process.
    state: ProcessExecutionState
@dataclass
class LabwareInfo:
    """Record holding the identifying data of a piece of labware."""

    # URI of the labware.
    URI: str
    # Barcode of the labware.
    barcode: str
class FormalOrchestratorConfigInterface(metaclass=ABCMeta):
    """Formal interface for configuring the lab resources of an orchestrator.

    Resources are all ... excluding labware
    """

    @classmethod
    def __subclasshook__(cls, subclass):
        """Treat classes that provide every abstract method as virtual subclasses.

        :param subclass: The class being tested by ``issubclass`` / ``isinstance``.
        :return: ``True`` if *subclass* provides all abstract methods of this
            interface as callables, otherwise ``NotImplemented`` so that the
            regular ABC machinery decides.
        """
        # Only answer for this exact interface: classes deriving from it inherit
        # this hook and must not accept arbitrary classes on its behalf.
        if cls is not FormalOrchestratorConfigInterface:
            return NotImplemented
        if all(callable(getattr(subclass, name, None)) for name in cls.__abstractmethods__):
            return True
        return NotImplemented

    @abstractmethod
    def add_lab_resources_from_file(self, lab_env_filename: str):
        """Defines and configures the lab environment (types, names, functionalities, etc. of devices).

        :param lab_env_filename: Name of the Lab Environment configuration file in YAML format,
            according to the specification TODO:[@Mark: insert link]
        :return:
        """
        raise NotImplementedError

    @abstractmethod
    def add_lab_resources_from_database(self, URI: str):
        """Defines and configures the lab environment (types, names, functionalities, etc. of devices).

        :param URI: Presumably the URI under which the lab environment configuration is found
            in the database - TODO confirm (the previous text was copied from the file variant).
        :return:
        """
        raise NotImplementedError
class FormalProcessStepControllerInterface(metaclass=ABCMeta):
    """Formal interface for controlling individual process steps."""

    @classmethod
    def __subclasshook__(cls, subclass):
        """Treat classes that provide every abstract method as virtual subclasses.

        :param subclass: The class being tested by ``issubclass`` / ``isinstance``.
        :return: ``True`` if *subclass* provides all abstract methods of this
            interface as callables, otherwise ``NotImplemented`` so that the
            regular ABC machinery decides.
        """
        # Only answer for this exact interface: classes deriving from it inherit
        # this hook and must not accept arbitrary classes on its behalf.
        if cls is not FormalProcessStepControllerInterface:
            return NotImplemented
        if all(callable(getattr(subclass, name, None)) for name in cls.__abstractmethods__):
            return True
        return NotImplemented

    @abstractmethod
    def insert_process_step(self, process_step: Any, parent_step_ids: List[str], child_step_ids: List[str],
                            process_id: Optional[str] = None,
                            waiting_cost: Optional[Dict[str, float]] = None,
                            max_waiting_time: Optional[Dict[str, float]] = None):
        """
        Adds the given process step in between the given parents and children with the (optional) given
        time constraints. If process_id is omitted, there is assumed to be at least one child or parent step.
        The new step is then assigned to the same process.

        :param process_step: The step to insert.
        :param parent_step_ids: Ids of the steps that shall precede the new step.
        :param child_step_ids: Ids of the steps that shall follow the new step.
        :param process_id: Optional id of the process the step belongs to.
        :param waiting_cost: Optional time constraint; presumably keyed by step id - TODO confirm.
        :param max_waiting_time: Optional time constraint; presumably keyed by step id - TODO confirm.
        """
        raise NotImplementedError

    @abstractmethod
    def interrupt_process_step(self, step_id: str):
        """
        Stops the process step as soon as possible and considers it finished.

        :param step_id: Id of the step to interrupt.
        """
        raise NotImplementedError

    @abstractmethod
    def retry_process_step(self, step_id: str):
        """
        TODO: add description

        :param step_id: Id of the step.
        """
        raise NotImplementedError

    @abstractmethod
    def remove_process_step(self, step_id: str):
        """
        TODO: add description

        :param step_id: Id of the step.
        """
        raise NotImplementedError

    @abstractmethod
    def get_process_step_state(self, step_id: str):
        """
        TODO: add description

        :param step_id: Id of the step.
        """
        raise NotImplementedError

    @abstractmethod
    def process_step_executed_externally(self, step_id: str, result):
        """
        TODO: improve description
        There are jobs that have to be done externally, e.g. by a human or external device.
        Call this method to inform the scheduler that an external process step has finished
        the specified operation.

        :param step_id: unique id of the step an external entity was supposed to do
        :param result: In case the job had some results, those should be given here
        """
        raise NotImplementedError

    @abstractmethod
    def reset_error_state(self, step_id: str, repeat_operation: bool = False):
        """TODO: improve description
        In case an error needs to be resolved manually, use this command afterwards to proceed with
        the process.

        :param step_id: Unique identifier for an operation
        :param repeat_operation: Boolean whether the operation shall be tried again
        :return: nothing
        """
        raise NotImplementedError
class FormalProcessControllerInterface(metaclass=ABCMeta):
    """
    A class for managing one or more processes in a laboratory. You can add processes (also while others are
    already running) and monitor their progress. The class incorporates an intelligent scheduler. It adapts
    the schedule to delays, errors, manual interference and addition of processes.
    Before really adding a process, you can check what the new schedule would look like.
    The progress can be observed via gantt charts or workflow graphs.
    """

    # A class to read new processes. By default, it is a PythonLabReader
    @classmethod
    def __subclasshook__(cls, subclass):
        """Treat classes that provide every abstract member as virtual subclasses.

        :param subclass: The class being tested by ``issubclass`` / ``isinstance``.
        :return: ``True`` if *subclass* has an attribute for every abstract member of this
            interface, otherwise ``NotImplemented`` so that the regular ABC machinery decides.
        """
        # Only answer for this exact interface: classes deriving from it inherit
        # this hook and must not accept arbitrary classes on its behalf.
        if cls is not FormalProcessControllerInterface:
            return NotImplemented
        # hasattr (not callable) because several abstract members are properties,
        # which are not callable when looked up on the class.
        if all(hasattr(subclass, name) for name in cls.__abstractmethods__):
            return True
        return NotImplemented

    @abstractmethod
    def select_process_reader(self, process_reader=ProcessReader.PYTHONLABREADER):
        """Dependency injection of the process reader

        :param process_reader: The reader to use; defaults to ``ProcessReader.PYTHONLABREADER``.
        """

    def get_parameter(self, param_name: str):
        """
        Used as a flexible and easily extendable method to retrieve parameters of the orchestrator

        :param param_name: Name of the parameter to retrieve.
        """

    def set_parameter(self, param_name: str, new_value):
        """
        Used as a flexible and easily extendable method to set parameters of the orchestrator

        :param param_name: Name of the parameter to set.
        :param new_value: Value to assign to the parameter.
        """

    @property
    @abstractmethod
    def available_processes(self) -> List[ProcessDescription]:
        """
        Checks the file-system (currently not implemented) or database for available saved processes.
        All found processes are returned with the available information.

        :return: A list of ProcessDescription (name, description if found, file path if found),
            one for each found process
        """

    @abstractmethod
    def add_process(self, description: Optional[str] = None, file_path: Optional[str] = None,
                    name: Optional[str] = None) -> str:
        """Adds a process to be orchestrated, read by the selected process reader.

        You have to either specify a description (i.e. the file content) or a file location.
        If no name is specified, the process will be named like P_2 (enumerated). If the given name is
        already taken, a suffix is added to it.

        :param description: The process description, i.e. the file content.
        :param file_path: Location of a file containing the process description.
        :param name: Desired name of the process.
        :return: A string; presumably the name finally assigned to the process - TODO confirm
        :raises ParsingError
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def processes(self) -> List[ProcessInfo]:
        """Lists the process info of all current processes.

        :return: One ProcessInfo per current process
        :rtype: List[ProcessInfo]
        """
        raise NotImplementedError

    @abstractmethod
    def start_processes(self, process_names: Optional[List[str]] = None) -> bool:
        """Starts the specified added processes. This will cause an initial scheduling.

        :param process_names: Names of the processes to start.
        :return: bool if the process could be started
        """
        raise NotImplementedError

    @abstractmethod
    def stop_processes(self, process_names: Optional[List[str]] = None) -> bool:
        """
        Stops the specified processes. All running operations will continue, but no follow up operation
        will be started.

        :param process_names: Unique names of the processes to stop
        :return: bool, if process could be stopped ?
        """
        raise NotImplementedError

    @abstractmethod
    def pause_processes(self, process_names: Optional[List[str]] = None) -> bool:
        """
        Pauses the processes of the given names.

        :param process_names: Names of the processes to pause.
        :return: bool, if process could be paused
        """
        raise NotImplementedError

    @abstractmethod
    def resume_processes(self, process_names: Optional[List[str]] = None) -> bool:
        """
        Resumes the paused processes of the given names.

        :param process_names: Names of the processes to resume.
        :return: bool, if all processes could be resumed
        """
        raise NotImplementedError

    @abstractmethod
    def restart_process_from_datetime(self, process_uri: str, start: Optional[datetime] = None) -> bool:
        """
        Restarts a process from a given point in time. There has to be a database interface implemented.

        :param process_uri: unique uri for the database interface to find information on the process
        :param start: Point in time from where to restart. The default start point is the last known state.
        :return: bool, if the process could be restarted
        """
        raise NotImplementedError

    @abstractmethod
    def get_process_state(self, process_name: str) -> ProcessExecutionState:
        """
        Returns the current state of the process.

        :param process_name: Name of the process.
        :return: ProcessExecutionState
        """
        raise NotImplementedError

    @abstractmethod
    def set_process_priority(self, process_name: str, priority: int):
        """
        Changes the priority of an existing process

        :param process_name: Name of the process.
        :param priority: The new priority.
        """
        raise NotImplementedError

    @abstractmethod
    def remove_processes(self, process_names: List[str], return_labwares: bool = False,
                         final_device: Optional[str] = None):
        """
        TODO: improve description
        Removes all information of the specified processes. If the flag is set, the scheduler will return
        all involved labware to their original position. You can also specify a storage device, where all
        labware shall be brought.

        :param process_names: Unique names of the processes to remove.
        :param return_labwares: Shall the scheduler bring all involved labwares to some location?
            By default it is their starting position.
        :param final_device: If *return_labwares* is set, you can specify a device where all involved
            labware shall be brought.
        :return: Nothing
        """
        raise NotImplementedError

    @abstractmethod
    def simulate_all_processes(self, speed: float) -> bool:
        """TODO: description
        0-600x ?.

        :param speed: Simulation speed.
        :return: bool if all processes could be started
        """
        # > 600 -> SpeedTooHightError
        raise NotImplementedError

    @property
    @abstractmethod
    def simulation_speed(self) -> float:
        """The simulation speed."""
        raise NotImplementedError

    @simulation_speed.setter
    @abstractmethod
    def simulation_speed(self, speed: float) -> bool:
        """TODO: description
        0-600x ?.

        NOTE(review): the value returned by a property setter is discarded on assignment,
        so the annotated bool never reaches the caller.

        :param speed: The new simulation speed.
        """
        # > 600 -> SpeedTooHightError
        raise NotImplementedError

    @property
    @abstractmethod
    def in_time(self) -> bool:
        """Timing state of the orchestrator.

        :return: True, if orchestrator is currently in time (=not delayed)
        :rtype: bool
        """
        raise NotImplementedError

    # NOTE(review): the four members below are properties that declare a `processes`
    # parameter. Attribute access on a property cannot pass arguments, so `processes`
    # always takes its default. The declarations are kept unchanged for compatibility.

    @property
    @abstractmethod
    def gantt_chart_scheduled_processes(self, processes: Optional[List[str]] = None):
        """Returns the gantt chart of the scheduled (future) processes.

        :param processes: List of names of all processes to include in the gantt chart. By default, it's all.
        :return: The gantt chart as plotly figure.
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def gantt_chart_executed_processes(self, processes: Optional[List[str]] = None):
        """Returns the gantt chart of the executed (past) processes.

        :param processes: List of names of all processes to include in the gantt chart. By default, it's all.
        :return: The gantt chart as plotly figure.
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def workflow_graph_scheduled_processes(self, processes: Optional[List[str]] = None):
        """
        control-flow or workflow ?
        Creates a graphviz Digraph visualizing the progress of scheduled (future) processes.
        TODO: we could add different formats, defined by an WFGFormat(Enum)

        :param processes: A list of process names that shall be visualized. By default all processes
            will be included.
        :return: WorkFlowGraphViz
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def workflow_graph_executed_processes(self, processes: Optional[List[str]] = None):
        """
        control-flow or workflow ?
        Creates a graphviz Digraph visualizing the progress of executed (past) processes.
        TODO: we could add different formats, defined by an WFGFormat(Enum)

        :param processes: A list of process names that shall be visualized. By default all processes
            will be included.
        :return: WorkFlowGraphViz
        """
        raise NotImplementedError

    # !! I would write an extra module to validate and test a process (additional instance of the orchestrator)
    # def validate_process(self, process):
class FormalLabwareManagerInterface(metaclass=ABCMeta):
    """Formal interface for managing the labware known to the orchestrator."""

    @classmethod
    def __subclasshook__(cls, subclass):
        """Treat classes that provide every abstract method as virtual subclasses.

        :param subclass: The class being tested by ``issubclass`` / ``isinstance``.
        :return: ``True`` if *subclass* provides all abstract methods of this
            interface as callables, otherwise ``NotImplemented`` so that the
            regular ABC machinery decides.
        """
        # Only answer for this exact interface: classes deriving from it inherit
        # this hook and must not accept arbitrary classes on its behalf.
        if cls is not FormalLabwareManagerInterface:
            return NotImplemented
        if all(callable(getattr(subclass, name, None)) for name in cls.__abstractmethods__):
            return True
        return NotImplemented

    # NOTE(review): unlike its siblings this method is not marked @abstractmethod,
    # so subclasses are not forced to override it; left unchanged for compatibility.
    def check_labware_presence(self, process_names: Optional[List[str]] = None) -> Tuple[List[Any], List[Any]]:
        """
        Checks whether in the database there is labware registered as required for the process.
        (The process defines where which labware is required.)

        :param process_names: By default, it is all processes that have been added but not started.
        :return: A tuple [found, missing]. The first entry is a list of information on existing labware
            and the second entry lists requirements of missing labware
        """
        raise NotImplementedError

    @abstractmethod
    def add_labware(self, labware: List[Any]):
        """
        Registers one or several labware to the orchestrator.

        :param labware: The labware to register.
        """
        raise NotImplementedError

    @abstractmethod
    def remove_labware(self, labware_ids: List[str]):
        """
        Removes one or several labware from the orchestrator.

        :param labware_ids: Ids of the labware to remove.
        """
        raise NotImplementedError

    @abstractmethod
    def create_labware_location_graph(self, labware_ids: List[str]):
        """
        history of labware movements (esp. colocations)
        current state labware distribution

        :param labware_ids: Ids of the labware to include.
        """
        raise NotImplementedError
class FormalSampleManagerInterface(metaclass=ABCMeta):
    """Formal interface for managing samples."""

    @classmethod
    def __subclasshook__(cls, subclass):
        """Treat classes that provide every abstract member as virtual subclasses.

        :param subclass: The class being tested by ``issubclass`` / ``isinstance``.
        :return: ``True`` if *subclass* has an attribute for every abstract member of this
            interface, otherwise ``NotImplemented`` so that the regular ABC machinery decides.
        """
        # Only answer for this exact interface: classes deriving from it inherit
        # this hook and must not accept arbitrary classes on its behalf.
        if cls is not FormalSampleManagerInterface:
            return NotImplemented
        # hasattr (not callable) because one abstract member is a property,
        # which is not callable when looked up on the class.
        if all(hasattr(subclass, name) for name in cls.__abstractmethods__):
            return True
        return NotImplemented

    # NOTE(review): this property declares a required `labware_id` parameter, but
    # attribute access on a property cannot pass arguments; the declaration is
    # kept unchanged for compatibility.
    @property
    @abstractmethod
    def sample_location_graph(self, labware_id: str):
        """
        TODO: add description
        """
        raise NotImplementedError

    @abstractmethod
    def get_sample_labware(self, labware_id: str, sample_id):
        """
        TODO: add description

        :param labware_id: Id of a labware.
        :param sample_id: Id of a sample.
        """
        raise NotImplementedError
class FormalLoggingInterface(metaclass=ABCMeta):
    """Formal interface for accessing the log and its logging level."""

    @classmethod
    def __subclasshook__(cls, subclass):
        """Treat classes that provide every abstract member as virtual subclasses.

        :param subclass: The class being tested by ``issubclass`` / ``isinstance``.
        :return: ``True`` if *subclass* has an attribute for every abstract member of this
            interface, otherwise ``NotImplemented`` so that the regular ABC machinery decides.
        """
        # Only answer for this exact interface: classes deriving from it inherit
        # this hook and must not accept arbitrary classes on its behalf.
        if cls is not FormalLoggingInterface:
            return NotImplemented
        # hasattr (not callable) because one abstract member is a property,
        # which is not callable when looked up on the class.
        if all(hasattr(subclass, name) for name in cls.__abstractmethods__):
            return True
        return NotImplemented

    @property
    @abstractmethod
    def logging_level(self):
        """
        TODO: add description
        """
        raise NotImplementedError

    @logging_level.setter
    @abstractmethod
    def logging_level(self, level: int):
        """
        TODO: add description

        :param level: The new logging level.
        """
        raise NotImplementedError

    @abstractmethod
    def get_log(self, start_datetime: Optional[datetime] = None, stop_datetime: Optional[datetime] = None,
                level: int = 0):
        """
        default for stop: now

        :param start_datetime: Start of the requested time span.
        :param stop_datetime: End of the requested time span.
        :param level: Logging level; defaults to 0.
        """
class OrchestratorInterface(
    ABC,
    FormalOrchestratorConfigInterface,
    FormalProcessControllerInterface,
    FormalLoggingInterface,
    FormalLabwareManagerInterface,
    FormalProcessStepControllerInterface,
    # FormalSampleManagerInterface
):
    """Combined orchestrator interface.

    Aggregates the configuration, process controller, logging, labware manager and
    process step controller interfaces; it adds no members of its own.
    """
class ProcessParsingError(Exception):
    """Exception type for errors while parsing a process.

    :param message: Human readable description of the problem.
    """

    def __init__(self, message):
        # Forward the message to Exception so that str(exc) and exc.args carry it.
        super().__init__(message)
        self.message = message