laborchestrator.orchestrator_implementation module¶
module doc
- class laborchestrator.orchestrator_implementation.Orchestrator(reader: str = 'PythonLab', worker_type: ~typing.Type[~laborchestrator.engine.worker_interface.WorkerInterface] = <class 'laborchestrator.engine.worker_interface.WorkerInterface'>)[source]¶
Bases:
OrchestratorInterface
A class for managing one or more processes in a laboratory. You can add processes (also while others are already running) and monitor their progress. The class incorporates an intelligent scheduler that adapts the schedule to delays, errors, manual interference and the addition of processes. Before really adding a process, you can check what the new schedule would look like. The progress can be observed via Gantt charts or workflow graphs.
- _abc_impl = <_abc._abc_data object>¶
- _simulation_speed = 20¶
- add_lab_resources_from_database(URI: str)[source]¶
Defines and configures the lab environment (types, names, functionalities, etc. of devices).
- Parameters:
URI – Name of the Lab Environment configuration file in YAML format, according to the specification TODO:[@Mark: insert link]
- Returns:
- add_lab_resources_from_file(lab_env_filename: str)[source]¶
Defines and configures the lab environment (types, names, functionalities, etc. of devices).
- Parameters:
lab_env_filename – Name of the Lab Environment configuration file in YAML format, according to the specification TODO:[@Mark: insert link]
- Returns:
- add_labware(labware: List[ContainerInfo])[source]¶
Registers one or several labware items with the orchestrator.
- add_process(description: str | None = None, file_path: str | None = None, name: str | None = None, process_object=None, delay: int = 0) str[source]¶
Adds a process to be orchestrated, read by the selected process reader. You have to specify either a description (i.e. the file content) or a file location. If no name is specified, the process will be named like P_2 (enumerated). If the given name is already taken, a suffix is added to it.
- Returns:
process object
- Raises:
ParsingError
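The naming rule described for add_process can be illustrated with a small standalone sketch. This documentation does not state the exact counter or suffix format the library uses, so the `P_<n>` enumeration and the `_<k>` suffix below are assumptions, and `unique_process_name` is a hypothetical helper that is not part of laborchestrator.

```python
from typing import Optional, Set


def unique_process_name(requested: Optional[str], taken: Set[str]) -> str:
    """Return a name that does not collide with any name in `taken`.

    Assumed conventions (not confirmed by the library documentation):
    - unnamed processes get an enumerated name "P_<n>"
    - an already taken name gets a numeric suffix "_<k>"
    """
    if requested is None:
        n = 1
        while f"P_{n}" in taken:
            n += 1
        return f"P_{n}"
    if requested not in taken:
        return requested
    k = 1
    while f"{requested}_{k}" in taken:
        k += 1
    return f"{requested}_{k}"


taken = {"P_1", "incubation"}
print(unique_process_name(None, taken))          # P_2
print(unique_process_name("incubation", taken))  # incubation_1
print(unique_process_name("washing", taken))     # washing
```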
- property available_processes: List[ProcessDescription]¶
Checks the file system (currently not implemented) or database for available saved processes. All found processes are returned with the available information.
- Returns:
A list of tuples [Name, description (if found), filepath (if found)] for each found process
- change_step(job_id, changes)[source]¶
Changes the specified operation. This might also be tried if the operation is already running.
- Parameters:
job_id – The unique id of the operation to change.
changes – A dictionary <parameter_name, new_value> of changes to apply.
- Returns:
Boolean, whether the operation could be changed.
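As a rough illustration of the `changes` dictionary and the Boolean result, the following sketch calls `change_step` on a stand-in object. `FakeStepOrchestrator`, the job id `job_7` and the parameter names `duration` and `speed` are hypothetical; which parameters a real operation accepts depends on the process description.

```python
from typing import Any, Dict


class FakeStepOrchestrator:
    """Hypothetical stand-in: accepts changes only for steps it knows."""

    def __init__(self) -> None:
        self.steps: Dict[str, Dict[str, Any]] = {
            "job_7": {"duration": 60, "speed": 300},
        }

    def change_step(self, job_id: str, changes: Dict[str, Any]) -> bool:
        if job_id not in self.steps:
            return False
        self.steps[job_id].update(changes)
        return True


orchestrator = FakeStepOrchestrator()
changes = {"duration": 120}  # <parameter_name, new_value>
if orchestrator.change_step("job_7", changes):
    print("step updated:", orchestrator.steps["job_7"])
else:
    print("step could not be changed")
```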
- check_labware_presence(process_names: List[str] = None) Tuple[List[Any], List[Any]][source]¶
Checks whether the labware required for the process is registered in the database. (The process defines which labware is required and where.)
- Parameters:
process_names – By default, all processes that have been added but not started.
- Returns:
A tuple [found, missing]. The first entry is a list of information on existing labware and the second entry lists the requirements of missing labware.
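One way the [found, missing] tuple might be used is as a guard before starting processes. The sketch below relies only on the documented `check_labware_presence` and `start_processes` methods. The helper `start_if_labware_present` and the `FakeLabwareOrchestrator` stand-in are hypothetical and exist only to make the example runnable without a lab setup.

```python
from typing import Any, List, Tuple


def start_if_labware_present(orchestrator: Any, process_names: List[str]) -> bool:
    """Start the processes only if no required labware is reported missing."""
    found, missing = orchestrator.check_labware_presence(process_names)
    if missing:
        for requirement in missing:
            print(f"missing labware: {requirement}")
        return False
    return orchestrator.start_processes(process_names)


class FakeLabwareOrchestrator:
    """Hypothetical stand-in exposing only the two methods used above."""

    def __init__(self, missing: List[Any]) -> None:
        self._missing = missing
        self.started: List[str] = []

    def check_labware_presence(self, process_names=None) -> Tuple[List[Any], List[Any]]:
        return (["plate_1"], self._missing)

    def start_processes(self, process_names=None) -> bool:
        self.started.extend(process_names or [])
        return True


print(start_if_labware_present(FakeLabwareOrchestrator(missing=[]), ["P_1"]))
print(start_if_labware_present(FakeLabwareOrchestrator(missing=["plate_2"]), ["P_1"]))
```

The first call starts the process. The second reports the missing requirement and returns False without starting anything.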
- continue_container(container_name)[source]¶
Continues operating the specified container.
- Parameters:
container_name – Unique name of the container to continue processing
- Returns:
- create_labware_location_graph(labware_ids: List[str])[source]¶
History of labware movements (esp. colocations) and the current state of the labware distribution.
- db_client: StatusDBInterface¶
- error_resolved(operation_id: str, repeat_operation: bool = False)[source]¶
In case an error needs to be resolved by hand, use this command afterward to proceed with the process.
- Parameters:
operation_id – Unique identifier for an operation
repeat_operation – Boolean, whether the operation shall be tried again
- Returns:
Nothing
- property gantt_chart_executed_processes¶
Returns the gantt chart including that process.
past
- Parameters:
processes – List of names of all processes to include in the gantt chart. By default, all processes are included.
- Returns:
The gantt chart as plotly figure.
- property gantt_chart_scheduled_processes¶
returns the gantt chart including that process.
future
- Parameters:
processes – List of names of all processes to include in the gantt chart. By default, all processes are included.
- Returns:
The gantt chart as plotly figure.
- get_log(start_datetime: datetime = None, stop_datetime: datetime = None, level: int = 0)[source]¶
default for stop: now
- get_parameter(param_name: str)[source]¶
Used as a flexible and easily extendable method to retrieve parameters of the orchestrator
- get_process_state(process_name: str) ProcessExecutionState[source]¶
Returns the current state of the process.
- Returns:
ProcessExecutionState
- human_did_job(job_id, result)[source]¶
There are jobs that have to be done by humans (also coded as a lab device). Call this method to inform the scheduler that a human has finished the specified operation.
- Parameters:
job_id – Unique id of the job that a human was supposed to do
result – In case the job had some results, those should be given here
- Returns:
A “Thank You!” to the human
- property in_time: bool¶
timing state of the orchestrator
- Returns:
True, if the orchestrator is currently in time (= not delayed)
- Return type:
bool
- inject_db_interface(db_client: StatusDBInterface)[source]¶
- insert_process_step(process_step: Any, parent_step_ids: List[str], child_step_ids: List[str], process_id: str | None = None, waiting_cost: Dict[str, float] = None, max_waiting_time: Dict[str, float] = None)[source]¶
Adds the given process step in between the given parents and children with the (optional) given time constraints. If process_id is omitted, it is assumed that there is at least one child or parent step. The new step is then assigned to the same process.
- interrupt_process_step(step_id: str)[source]¶
Stops the process step as soon as possible and considers it finished
- property logging_level¶
add description
- Type:
TODO
- pause_processes(process_names: List[str] = None) bool[source]¶
Pauses the processes with the given names.
- Returns:
bool, whether the process could be paused
- process_finished(process_name: str) bool[source]¶
Checks whether the specified process is finished.
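Since process_finished returns a bool, a caller might poll it until a process completes. The following is a minimal sketch of such a loop. `wait_until_finished`, its `timeout` and `poll_interval` parameters and the `FakeFinishingOrchestrator` stand-in are hypothetical and not part of laborchestrator.

```python
import time
from typing import Any


def wait_until_finished(orchestrator: Any, process_name: str,
                        timeout: float = 60.0, poll_interval: float = 0.5) -> bool:
    """Poll process_finished() until it returns True or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if orchestrator.process_finished(process_name):
            return True
        time.sleep(poll_interval)
    # One last check after the deadline has passed.
    return orchestrator.process_finished(process_name)


class FakeFinishingOrchestrator:
    """Hypothetical stand-in that reports 'finished' after a number of polls."""

    def __init__(self, polls_until_done: int) -> None:
        self._remaining = polls_until_done

    def process_finished(self, process_name: str) -> bool:
        self._remaining -= 1
        return self._remaining <= 0


done = wait_until_finished(FakeFinishingOrchestrator(3), "P_1",
                           timeout=2.0, poll_interval=0.01)
print("finished:", done)
```

A monotonic clock is used for the deadline so that system clock adjustments do not shorten or extend the wait.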
- process_reader: ProcessReader¶
- process_step_executed_externally(step_id: str, result)[source]¶
TODO: improve description
There are jobs that have to be done externally, e.g. by a human or an external device. Call this method to inform the scheduler that an external entity has finished the specified process step.
- Parameters:
step_id – Unique id of the step that an external entity was supposed to do
result – In case the job had some results, those should be given here
- property processes: List[ProcessInfo]¶
Lists all process info for all current processes.
- Raises:
NotImplementedError – _description_
- Returns:
_description_
- Return type:
List[str]
- remove_container(container_name: str, return_container=False, final_device=None)[source]¶
Removes all information on the specified container. If the flag is set, the scheduler will return the container to its original position. You can also specify a storage device to which it shall be brought.
- Parameters:
container_name – Unique name of the container.
return_container – Whether the container shall be brought somewhere. By default, the destination is its starting position.
final_device – If return_container is set, you can specify a device to which the container shall be brought.
- Returns:
Nothing
- remove_labware(labware_ids: List[str])[source]¶
Removes one or several labware items from the orchestrator.
- remove_processes(process_names: List[str], return_labwares: bool = False, final_device: str = None)[source]¶
TODO: improve description
Removes all information on the specified processes. If the flag is set, the scheduler will return all involved labware to their original positions. You can also specify a storage device to which all labware shall be brought.
- Parameters:
process_names – Unique names of the processes to remove.
return_labwares – Whether the scheduler shall bring all involved labware to some location. By default, the destination is their starting position.
final_device – If return_labwares is set, you can specify a device to which all involved labware shall be brought.
- Returns:
Nothing
- reset_error_state(step_id: str, repeat_operation: bool = False)[source]¶
TODO: improve description
In case an error needs to be resolved manually, use this command afterwards to proceed with the process.
- Parameters:
step_id – Unique identifier for an operation
repeat_operation – Boolean, whether the operation shall be tried again
- Returns:
Nothing
- restart_process_from_datetime(process_uri: str, start: datetime = None) bool[source]¶
Restarts a process from a given point in time. A database interface has to be implemented for this.
- Parameters:
process_uri – Unique URI for the database interface to find information on the process
start – Point in time from which to restart. The default start point is the last known state.
- Returns:
bool, whether the process could be restarted
- resume_processes(process_names: List[str] = None) bool[source]¶
Resumes the paused processes with the given names.
- Returns:
bool, whether all processes could be resumed
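pause_processes and resume_processes form a natural pair, so a caller might wrap them in a context manager to guarantee that paused processes are resumed even if an exception occurs in between. This is a sketch under that assumption. The `paused` helper and the `RecordingOrchestrator` stand-in are hypothetical and not part of laborchestrator.

```python
from contextlib import contextmanager
from typing import Any, Iterator, List, Optional


@contextmanager
def paused(orchestrator: Any,
           process_names: Optional[List[str]] = None) -> Iterator[bool]:
    """Pause the processes for the duration of the with-block, then resume them."""
    was_paused = orchestrator.pause_processes(process_names)
    try:
        yield was_paused
    finally:
        # Only resume what was actually paused.
        if was_paused:
            orchestrator.resume_processes(process_names)


class RecordingOrchestrator:
    """Hypothetical stand-in that records the calls it receives."""

    def __init__(self) -> None:
        self.calls = []

    def pause_processes(self, process_names=None) -> bool:
        self.calls.append(("pause", process_names))
        return True

    def resume_processes(self, process_names=None) -> bool:
        self.calls.append(("resume", process_names))
        return True


orch = RecordingOrchestrator()
with paused(orch, ["P_1"]) as ok:
    print("paused:", ok)  # manual intervention would happen here
print(orch.calls)
```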
- schedule_manager: ScheduleManager¶
- select_process_reader(process_reader=ProcessReader.PYTHONLABREADER)[source]¶
Dependency injection of the process reader
- set_parameter(param_name: str, new_value)[source]¶
Used as a flexible and easily extendable method to set parameters of the orchestrator
- set_process_priority(process_name: str, priority: int)[source]¶
Changes the priority of an existing process
- simulate_all_processes(speed: float) bool[source]¶
TODO: description 0-600x ?
- Returns:
bool, whether all processes could be started
- property simulation_speed: float¶
- start_processes(process_names: List[str] = None) bool[source]¶
Starts the specified added processes. This triggers an initial scheduling.
- Returns:
bool, whether the process could be started
- stop_and_remove_all_processes(return_containers=False, final_device=None)[source]¶
Removes all information on all processes. If the flag is set, the scheduler will return all involved containers to their original positions. You can also specify a storage device to which all containers shall be brought.
- Parameters:
return_containers – Shall the scheduler bring all involved containers to some location? By default, it is their starting position.
final_device – If return_containers is set, you can specify a device where all involved containers shall be brought.
- Returns:
Nothing
- stop_container(container_name)[source]¶
No further operations of the specified container will be started. The current operation will continue.
- Parameters:
container_name – Unique name of the container to stop processing
- Returns:
- stop_processes(process_names: List[str] = None) bool[source]¶
Stops the specified processes. All running operations will continue, but no follow-up operation will be started.
- Parameters:
process_names – Unique name of the process to stop
- Returns:
bool, if process could be stopped ?
- test_add_process(process)[source]¶
Tries to compute a schedule including the given process. You can get that schedule via get_test_gantt_chart() and get_test_workflow state(). If you choose to really add and start it, call add_process().
- Parameters:
process – The process to try to include.
- Returns:
Nothing
- wfg_manager: WFGManager¶
- worker: WorkerInterface¶
- worker_observer: WorkerObserver¶
- property workflow_graph_executed_processes¶
control-flow or workflow ? Creates a graphviz Digraph visualizing the progress of scheduled processes. future
TODO: we could add different formats, defined by a WFGFormat(Enum)
- Parameters:
processes – A list of process names that shall be visualized. By default all processes will be included.
- Returns:
WorkFlowGraphViz
- property workflow_graph_scheduled_processes¶
control-flow or workflow ? Creates a graphviz Digraph visualizing the progress of scheduled processes. future
TODO: we could add different formats, defined by a WFGFormat(Enum)
- Parameters:
processes – A list of process names that shall be visualized. By default all processes will be included.
- Returns:
WorkFlowGraphViz