laborchestrator.orchestrator_implementation module

module doc

class laborchestrator.orchestrator_implementation.Orchestrator(reader: str = 'PythonLab', worker_type: ~typing.Type[~laborchestrator.engine.worker_interface.WorkerInterface] = <class 'laborchestrator.engine.worker_interface.WorkerInterface'>)[source]

Bases: OrchestratorInterface

A class for managing one or more processes in a laboratory. You can add processes (even while others are already running) and monitor their progress. The class incorporates an intelligent scheduler that adapts the schedule to delays, errors, manual interference, and the addition of processes. Before actually adding a process, you can check what the new schedule would look like. Progress can be observed via Gantt charts or workflow graphs.

_abc_impl = <_abc._abc_data object>
_simulation_speed = 20
add_lab_resources_from_database(URI: str)[source]

Defines and configures the lab environment (types, names, functionalities, etc. of devices).

Parameters:

URI – Name of the Lab Environment configuration file in YAML format, according to the specification TODO:[@Mark: insert link]

Returns:

add_lab_resources_from_file(lab_env_filename: str)[source]

Defines and configures the lab environment (types, names, functionalities, etc. of devices).

Parameters:

lab_env_filename – Name of the Lab Environment configuration file in YAML format, according to the specification TODO:[@Mark: insert link]

Returns:

add_labware(labware: List[ContainerInfo])[source]

Registers one or several labware with the orchestrator.

add_process(description: str | None = None, file_path: str | None = None, name: str | None = None, process_object=None, delay: int = 0) str[source]

Adds a process to be orchestrated. The process is read by the selected process reader. You have to specify either a description (i.e. the file content) or a file location. If no name is specified, the process is given an enumerated name such as P_2. If the given name is already taken, a suffix is appended to it.

Returns:

process object

Raises:

ParsingError
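The naming rule can be illustrated with a small standalone function. This is only a sketch of the described behaviour, not the orchestrator's code: the source does not state how the enumeration counter or the suffix is formed, so the "P_<n>" counter starting at 1 and the "_<k>" suffix used here are assumptions, and unique_process_name is a hypothetical helper.

```python
def unique_process_name(existing, requested=None):
    """Return a process name that does not collide with `existing`.

    Assumed formats (not confirmed by the source): default names are
    "P_<n>", and a name that is already taken receives a suffix "_<k>".
    """
    existing = set(existing)
    if requested is None:
        # No name given: use the first free enumerated name.
        n = 1
        while f"P_{n}" in existing:
            n += 1
        return f"P_{n}"
    if requested not in existing:
        return requested
    # Name taken: append the first free numeric suffix.
    k = 1
    while f"{requested}_{k}" in existing:
        k += 1
    return f"{requested}_{k}"
```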

add_to_schedule(process_name)[source]
property available_processes: List[ProcessDescription]

Checks the file system (currently not implemented) or the database for available saved processes. All processes found are returned with the available information.

Returns:

A list of tuples [name, description (if found), filepath (if found)], one for each process found.

change_step(job_id, changes)[source]

Changes the specified operation. The change can also be attempted while the operation is already running.

Parameters:
  • job_id – The unique id of the operation to change.

  • changes – A dictionary <parameter_name, new_value> of changes to apply.

Returns:

Boolean, whether the operation could be changed.

check_labware_presence(process_names: List[str] = None) Tuple[List[Any], List[Any]][source]

Checks whether the labware required by the processes is registered in the database. (Each process defines which labware is required and where.)

Parameters:

process_names – Names of the processes to check. By default, all processes that have been added but not started.

Returns:

A tuple [found, missing]. The first entry is a list of information on existing labware and the second entry lists the requirements of missing labware.

continue_container(container_name)[source]

Continues operating the specified container.

Parameters:

container_name – Unique name of the container to continue processing

Returns:

create_labware_location_graph(labware_ids: List[str])[source]

Creates a graph of the history of labware movements (especially co-locations) and of the current distribution of labware.

db_client: StatusDBInterface
error_resolved(operation_id: str, repeat_operation: bool = False)[source]

If an error needs to be resolved by hand, use this command afterward to proceed with the process.

Parameters:
  • operation_id – Unique identifier for an operation.

  • repeat_operation – Boolean, whether the operation shall be tried again.

Returns:

Nothing

execution_on_time() bool[source]
export_current_scheduling_problem(filename: str)[source]
property gantt_chart_executed_processes

Returns the Gantt chart of the past, i.e. of the executed processes.

Parameters:

processes – List of names of all processes to include in the Gantt chart. By default, all processes are included.

Returns:

The Gantt chart as a plotly figure.

property gantt_chart_scheduled_processes

Returns the Gantt chart of the future, i.e. of the scheduled processes.

Parameters:

processes – List of names of all processes to include in the Gantt chart. By default, all processes are included.

Returns:

The Gantt chart as a plotly figure.

get_log(start_datetime: datetime = None, stop_datetime: datetime = None, level: int = 0)[source]

The default for stop_datetime is the current time (now).

get_operable_node(idx)[source]
get_parameter(param_name: str)[source]

Used as a flexible and easily extendable method to retrieve parameters of the orchestrator

get_process_state(process_name: str) ProcessExecutionState[source]

Returns the current state of the process.

Parameters:

process_name – Unique name of the process.

Returns:

ProcessExecutionState

get_process_step_state(step_id: str)[source]

TODO: add description

human_did_job(job_id, result)[source]

Some jobs have to be done by humans (who are also coded as a lab device). Call this method to inform the scheduler that a human has finished the specified operation.

Parameters:
  • job_id – Unique id of the job the human was supposed to do.

  • result – If the job produced results, they should be given here.

Returns:

A “Thank You!” to the human

property in_time: bool

timing state of the orchestrator

Returns:

True if the orchestrator is currently in time (i.e. not delayed).

Return type:

bool

inject_db_interface(db_client: StatusDBInterface)[source]
insert_process_step(process_step: Any, parent_step_ids: List[str], child_step_ids: List[str], process_id: str | None = None, waiting_cost: Dict[str, float] = None, max_waiting_time: Dict[str, float] = None)[source]

Adds the given process step between the given parents and children, with the (optional) given time constraints. If process_id is omitted, the step must have at least one child or parent step; the new step is then assigned to the same process as that step.
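The insertion rule can be illustrated on a plain adjacency mapping. This is a sketch under stated assumptions, not the orchestrator's data model: the graph is a dict from step id to the set of its child ids, step_process maps each step to its process, existing parent-to-child edges are left untouched, and when process_id is omitted the process of the first given neighbour is used. The function insert_step and the example ids are hypothetical.

```python
def insert_step(graph, step_process, new_step, parent_ids, child_ids, process_id=None):
    """Insert `new_step` between `parent_ids` and `child_ids`.

    graph:        dict mapping step id -> set of child step ids
    step_process: dict mapping step id -> process id
    Returns the process id the new step was assigned to.
    """
    if process_id is None:
        neighbours = list(parent_ids) + list(child_ids)
        if not neighbours:
            raise ValueError("process_id is required without a parent or child step")
        # Assumption: inherit the process of the first neighbouring step.
        process_id = step_process[neighbours[0]]
    # New step points to its children; each parent points to the new step.
    graph[new_step] = set(child_ids)
    for parent in parent_ids:
        graph[parent].add(new_step)
    step_process[new_step] = process_id
    return process_id


# Example: step "x" is inserted between "a" and "b" of process "P_1".
example_graph = {"a": {"b"}, "b": set()}
example_processes = {"a": "P_1", "b": "P_1"}
assigned = insert_step(example_graph, example_processes, "x", ["a"], ["b"])
```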

interrupt_process_step(step_id: str)[source]

Stops the process step as soon as possible and considers it finished

property logging_level

TODO: add description

pause_processes(process_names: List[str] = None) bool[source]

Pauses the processes with the given names.

Returns:

bool, whether the processes could be paused.

process_finished(process_name: str) bool[source]

Checks whether the specified process is finished.

Parameters:

process_name – Unique name of the process.

Returns:

bool, whether the process is finished.

process_reader: ProcessReader
process_step_executed_externally(step_id: str, result)[source]

TODO: improve description

Some jobs have to be done externally, e.g. by a human or an external device. Call this method to inform the scheduler that an external entity has finished the specified process step.

Parameters:
  • step_id – Unique id of the step an external entity was supposed to do.

  • result – If the job produced results, they should be given here.

property processes: List[ProcessInfo]

Lists the process info of all current processes.

Raises:

NotImplementedError

Return type:

List[ProcessInfo]

remove_container(container_name: str, return_container=False, final_device=None)[source]

Removes all information about the specified container. If the flag is set, the scheduler will return the container to its original position. You can also specify a storage device to which it shall be brought.

Parameters:
  • container_name – Unique name of the container.

  • return_container – Whether the container shall be brought somewhere. By default, this is its starting position.

  • final_device – If return_container is set, you can specify a device to which the container shall be brought.

Returns:

Nothing

remove_labware(labware_ids: List[str])[source]

Removes one or several labware from the orchestrator.

remove_process_step(step_id: str)[source]

TODO: add description

remove_processes(process_names: List[str], return_labwares: bool = False, final_device: str = None)[source]

TODO: improve description

Removes all information about the specified processes. If the flag is set, the scheduler will return all involved labware to its original position. You can also specify a storage device to which all labware shall be brought.

Parameters:
  • process_names – Unique names of the processes to remove.

  • return_labwares – Shall the scheduler bring all involved labware to some location? By default, this is their starting position.

  • final_device – If return_labwares is set, you can specify a device to which all involved labware shall be brought.

Returns:

Nothing

reset_error_state(step_id: str, repeat_operation: bool = False)[source]

TODO: improve description

If an error needs to be resolved manually, use this command afterwards to proceed with the process.

Parameters:
  • step_id – Unique identifier for an operation.

  • repeat_operation – Boolean, whether the operation shall be tried again.

Returns:

Nothing
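After a manual fix, the call might look like the sketch below. It uses only the documented reset_error_state signature; the wrapper resume_after_manual_fix, the step id "step_7", and the recording stand-in are hypothetical.

```python
def resume_after_manual_fix(orchestrator, step_id, retry=True):
    """Clear the error state of `step_id` once the cause has been fixed by hand."""
    # repeat_operation=True asks for the failed operation to be tried again.
    orchestrator.reset_error_state(step_id, repeat_operation=retry)


class _DemoOrchestrator:
    """Hypothetical stand-in that only records the calls it receives."""

    def __init__(self):
        self.calls = []

    def reset_error_state(self, step_id, repeat_operation=False):
        self.calls.append((step_id, repeat_operation))


demo = _DemoOrchestrator()
resume_after_manual_fix(demo, "step_7")
resume_after_manual_fix(demo, "step_7", retry=False)
```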

restart_process_from_datetime(process_uri: str, start: datetime = None) bool[source]

Restarts a process from a given point in time. This requires an implemented database interface.

Parameters:
  • process_uri – Unique URI for the database interface to find information on the process.

  • start – Point in time from which to restart. The default start point is the last known state.

Returns:

bool, whether the process could be restarted.

resume_processes(process_names: List[str] = None) bool[source]

Resumes the paused processes with the given names.

Returns:

bool, whether all processes could be resumed.

retry_process_step(step_id: str)[source]

TODO: add description

schedule_manager: ScheduleManager
select_process_reader(process_reader=ProcessReader.PYTHONLABREADER)[source]

Dependency injection of the process reader

set_parameter(param_name: str, new_value)[source]

Used as a flexible and easily extendable method to set parameters of the orchestrator

set_process_priority(process_name: str, priority: int)[source]

Changes the priority of an existing process

sila_server: Server
simulate_all_processes(speed: float) bool[source]

TODO: description 0-600x ?

Returns:

bool, whether all processes could be started.

property simulation_speed: float
start_processes(process_names: List[str] = None) bool[source]

Starts the specified added processes. This triggers an initial scheduling.

Returns:

bool, whether the processes could be started.

start_sila_interface()[source]
stop_and_remove_all_processes(return_containers=False, final_device=None)[source]

Removes all information about all processes. If the flag is set, the scheduler will return all involved containers to their original position. You can also specify a storage device to which all containers shall be brought.

Parameters:
  • return_containers – Shall the scheduler bring all involved containers to some location? By default, it is their starting position.

  • final_device – If return_containers is set, you can specify a device where all involved containers shall be brought.

Returns:

Nothing

stop_container(container_name)[source]

No further operations of the specified container will be started. The current operation will continue.

Parameters:

container_name – Unique name of the container to stop processing

Returns:

stop_processes(process_names: List[str] = None) bool[source]

Stops the specified processes. All running operations will continue, but no follow-up operations will be started.

Parameters:

process_names – Unique names of the processes to stop

Returns:

bool, if process could be stopped ?

test_add_process(process)[source]

Tries to compute a schedule including the given process. You can get that schedule via get_test_gantt_chart() and get_test_workflow state(). If you choose to really add and start it, call add_process().

Parameters:

process – The process to try to include.

Returns:

Nothing

wfg_manager: WFGManager
worker: WorkerInterface
worker_observer: WorkerObserver
property workflow_graph_executed_processes

Creates a graphviz Digraph visualizing the progress of scheduled processes.

Open question: control-flow or workflow?

Future TODO: we could add different formats, defined by a WFGFormat(Enum).

Parameters:

processes – A list of process names that shall be visualized. By default all processes will be included.

Returns:

WorkFlowGraphViz

property workflow_graph_scheduled_processes

Creates a graphviz Digraph visualizing the progress of scheduled processes.

Open question: control-flow or workflow?

Future TODO: we could add different formats, defined by a WFGFormat(Enum).

Parameters:

processes – A list of process names that shall be visualized. By default all processes will be included.

Returns:

WorkFlowGraphViz