Source code for laborchestrator.orchestrator_implementation

"""module doc"""
import inspect
import traceback
from laborchestrator.logging_manager import StandardLogger as Logger
from datetime import datetime, timedelta
import json
import time
from typing import Dict, List, Optional, Any, Type, Tuple

from laborchestrator.workflowgraph import WorkFlowGraph
from laborchestrator.structures import SchedulingInstance, StepStatus, ContainerInfo, MoveStep, ProcessStep
from laborchestrator.pythonlab_reader import PythonLabReader
from laborchestrator.process_reader import ProcessReader
from laborchestrator.pythonlab_process_finder import ProcessFinder
from laborchestrator.engine import WorkerInterface, WFGManager, ScheduleManager, WorkerObserver
from laborchestrator.orchestrator_interface import (
    OrchestratorInterface,
    ProcessReader as ProcessReaderEnum,
    ProcessInfo,
    ProcessExecutionState, ProcessDescription
)
from laborchestrator.database_integration import StatusDBInterface, StatusDBDummyImplementation
from laborchestrator.sila_server import Server as OrchestratorServer


class Orchestrator(OrchestratorInterface):
    """
    A class for managing one or more processes in a laboratory. You can add processes (also while others are already
    running) and monitor their progress. The class incorporates an intelligent scheduler. It adapts the schedule to
    delays, errors, manual interference and addition of processes. Before really adding a process, you can check what
    the new schedule would look like. The progress can be observed via gantt charts or workflow graphs.
    """
    sila_server: OrchestratorServer
    # A class to read new processes. By default, it is a PythonLabReader
    process_reader: ProcessReader
    worker: WorkerInterface
    worker_observer: WorkerObserver
    wfg_manager: WFGManager
    schedule_manager: ScheduleManager
    _simulation_speed = 20
    db_client: StatusDBInterface

    def __init__(self, reader: str = "PythonLab", worker_type: Type[WorkerInterface] = WorkerInterface):
        """
        Constructs a new, empty orchestrator. It is initialized in stop state; call start_processes() to start
        processes that were added via add_process().

        :param reader: NOTE(review): currently unused -- the reader is always the default of select_process_reader().
        :param worker_type: The worker class to instantiate; it receives the scheduling instance, the schedule
            manager and the database client.
        """
        # the container for all experiments and a copy where processes will be added for tryouts
        self.test_jssp = SchedulingInstance()
        # structure for start, stop, work
        self.select_process_reader()
        # create the five main parts of the inner logic
        self.jssp = SchedulingInstance()
        self.db_client = StatusDBDummyImplementation()
        self.schedule_manager = ScheduleManager(self.jssp)
        self.wfg_manager = WFGManager(self.jssp, schedule_manager=self.schedule_manager)
        self.worker = worker_type(self.jssp, schedule_manager=self.schedule_manager, db_client=self.db_client)
        self.worker_observer = WorkerObserver(schedule_manager=self.schedule_manager, wfg_manager=self.wfg_manager,
                                              jssp=self.jssp, worker=self.worker)
        # used to enumerate all processes with no specified name
        self.process_counter = 0

    def start_sila_interface(self, host: str = "127.0.0.1", port: int = 50088):
        """
        Creates a SiLA server wrapping this orchestrator and starts it without encryption.

        :param host: Address to bind to (defaults to localhost).
        :param port: Port to listen on.
        """
        self.sila_server = OrchestratorServer(self)
        self.sila_server.start_insecure(host, port)

    def check_labware_presence(self, process_names: Optional[List[str]] = None) -> Tuple[List[Any], List[Any]]:
        """
        Asks the database, for every container of the given processes, whether a container is registered at the
        container's (start device, current position).

        :param process_names: Processes to check. If None, every process whose steps are all still WAITING is used.
        :return: A tuple (found, missing) of the containers with and without a database match.
        """
        if process_names is None:
            # take all processes with no step started
            process_names = [
                name for name, process in self.jssp.process_by_name.items()
                if all(step.status == StepStatus.WAITING for step in process.steps)
            ]
        found = []
        missing = []
        for name in process_names:
            for cont in self.jssp.process_by_name[name].containers:
                if self.db_client.get_container_at_position(cont.start_device.name, cont.current_pos):
                    found.append(cont)
                else:
                    missing.append(cont)
        return found, missing

    def add_labware(self, labware: List[ContainerInfo]):
        """Registers each of the given containers in the database."""
        for cont in labware:
            self.db_client.add_container(cont)

    def remove_labware(self, labware_ids: List[str]):
        """
        Removes the given containers from the database. Names unknown to the current processes are skipped with a
        warning.
        """
        for labware_id in labware_ids:
            if labware_id in self.jssp.container_info_by_name:
                self.db_client.remove_container(self.jssp.container_info_by_name[labware_id])
            else:
                Logger.warning(f"No container with name {labware_id} found for current processes")

    def create_labware_location_graph(self, labware_ids: List[str]):
        """Not implemented yet; only logs an info message."""
        Logger.info("Labware graph creation is not implemented, yet.")

    def inject_db_interface(self, db_client: StatusDBInterface):
        """
        Replaces the database client used by the orchestrator, its worker and its schedule manager.
        Objects that are not StatusDBInterface instances are rejected with an error log.
        """
        if not isinstance(db_client, StatusDBInterface):
            Logger.error(f"Invalid database interface of type: {type(db_client)}")
            return
        self.db_client = db_client
        self.worker.db_client = db_client
        self.schedule_manager.db_client = db_client

    def add_lab_resources_from_file(self, lab_env_filename: str):
        """
        Reads the lab environment description from the given file and passes its text to the scheduler.

        :param lab_env_filename: Path to the (presumably YAML) lab configuration file.
        """
        # load the file
        with open(lab_env_filename, 'r') as instream:
            lab_config = instream.read()
        # send it to the scheduler
        success = self.schedule_manager.configure_lab(lab_config)
        if not success:
            Logger.warning("Unable to set lab environment")

    def add_lab_resources_from_database(self, URI: str):
        raise NotImplementedError

    def execution_on_time(self) -> bool:
        """Returns True if the scheduling instance reports no schedule violation."""
        return not self.jssp.schedule_violated()

    def add_process(self, description: Optional[str] = None, file_path: Optional[str] = None,
                    name: Optional[str] = None, process_object=None, delay: int = 0) -> str:
        """
        Reads a process, registers it in the database and adds it to the scheduling instance. The process is not
        started by this method.

        :param description: Passed to the process reader as ``src``.
        :param file_path: Passed to the process reader as ``file_path``.
        :param name: Desired process name. If None or already taken, a unique name ``P<k>`` is generated.
        :param process_object: Passed to the process reader as ``process``.
        :param delay: If non-zero, the process may not start earlier than ``delay`` minutes from now.
        :return: The name under which the process was added.
        """
        self.process_counter += 1
        if name is None or name in self.jssp.process_by_name:
            name = f"P{self.process_counter}"
            # a user may already have chosen a name like P<k>; keep counting until the name is free
            while name in self.jssp.process_by_name:
                self.process_counter += 1
                name = f"P{self.process_counter}"
        # create the process object and its database representation
        new_process = self.process_reader.read_process(
            process=process_object, file_path=file_path, name=name, src=description
        )
        # NOTE(review): this stores the source of type(new_process); confirm the reader returns an instance of the
        # user's process class, otherwise all processes would share the same description.
        description = inspect.getsource(type(new_process))
        # add the process to the database, if an identical source is not already there
        for _db_name, process_uuid in self.db_client.get_available_processes():
            if self.db_client.get_process(process_uuid) == description:
                break
        else:
            name_in_db = type(process_object).__name__ if process_object else name
            process_uuid = self.db_client.add_process_to_db(name=name_in_db, src=description)
        experiment_uuid = self.db_client.create_experiment(process_id=process_uuid)
        new_process.experiment_uuid = experiment_uuid
        # possibly add the delayed start
        if delay:
            new_process.min_start = datetime.now() + timedelta(minutes=delay)
        self.jssp.add_process(new_process)
        # This should be done once, before the process might be scheduled. It is only necessary to correct the
        # mistake made by the reader, which sets the origin of movements after barcode-reads to the robot's arm.
        for step in self.jssp.step_by_id.values():
            if isinstance(step, MoveStep):
                step.used_devices = [d for d in step.used_devices if d.tag != 'origin']
        self.wfg_manager.set_origins()
        self.jssp.start_time = datetime.today()
        return name

    def select_process_reader(self, process_reader=ProcessReaderEnum.PYTHONLABREADER):
        """
        Chooses the reader used by add_process(). Only the PythonLab reader is supported; other values leave the
        current reader untouched and log a warning.
        """
        # todo this is not any better than normal hard code
        if process_reader == ProcessReaderEnum.PYTHONLABREADER:
            self.process_reader = PythonLabReader()
        else:
            Logger.warning(f"Unsupported process reader: {process_reader}")

    def get_parameter(self, param_name: str):
        """
        Returns the value of a tunable parameter. Only "scheduling_time" is supported; other names return None.
        """
        if param_name == "scheduling_time":
            return self.schedule_manager.time_limit_short
        Logger.warning(f"Unknown parameter: {param_name}")
        return None

    def set_parameter(self, param_name: str, new_value):
        """
        Sets a tunable parameter. Only "scheduling_time" (converted to float) is supported; other names are ignored
        with a warning.
        """
        if param_name == "scheduling_time":
            self.schedule_manager.time_limit_short = float(new_value)
        else:
            Logger.warning(f"Unknown parameter: {param_name}")

    @property
    def available_processes(self) -> List[ProcessDescription]:
        """All processes stored in the database, each with its stored description."""
        return [
            ProcessDescription(name=name, description=self.db_client.get_process(uuid))
            for name, uuid in self.db_client.get_available_processes()
        ]

    @property
    def processes(self) -> List[ProcessInfo]:
        """Name, priority and status of every process in the scheduling instance."""
        return [ProcessInfo(name=name, priority=process.priority, state=process.status)
                for name, process in self.jssp.process_by_name.items()]

    def start_processes(self, process_names: Optional[List[str]] = None) -> bool:
        """
        Starts the given processes (all known processes if None). Each newly started process forces a rescheduling;
        processes that are already running are skipped with a warning.

        :return: Always True.
        """
        if process_names is None:
            process_names = list(self.jssp.process_by_name)
        for name in process_names:
            if name in self.worker.jssp.running_processes_names:
                Logger.warning("Already running")
            else:
                # TODO: perform a worker/lab specific check (worker.check_prerequisites) before starting
                # Starts the specified added process. This will cause a rescheduling.
                self.jssp.start_process(name)
                self.schedule_manager.mark_schedule_invalid(enforce=True)
        return True

    def stop_processes(self, process_names: Optional[List[str]] = None) -> bool:
        """Stops the given processes (all known processes if None). Always returns True."""
        if process_names is None:
            process_names = list(self.jssp.process_by_name)
        for name in process_names:
            self.jssp.stop_process(name)
        return True

    def pause_processes(self, process_names: Optional[List[str]] = None) -> bool:
        """
        Pauses the given processes (all known processes if None). Currently implemented exactly like
        stop_processes(). Always returns True.
        """
        if process_names is None:
            process_names = list(self.jssp.process_by_name)
        for name in process_names:
            self.jssp.stop_process(name)
        return True

    def resume_processes(self, process_names: Optional[List[str]] = None) -> bool:
        """Resumes the given processes by starting them again (see start_processes())."""
        return self.start_processes(process_names)

    def restart_process_from_datetime(self, process_uri: str, start: datetime = None) -> bool:
        raise NotImplementedError

    def get_process_state(self, process_name: str) -> ProcessExecutionState:
        """
        Returns the execution state of the named process.

        :raises Exception: If no process with that name exists.
        """
        if process_name not in self.jssp.process_by_name:
            raise Exception(f"Process named {process_name} not found.")
        return self.jssp.process_stati_by_name[process_name]

    def set_process_priority(self, process_name: str, priority: int):
        raise NotImplementedError

    def remove_processes(self, process_names: Optional[List[str]], return_labwares: bool = False,
                         final_device: str = None):
        """
        Removes the given processes (all known processes if None) from the scheduling instance. Removing a
        non-IDLE process marks the schedule invalid. ``return_labwares`` and ``final_device`` are currently ignored.

        :return: Always True.
        """
        # snapshot the names: remove_process() mutates process_by_name while we iterate
        if process_names is None:
            process_names = list(self.jssp.process_by_name)
        for name in list(process_names):
            if self.jssp.process_stati_by_name[name] != ProcessExecutionState.IDLE:
                self.schedule_manager.mark_schedule_invalid()
            self.jssp.remove_process(name)
        return True

    def simulate_all_processes(self, speed: float) -> bool:
        """
        Runs all processes in simulation mode: every step duration is divided by ``speed`` and the short
        scheduling time limit is set to 1.

        :param speed: Speed-up factor; must be positive.
        :raises ValueError: If ``speed`` is not positive.
        :return: Always True.
        """
        if speed <= 0:
            raise ValueError(f"Simulation speed must be positive, got {speed}")
        for op in self.jssp.step_by_id.values():
            op.duration /= speed
        self.worker.simulation_mode = True
        self.schedule_manager.time_limit_short = 1
        self.start_processes()
        return True

    @property
    def simulation_speed(self) -> float:
        return self._simulation_speed

    @simulation_speed.setter
    def simulation_speed(self, speed: float):
        self._simulation_speed = speed

    @property
    def in_time(self) -> bool:
        """True if the scheduling instance reports no schedule violation."""
        return not self.jssp.schedule_violated()

    def export_current_scheduling_problem(self, filename: str):
        """Writes the next (up to 5000) steps of the scheduling problem as SiLA-style JSON to ``filename``."""
        print(f"exporting to {filename}")
        problem = self.schedule_manager.extract_near_future(n_steps=5000)
        sila_wfg = WorkFlowGraph.create_sila_structure_from_jobs(problem.values(), self.jssp.combined_wfg)
        with open(filename, "w") as writer:
            json.dump(sila_wfg, writer, indent=4)

    # The ``processes`` arguments of the following properties are currently ignored; all processes are shown.
    @property
    def gantt_chart_scheduled_processes(self, processes: List[str] = None):
        return self.jssp.gannt_chart()

    @property
    def gantt_chart_executed_processes(self, processes: List[str] = None):
        return self.jssp.gannt_chart()

    @property
    def workflow_graph_scheduled_processes(self, processes: List[str] = None):
        return self.jssp.visualize_wfg()

    @property
    def workflow_graph_executed_processes(self, processes: List[str] = None):
        return self.jssp.visualize_wfg()

    def insert_process_step(self, process_step: Any, parent_step_ids: List[str], child_step_ids: List[str],
                            process_id: Optional[str] = None, waiting_cost: Dict[str, float] = None,
                            max_waiting_time: Dict[str, float] = None):
        raise NotImplementedError

    def interrupt_process_step(self, step_id: str):
        """Marks the step as finished without repeating it (see error_resolved())."""
        self.error_resolved(step_id, repeat_operation=False)

    def retry_process_step(self, step_id: str):
        """Resets the step so it is executed again (see error_resolved())."""
        self.error_resolved(step_id, repeat_operation=True)

    def remove_process_step(self, step_id: str):
        raise NotImplementedError

    def get_process_step_state(self, step_id: str):
        """Returns the status of the step with the given id."""
        return self.jssp.step_by_id[step_id].status

    def process_step_executed_externally(self, step_id: str, result):
        """Marks the step as finished (see error_resolved()). ``result`` is currently ignored."""
        self.error_resolved(step_id, repeat_operation=False)

    def reset_error_state(self, step_id: str, repeat_operation: bool = False):
        """Forwards to error_resolved()."""
        self.error_resolved(step_id, repeat_operation=repeat_operation)

    def get_log(self, start_datetime: datetime = None, stop_datetime: datetime = None, level: int = 0):
        raise NotImplementedError

    @property
    def logging_level(self):
        return Logger.getLogger().level

    @logging_level.setter
    def logging_level(self, level: int):
        Logger.getLogger().setLevel(level)

    def stop_container(self, container_name):
        """
        No further operations of the specified container will be started. The current operation will continue.

        :param container_name: Unique name of the container to stop processing
        :return:
        """
        raise NotImplementedError

    def continue_container(self, container_name):
        """
        Continues operating the specified container.

        :param container_name: Unique name of the container to continue processing
        :return:
        """
        raise NotImplementedError

    def remove_container(self, container_name: str, return_container=False, final_device=None):
        """
        Removes all information of the specified container. If the flag is set, the scheduler will return the
        container to its original position. You can also specify a storage device, where it shall be brought.

        :param container_name: Unique name of the container.
        :param return_container: Shall the container be brought somewhere. By default it is its starting position.
        :param final_device: If *return_container* is set, you can specify a device the container shall be brought.
        :return: Nothing
        """
        raise NotImplementedError

    def error_resolved(self, operation_id: str, repeat_operation: bool = False):
        """
        In case an error needs to be resolved by hand, use this command afterward to proceed with the process.

        :param operation_id: Unique identifier for an operation
        :param repeat_operation: If True, the step is reset to WAITING so it runs again; otherwise it is marked as
            FINISHED.
        :return: nothing
        """
        if operation_id not in self.jssp.operable:
            Logger.error(f"Can only recover operations, not {operation_id}")
            return
        print(f"Recovering step {operation_id}. Repeat the step: {repeat_operation}")
        Logger.info(f"Recovering step {operation_id}. Repeat the step: {repeat_operation}")
        operation = self.jssp.operable[operation_id]
        if not isinstance(operation, ProcessStep):
            Logger.warning("Recovering anything but process steps is not implemented")
            return
        if operation_id in self.worker_observer.observed_jobs:
            self.worker_observer.observed_jobs.remove(operation_id)
        if repeat_operation:
            # remove all information that the step was started
            operation.status = StepStatus.WAITING
            operation.start = None
            operation.finish = None
        else:
            # treat the step as finishing one second from now (starting now, if it has no recorded start)
            if operation.start is None:
                operation.start = datetime.today()
            operation.finish = datetime.today() + timedelta(seconds=1)
            operation.duration = (operation.finish - operation.start).total_seconds()
            operation.status = StepStatus.FINISHED
            if operation_id in self.worker.observation_handlers:
                print(f"removing {operation_id} from observation_handlers.")
                Logger.debug(f"removing {operation_id} from observation_handlers.")
                self.worker.observation_handlers.pop(operation_id)
        # the involved containers are no longer considered to be in an error state
        for cont_name in operation.cont_names:
            self.jssp.container_info_by_name[cont_name].in_error_state = False
        self.schedule_manager.mark_schedule_invalid(enforce=True)

    def stop_and_remove_all_processes(self, return_containers=False, final_device=None):
        """
        Removes all information of all processes. If the flag is set, the scheduler will return all involved
        containers to their original position. You can also specify a storage device, where all containers shall be
        brought.

        :param return_containers: Shall the scheduler bring all involved containers to some location? By default, it
            is their starting position.
        :param final_device: If *return_containers* is set, you can specify a device where all involved containers
            shall be brought.
        :return: Nothing
        """
        raise NotImplementedError

    def test_add_process(self, process):
        """
        Tries to compute a schedule including the given process. You can get that schedule via
        *get_test_gantt_chart()* and *get_test_workflow state()*. If you choose to really add and start it, call
        *add_process()*.

        :param process: The process to try to include.
        :return: Nothing
        """
        raise NotImplementedError

    def change_step(self, job_id, changes):
        """
        Changes the specified operation. This might also be tried if the operation is already running.

        :param job_id: The unique id of the operation to change.
        :param changes: A dictionary <parameter_name, new_value> of changes to apply.
        :return: Boolean, whether the operation could be changed.
        """
        raise NotImplementedError

    def human_did_job(self, job_id, result):
        """
        Intended to inform the scheduler that a human (also modeled as a lab device) has finished the specified
        job. Currently not implemented: the call does nothing and returns None.

        :param job_id: unique id of the job a human was supposed to do
        :param result: In case the job had some results, those should be given here
        """
        pass

    def process_finished(self, process_name: str) -> bool:
        """
        Checks whether the specified process is finished, i.e. all of its steps with positive opacity are FINISHED.

        :param process_name: Name of the process to check.
        :return: True if the process is finished.
        """
        p = self.jssp.process_by_name[process_name]
        return all(job.status == StepStatus.FINISHED for job in p.steps if job.opacity > 0)

    def get_operable_node(self, idx):
        """Returns the operable node with the given id, or None if there is none."""
        if idx in self.jssp.operable:
            return self.jssp.operable[idx]
        return None

    def add_to_schedule(self, process_name):
        """
        Moves an IDLE process into the SCHEDULED state. If no process is running, the schedule is marked invalid
        to force a rescheduling. Unknown names are ignored.
        """
        if process_name in self.jssp.process_by_name:
            p = self.jssp.process_by_name[process_name]
            if p.status == ProcessExecutionState.IDLE:
                p.status = ProcessExecutionState.SCHEDULED
            if not self.jssp.running_processes_names:
                self.schedule_manager.mark_schedule_invalid(enforce=True)