Source code for laborchestrator.engine.schedule_manager

"""
The ScheduleManager oversees the scheduling and rescheduling process. It should be notified, when the current schedule
gets invalid for some reason. It starts and tries to maintain a connection to a scheduler_server. From this server
it requests new schedules when required. It uses the WFG class to encode and decode messages to the scheduling server.
"""

import time
import traceback
from threading import Thread
from enum import Enum
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import networkx as nx
import warnings

from labscheduler.dev_tools.algorithms_eval_config import time_limit
from sila2.framework import CommandExecutionStatus, SilaConnectionError
from laborchestrator.structures import SchedulingInstance, ProcessStep, Schedule, ProcessExecutionState
from laborchestrator.logging_manager import StandardLogger as Logger
try:
    from labscheduler.sila_server import Client as SchedulerClient
except ModuleNotFoundError:
    from sila2.client import SilaClient as SchedulerClient
from laborchestrator.workflowgraph import WorkFlowGraph as wfg
from laborchestrator.database_integration import StatusDBInterface


class ScheduleQuality(Enum):
    """
    Classification of the result of the most recent scheduling attempt.

    The member *names* matter: the quality string reported by the scheduler is looked up
    by name in ``ScheduleQuality.__members__``.
    """

    # qualities reported by the scheduler together with a schedule
    OPTIMAL = 0
    FEASIBLE = 1
    INFEASIBLE = 2
    # set locally when a scheduling attempt did not yield any schedule
    NOT_FOUND = 3
    # initial value before any scheduling attempt has been evaluated
    UNKNOWN = 4
# Module-wide switch for automatic rescheduling: it gates mark_schedule_suboptimal() and is the
# default of the `enforce` parameter of mark_schedule_invalid().
AUTOMATIC_RESCHEDULING: bool = True
class ScheduleManager:
    """
    Oversees scheduling and rescheduling for one SchedulingInstance.

    On construction three daemon threads are started:

    * ``_manage_rescheduling`` requests a new schedule whenever the current one is flagged
      invalid or suboptimal,
    * ``keep_horizon`` periodically checks that enough upcoming steps are part of the schedule,
    * ``scheduler_heart_beat`` monitors the scheduler connection and reconnects if necessary.

    Other components report changes via ``mark_schedule_invalid`` / ``mark_schedule_suboptimal``
    and may pause the automatic loop with ``hold_rescheduling`` / ``continue_rescheduling``.
    """
    _scheduler_client: Optional[SchedulerClient] = None
    _rescheduling: bool
    _schedule_optimal: bool
    _schedule_valid: bool
    jssp: SchedulingInstance
    _hold_rescheduling: bool
    # computation time limits (seconds) for the quick and the thorough scheduling attempt
    time_limit_short = 2
    time_limit_long = 5
    db_client: Optional[StatusDBInterface]
    schedule_quality: ScheduleQuality = ScheduleQuality.UNKNOWN

    def __init__(self, jssp: SchedulingInstance, db_client: Optional[StatusDBInterface] = None):
        """
        :param jssp: the scheduling instance whose steps get scheduled
        :param db_client: optional database interface used to estimate step durations
        """
        # TODO Change this from yaml to a proper format
        self.lab_config_file = None  # used to configure the scheduler on reconnection
        self._rescheduling = False
        self._changing_wfg = False
        self._hold_rescheduling = False
        self._schedule_valid = True
        self._schedule_optimal = True
        self.jssp = jssp
        self.db_client = db_client
        Thread(target=self._manage_rescheduling, daemon=True).start()
        Thread(target=self.keep_horizon, daemon=True).start()
        Thread(target=self.scheduler_heart_beat, daemon=True).start()

    def _manage_rescheduling(self):
        """
        Endless worker loop: triggers rescheduling whenever the schedule is flagged as invalid or
        suboptimal, a scheduler is connected and rescheduling is neither running nor on hold.
        """
        while True:
            if not (self.is_rescheduling() or self._hold_rescheduling):
                try:
                    start = time.time()
                    if self._scheduler_client is not None:
                        # try to find a solution quickly
                        if not self._schedule_optimal or not self._schedule_valid:
                            self._reschedule(time_limit=self.time_limit_short, try_hard=False)
                        # if the schedule is still invalid try hard to find a solution
                        if not self._schedule_valid and self.time_limit_long > self.time_limit_short:
                            self._reschedule(time_limit=self.time_limit_long, try_hard=True)
                        # do not immediately try again when scheduling failed
                        if not self._schedule_valid and time.time() - start < 5:
                            time.sleep(3)
                except Exception as ex:
                    # format_exc() returns the trace as text; print_exc() would return None
                    Logger.error(f"{ex}\n{traceback.format_exc()}")
                    time.sleep(3)
            time.sleep(.1)

    def _reschedule(self, time_limit: float, try_hard: bool) -> Optional[bool]:
        """
        Handles the flags and result from a scheduling attempt

        :param time_limit: limit for the computation time handed to the scheduler
        :param try_hard: currently not evaluated by this method
        :return: True if a new schedule was set, False if the scheduler delivered none and None if
            there was nothing to schedule, no scheduler was available or an error occurred
        """
        # remembered before anything can fail, so every failure path can restore the validity
        was_valid = self._schedule_valid
        try:
            start = time.time()
            Logger.info("rescheduling")
            self._rescheduling = True
            # when the schedule gets marked invalid again during rescheduling, we will have to reschedule again
            self._schedule_valid = True
            # the property pings the server on every access, so fetch the client only once
            scheduler_client = self.scheduler_client
            if scheduler_client is None:
                Logger.warning("No scheduler available for rescheduling.")
                if not was_valid:
                    self._schedule_valid = False
                return None
            algo_info = scheduler_client.SchedulingService.CurrentAlgorithm.get()
            max_problem_size = algo_info.MaxProblemSize
            J = self.extract_near_future(max_problem_size)
            self.jssp.future = J
            Logger.info(f"{time.time()-start}: problem size: {len(J)}, max problem size: {max_problem_size}")
            # check whether there is anything to schedule
            if not J:
                self._schedule_optimal = True
                return None
            if self.db_client:
                not_started = [job for job in J.values() if not job.start]
                guesses = self.db_client.get_estimated_durations(not_started)
                for job, guess in zip(not_started, guesses):
                    if guess:
                        job.duration = guess
            Logger.info(f'{time.time()-start}:time guessing done')
            # the actual scheduling attempt
            schedule, quality = self._get_schedule_from_scheduler(time_limit, J)
            if quality in ScheduleQuality.__members__:
                self.schedule_quality = ScheduleQuality[quality]
            else:
                Logger.error(f"{quality} is no member of SolutionQuality.")
            Logger.info(f'{time.time()-start}:got the {quality} schedule from scheduler')
            if schedule is None:
                self.schedule_quality = ScheduleQuality.NOT_FOUND
                success = False
                # set the validity back, but never revoke an invalidation that arrived in the meantime
                if not was_valid:
                    self._schedule_valid = False
            else:
                self.jssp.set_schedule(schedule)
                success = True
            # NOTE(review): reconstructed as unconditional (also after a failed attempt) - confirm
            self._schedule_optimal = True
            return success
        except Exception as ex:
            Logger.error(f"{ex}\n{traceback.format_exc()}")
            # a crashed attempt must not leave a previously invalid schedule flagged as valid
            if not was_valid:
                self._schedule_valid = False
            return None
        finally:
            self._rescheduling = False

    def _get_schedule_from_scheduler(self, time_limit: float, J: Dict[str, ProcessStep]) -> (
            Tuple[Optional[Schedule], str]
    ):
        """
        Requests a schedule from the scheduler server

        :param time_limit: limit for the computation time
        :param J: dictionary of jobs to schedule
        :return: Tuple of the schedule (None if the scheduler delivered none) and the quality string.
            Without a successful result the quality is "INFEASIBLE".
        """
        scheduler_client = self.scheduler_client
        if not scheduler_client:
            return None, "INFEASIBLE"
        sila_wfg = wfg.create_sila_structure_from_jobs(J.values(), self.jssp.combined_wfg)
        # evaluate the property once instead of once per process
        names_to_schedule = set(self._processes_to_schedule)
        processes = [p for name, p in self.jssp.process_by_name.items() if name in names_to_schedule]
        wfg.add_waiting_dummies(sila_wfg, processes)
        Logger.debug(datetime.now())
        cmd = scheduler_client.SchedulingService.ComputeSchedule(sila_wfg, time_limit)
        start = time.time()
        # wait for the command to finish and get the result
        while not cmd.done:
            time.sleep(.05)
            if self._scheduler_client is None:
                Logger.warning("Scheduler went offline while scheduling command was running.")
                break
            # if the server is restarted while this command is running,
            # this would be an infinite loop and block rescheduling
            if time.time() > start + 2 * time_limit + 10:
                Logger.warning(f"Interupting solver after {time.time() - start} seconds"
                               f" exceeding time limit of {time_limit} seconds too much.")
                break
        Logger.debug(datetime.now())
        if cmd.status == CommandExecutionStatus.finishedSuccessfully:
            result = cmd.get_responses().Result
            sila_schedule = result.Schedule
            # the quality is either "OPTIMAL", "FEASIBLE" or "INFEASIBLE"
            schedule_quality = result.SolutionQuality
            schedule = wfg.create_schedule_from_sila_struct(sila_schedule)
            return schedule, schedule_quality
        return None, "INFEASIBLE"

    @property
    def _processes_to_schedule(self) -> list[str]:
        """Names of all processes whose steps shall currently be part of the schedule."""
        # get the current state of every process
        states = self.jssp.process_stati_by_name
        # those get always scheduled
        states_to_schedule = {ProcessExecutionState.RUNNING}
        # if no process is running, the paused and scheduled processes are included in the schedule
        if ProcessExecutionState.RUNNING not in states.values():
            states_to_schedule.add(ProcessExecutionState.PAUSED)
            # scheduled processes are mainly used to check, how the schedule would look
            states_to_schedule.add(ProcessExecutionState.SCHEDULED)
            # if none is active, add all finished processes (for a nice view of everything)
            if ProcessExecutionState.SCHEDULED not in states.values() and\
                    ProcessExecutionState.PAUSED not in states.values():
                states_to_schedule.add(ProcessExecutionState.FINISHED)
        return [name for name, state in states.items() if state in states_to_schedule]

    def extract_near_future(self, n_steps: int) -> Dict[str, ProcessStep]:
        """
        Collects the jobs that shall be handed to the scheduler.

        Contains all started jobs of the processes to schedule, the next jobs (in topological
        order) that descend from a non-reagent container and all definite prerequisites of those.

        :param n_steps: limit for the number of jobs taken from the topological order
        :return: dictionary of the selected jobs by their id
        """
        g = self.jssp.combined_wfg
        # filter out the processes, we do not want to schedule
        to_schedule = set(self._processes_to_schedule)
        J_old = {idx: job for idx, job in self.jssp.definite_step_by_id.items()
                 if job.process_name in to_schedule}
        # first add all started jobs
        J = {idx: job for idx, job in J_old.items() if job.start is not None}
        # collect the containers that are no reagents
        C = {name: cont for name, cont in self.jssp.container_info_by_name.items() if not cont.is_reagent}
        # TODO should steps with ancestors with opacity < 1 be included in schedules?
        # topological order of all definite jobs of the processes to schedule
        topo_sort = [n for n in nx.topological_sort(g) if n in J_old]
        # all nodes descending from a non-reagent container (a set keeps the membership test cheap)
        real_descendants = set()
        for name in C:
            real_descendants.update(nx.descendants(g, name))
        count = 0
        for idx in topo_sort:
            # NOTE(review): '>' lets n_steps + 1 jobs pass - confirm whether '>=' is intended
            if count > n_steps:
                break
            if idx in real_descendants:
                count += 1
                J[idx] = J_old[idx]
        # iteratively fill in all prerequisites of already added jobs
        operable = self.jssp.operable
        unsafe = set(J.keys())
        # every node gets expanded only once; the resulting closure is unaffected by this
        visited = set(unsafe)
        while unsafe:
            idx = unsafe.pop()
            # never add container nodes
            for idx_o in operable[idx].prior:
                if idx_o in visited:
                    continue
                visited.add(idx_o)
                # add only definite jobs (other operable nodes are linked as well)
                if idx_o in J_old:
                    J[idx_o] = J_old[idx_o]
                unsafe.add(idx_o)
        return J

    def _horizon_sufficiently_scheduled(self, J: Dict[str, ProcessStep], scheduled) -> bool:
        """
        Checks whether at least a third of the available steps of each non-reagent container
        is scheduled.

        :param J: the jobs of the near future (see extract_near_future)
        :param scheduled: collection of the ids of all jobs in the current schedule
        :return: False as soon as one container has too few of its jobs scheduled, else True
        """
        names_to_schedule = set(self._processes_to_schedule)
        for name, process in self.jssp.process_by_name.items():
            if name not in names_to_schedule:
                continue
            for cont in process.containers:
                if cont.is_reagent:
                    continue
                available = [idx for idx, job in J.items()
                             if not job.start and cont.name in job.cont_names]
                in_schedule = sum(idx in scheduled for idx in available)
                if in_schedule < len(available)/3:
                    Logger.info(f"for container {cont.name} only {in_schedule} jobs are scheduled."
                                f" Reschedule to get {len(available)}")
                    return False
        return True

    def keep_horizon(self):
        """
        Endless worker loop: periodically marks the schedule invalid when it does not contain
        enough of the upcoming steps of each container.
        """
        while True:
            time.sleep(4 * self.time_limit_short + 10)
            try:
                if self._rescheduling:
                    continue
                # the property pings the server on every access, so fetch the client only once
                scheduler_client = self.scheduler_client
                if not scheduler_client:
                    continue
                algo_info = scheduler_client.SchedulingService.CurrentAlgorithm.get()
                max_problem_size = algo_info.MaxProblemSize
                J = self.extract_near_future(max_problem_size)
                scheduled = self.jssp.schedule.keys()
                # mark the schedule invalid if it does not contain enough steps per container
                if not self._horizon_sufficiently_scheduled(J, scheduled):
                    self.mark_schedule_invalid()
            except Exception as ex:
                Logger.error(f"{ex}\n{traceback.format_exc()}")

    def configure_lab(self, yaml_file: str) -> bool:
        """
        Sends the lab configuration to the scheduler.

        :param yaml_file: handed to the scheduler as ConfigurationFile
        :return: True if a scheduler was available to receive the configuration, else False
        """
        scheduler_client = self.scheduler_client
        if scheduler_client:
            scheduler_client.LabConfigurationController.LoadJobShopFromFile(ConfigurationFile=yaml_file)
            return True
        return False

    @property
    def scheduler_client(self) -> Optional[SchedulerClient]:
        """The scheduler client or None if no scheduler is reachable. Pings the server on access."""
        # check if the scheduler is alive before returning it
        self.is_connected_to_scheduler()
        return self._scheduler_client

    def try_scheduler_connection(self, timeout=5):
        """
        Tries to find a scheduler in the network and establishes connection in both directions.
        After a successful discovery the lab gets configured if a lab_config_file is set.

        :param timeout: timeout handed to the server discovery
        :return:
        """
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                # Any warnings issued here will be suppressed
                self._scheduler_client = SchedulerClient.discover(server_name='Scheduler', insecure=True,
                                                                  timeout=timeout)
            Logger.info("Connected scheduler and worker :-)")
            try:
                # report success only if the configuration was actually sent
                if self.lab_config_file and self.configure_lab(self.lab_config_file):
                    Logger.info("Configured the lab in scheduler")
            except Exception as ex:
                Logger.warning(f"Lab configuration failed: {ex}\n{traceback.format_exc()}")
        except TimeoutError:
            Logger.warning("Could not discover suitable scheduler server.")

    def is_connected_to_scheduler(self) -> bool:
        """
        Tests and returns whether a scheduler server is connected. The client is dropped when the
        server does not respond.

        :return: True if the scheduler server answered, else False
        """
        if self._scheduler_client is None:
            return False
        try:
            self._scheduler_client.SiLAService.ServerName.get()
            return True
        except SilaConnectionError:
            self._scheduler_client = None
            Logger.warning("The scheduler seems to have gone offline")
            return False

    def scheduler_heart_beat(self):
        """
        Endless worker loop: frequently checks if the scheduler server is still online and tries to
        (re)connect while there is no scheduler client.
        """
        counter = 0
        while True:
            time.sleep(3)
            try:
                # check whether the server to an existing client is still online
                self.is_connected_to_scheduler()
                # find a scheduler server if necessary
                if self._scheduler_client is None:
                    counter += 1
                    Logger.info(f"connection attempts: {counter}")
                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore")
                        # Any warnings issued here will be suppressed
                        self.try_scheduler_connection()
            except Exception as ex:
                # an unexpected error must not end the thread, or there would be no more reconnection
                Logger.error(f"{ex}\n{traceback.format_exc()}")

    def hold_rescheduling(self):
        """Pauses the automatic rescheduling until continue_rescheduling() is called."""
        self._hold_rescheduling = True

    def continue_rescheduling(self):
        """Resumes the automatic rescheduling after hold_rescheduling()."""
        self._hold_rescheduling = False

    def mark_schedule_suboptimal(self):
        """Flags the schedule as suboptimal. Has no effect if AUTOMATIC_RESCHEDULING is deactivated."""
        if AUTOMATIC_RESCHEDULING:
            self._schedule_optimal = False

    def mark_schedule_invalid(self, enforce: bool = AUTOMATIC_RESCHEDULING):
        """
        Sets the schedule to invalid and thereby triggers a rescheduling. If AUTOMATIC_RESCHEDULING is deactivated
        this will not happen unless the parameter enforce is set
        (which for example happens after a process is started).
        """
        if enforce:
            self._schedule_valid = False

    def is_rescheduling(self):
        """Returns whether a scheduling attempt is currently running."""
        return self._rescheduling

    def schedule_executable(self) -> bool:
        """Returns whether the current schedule is valid and no rescheduling is in progress."""
        return self._schedule_valid and not self._rescheduling