"""
The ScheduleManager oversees the scheduling and rescheduling process. It should be notified when the current schedule
becomes invalid for some reason. It establishes and tries to maintain a connection to a scheduler server, from which
it requests new schedules when required. It uses the WorkFlowGraph class (imported as ``wfg``) to encode and decode
messages to the scheduling server.
"""
import time
import traceback
from threading import Thread
from enum import Enum
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import networkx as nx
import warnings
from labscheduler.dev_tools.algorithms_eval_config import time_limit
from sila2.framework import CommandExecutionStatus, SilaConnectionError
from laborchestrator.structures import SchedulingInstance, ProcessStep, Schedule, ProcessExecutionState
from laborchestrator.logging_manager import StandardLogger as Logger
try:
from labscheduler.sila_server import Client as SchedulerClient
except ModuleNotFoundError:
from sila2.client import SilaClient as SchedulerClient
from laborchestrator.workflowgraph import WorkFlowGraph as wfg
from laborchestrator.database_integration import StatusDBInterface
[docs]
class ScheduleQuality(Enum):
    """
    Quality of the most recently computed schedule.

    OPTIMAL, FEASIBLE and INFEASIBLE correspond to the ``SolutionQuality`` strings sent by the
    scheduler server and are looked up by member name. NOT_FOUND is set locally when the server
    delivered no schedule, and UNKNOWN is the initial value before any scheduling attempt.
    """
    OPTIMAL = 0
    FEASIBLE = 1
    INFEASIBLE = 2
    # set by the orchestrator itself, never sent by the server
    NOT_FOUND = 3
    UNKNOWN = 4
# Module-wide switch for automatic rescheduling: ScheduleManager.mark_schedule_suboptimal only has an
# effect while it is True, and ScheduleManager.mark_schedule_invalid uses it when ``enforce`` is not given.
AUTOMATIC_RESCHEDULING: bool = True
[docs]
class ScheduleManager:
    """
    Keeps the schedule of a SchedulingInstance up to date with the help of a scheduler server.

    The constructor starts three daemon threads:

    * ``_manage_rescheduling`` requests a new schedule whenever the current one is marked invalid or
      suboptimal (unless rescheduling is on hold or already running),
    * ``keep_horizon`` periodically marks the schedule invalid if it covers too few upcoming steps,
    * ``scheduler_heart_beat`` checks the connection to the scheduler server and tries to discover
      a server while none is connected.
    """
    _scheduler_client: Optional[SchedulerClient] = None
    _rescheduling: bool
    _schedule_optimal: bool
    _schedule_valid: bool
    jssp: SchedulingInstance
    _hold_rescheduling: bool
    # computation time limits (in seconds) for the quick and for the thorough scheduling attempt
    time_limit_short = 2
    time_limit_long = 5
    db_client: Optional[StatusDBInterface]
    schedule_quality: ScheduleQuality = ScheduleQuality.UNKNOWN

    def __init__(self, jssp: SchedulingInstance, db_client: Optional[StatusDBInterface] = None):
        """
        :param jssp: the scheduling instance whose schedule is managed
        :param db_client: optional status database; if given, it is asked for estimated durations of
            steps that have not started yet before each scheduling attempt
        """
        # TODO Change this from yaml to a proper format
        self.lab_config_file = None  # used to configure the scheduler on reconnection
        self._rescheduling = False
        self._changing_wfg = False
        self._hold_rescheduling = False
        self._schedule_valid = True
        self._schedule_optimal = True
        self.jssp = jssp
        self.db_client = db_client
        Thread(target=self._manage_rescheduling, daemon=True).start()
        Thread(target=self.keep_horizon, daemon=True).start()
        Thread(target=self.scheduler_heart_beat, daemon=True).start()

    def _manage_rescheduling(self):
        """
        Endless loop (run in a daemon thread) that triggers rescheduling whenever the schedule is
        marked invalid or suboptimal, as long as a scheduler is connected, rescheduling is not on hold
        and no rescheduling is already running.
        """
        while True:
            if not (self.is_rescheduling() or self._hold_rescheduling):
                try:
                    start = time.time()
                    if self._scheduler_client is not None:
                        # try to find a solution quickly
                        if not self._schedule_optimal or not self._schedule_valid:
                            self._reschedule(time_limit=self.time_limit_short, try_hard=False)
                        # if the schedule is still invalid try hard to find a solution
                        if not self._schedule_valid and self.time_limit_long > self.time_limit_short:
                            self._reschedule(time_limit=self.time_limit_long, try_hard=True)
                        # do not immediately try again when scheduling failed
                        if not self._schedule_valid and time.time() - start < 5:
                            time.sleep(3)
                except Exception as ex:
                    # format_exc returns the trace as a string (print_exc would return None)
                    Logger.error(f"{ex}\n{traceback.format_exc()}")
                    time.sleep(3)
            time.sleep(.1)

    def _reschedule(self, time_limit: float, try_hard: bool) -> Optional[bool]:
        """
        Performs one scheduling attempt and handles the flags and the result.

        The validity flag is set to True before the attempt, so that a call of
        ``mark_schedule_invalid`` during the attempt is noticed afterwards. If the attempt fails
        (no schedule or an exception), the schedule stays invalid if it was invalid before the
        attempt or got invalidated meanwhile.

        :param time_limit: computation time limit (in seconds) passed to the scheduler
        :param try_hard: not used inside this method
        :return: True if a new schedule was set, False if the scheduler delivered none,
            None if there was nothing to schedule or an exception occurred
        """
        # when the schedule gets marked invalid again during rescheduling, we will have to reschedule again
        was_valid = self._schedule_valid
        try:
            start = time.time()
            Logger.info("rescheduling")
            self._rescheduling = True
            self._schedule_valid = True
            algo_info = self.scheduler_client.SchedulingService.CurrentAlgorithm.get()
            max_problem_size = algo_info.MaxProblemSize
            J = self.extract_near_future(max_problem_size)
            self.jssp.future = J
            Logger.info(f"{time.time()-start}: problem size: {len(J)}, max problem size: {max_problem_size}")
            # check whether there is anything to schedule
            if not J:
                self._schedule_optimal = True
                return None
            if self.db_client:
                not_started = [job for job in J.values() if not job.start]
                guesses = self.db_client.get_estimated_durations(not_started)
                for job, guess in zip(not_started, guesses):
                    if guess:
                        job.duration = guess
                Logger.info(f'{time.time()-start}:time guessing done')
            # the actual scheduling attempt
            schedule, quality = self._get_schedule_from_scheduler(time_limit, J)
            if quality in ScheduleQuality.__members__:
                self.schedule_quality = ScheduleQuality[quality]
            else:
                Logger.error(f"{quality} is no member of SolutionQuality.")
            Logger.info(f'{time.time()-start}:got the {quality} schedule from scheduler')
            if schedule is None:
                self.schedule_quality = ScheduleQuality.NOT_FOUND
                success = False
                # set the validity back, keeping an invalidation that happened during the attempt
                self._schedule_valid = was_valid and self._schedule_valid
            else:
                self.jssp.set_schedule(schedule)
                success = True
            self._schedule_optimal = True
            return success
        except Exception as ex:
            Logger.error(f"{ex}\n{traceback.format_exc()}")
            # a failed attempt must not leave a previously invalid schedule marked as valid
            self._schedule_valid = was_valid and self._schedule_valid
            return None
        finally:
            self._rescheduling = False

    def _get_schedule_from_scheduler(self, time_limit: float, J: Dict[str, ProcessStep]) -> (
            Tuple[Optional[Schedule], str]
    ):
        """
        Requests a schedule from the scheduler server and waits for the result.

        :param time_limit: limit for the computation time (in seconds)
        :param J: dictionary of jobs to schedule
        :return: a tuple of the schedule (None if the scheduler is not connected or did not finish
            successfully) and the solution quality string ("INFEASIBLE" when no schedule was obtained)
        """
        scheduler_client = self.scheduler_client
        if not scheduler_client:
            return None, "INFEASIBLE"
        sila_wfg = wfg.create_sila_structure_from_jobs(J.values(), self.jssp.combined_wfg)
        # evaluate the property once instead of once per process
        names_to_schedule = set(self._processes_to_schedule)
        processes = [p for name, p in self.jssp.process_by_name.items() if name in names_to_schedule]
        wfg.add_waiting_dummies(sila_wfg, processes)
        Logger.debug(datetime.now())
        cmd = scheduler_client.SchedulingService.ComputeSchedule(sila_wfg, time_limit)
        start = time.time()
        # wait for the command to finish and get the result
        while not cmd.done:
            time.sleep(.05)
            if self._scheduler_client is None:
                Logger.warning("Scheduler went offline while scheduling command was running.")
                break
            # if the server is restarted while this command is running,
            # this would be an infinite loop and block rescheduling
            if time.time() > start + 2 * time_limit + 10:
                Logger.warning(f"Interupting solver after {time.time() - start} seconds"
                               f" exceeding time limit of {time_limit} seconds too much.")
                break
        Logger.debug(datetime.now())
        if cmd.status == CommandExecutionStatus.finishedSuccessfully:
            result = cmd.get_responses().Result
            sila_schedule = result.Schedule
            # the quality is either "OPTIMAL", "FEASIBLE" or "INFEASIBLE"
            schedule_quality = result.SolutionQuality
            schedule = wfg.create_schedule_from_sila_struct(sila_schedule)
            return schedule, schedule_quality
        return None, "INFEASIBLE"

    @property
    def _processes_to_schedule(self) -> list[str]:
        """
        Names of the processes whose steps are included in the scheduling problem.

        Running processes are always included. If no process is running, paused and scheduled
        processes are included too, and if there are also no paused or scheduled processes,
        finished processes are included as well.
        """
        # get the current state of every process
        states = self.jssp.process_stati_by_name
        # those get always scheduled
        states_to_schedule = {ProcessExecutionState.RUNNING}
        # if no process is running, the paused and scheduled processes are included in the schedule
        if ProcessExecutionState.RUNNING not in states.values():
            states_to_schedule.add(ProcessExecutionState.PAUSED)
            # scheduled processes are mainly used to check, how the schedule would look
            states_to_schedule.add(ProcessExecutionState.SCHEDULED)
            # if none is active, add all finished processes (for a nice view of everything)
            if ProcessExecutionState.SCHEDULED not in states.values() and \
                    ProcessExecutionState.PAUSED not in states.values():
                states_to_schedule.add(ProcessExecutionState.FINISHED)
        return [name for name, state in states.items() if state in states_to_schedule]

    def keep_horizon(self):
        """
        Endless loop (run in a daemon thread) that periodically checks whether the current schedule
        contains enough of the upcoming steps and marks it invalid otherwise, which triggers a
        rescheduling.
        """
        while True:
            time.sleep(4 * self.time_limit_short + 10)
            try:
                if self._rescheduling:
                    continue
                # the property pings the server, so query it only once
                scheduler_client = self.scheduler_client
                if not scheduler_client:
                    continue
                algo_info = scheduler_client.SchedulingService.CurrentAlgorithm.get()
                max_problem_size = algo_info.MaxProblemSize
                J = self.extract_near_future(max_problem_size)
                scheduled = self.jssp.schedule.keys()
                names_to_schedule = set(self._processes_to_schedule)

                # check whether at least a third of the available steps for each available container is scheduled
                def check_schedule() -> bool:
                    for name, process in self.jssp.process_by_name.items():
                        if name not in names_to_schedule:
                            continue
                        for cont in process.containers:
                            if cont.is_reagent:
                                continue
                            available = [idx for idx, job in J.items() if not job.start and
                                         cont.name in job.cont_names]
                            in_schedule = sum(idx in scheduled for idx in available)
                            if in_schedule < len(available) / 3:
                                Logger.info(f"for container {cont.name} only {in_schedule} jobs are scheduled."
                                            f" Reschedule to get {len(available)}")
                                return False
                    return True

                # mark the schedule invalid if it does not contain enough steps per container
                if not check_schedule():
                    self.mark_schedule_invalid()
            except Exception as ex:
                Logger.error(f"{ex}\n{traceback.format_exc()}")

    @property
    def scheduler_client(self) -> Optional[SchedulerClient]:
        """The scheduler client after a liveness check, or None if no scheduler is connected."""
        # check if the scheduler is alive before returning it
        self.is_connected_to_scheduler()
        return self._scheduler_client

    def try_scheduler_connection(self, timeout=5):
        """
        Tries to discover a scheduler server named 'Scheduler' in the network and connects to it.
        If ``lab_config_file`` is set, the lab is configured in the scheduler afterwards.

        :param timeout: passed on to ``SchedulerClient.discover``
        """
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                # Any warnings issued here will be suppressed
                self._scheduler_client = SchedulerClient.discover(server_name='Scheduler', insecure=True,
                                                                  timeout=timeout)
        except TimeoutError:
            Logger.warning("Could not discover suitable scheduler server.")
            return
        Logger.info("Connected scheduler and worker :-)")
        try:
            if self.lab_config_file:
                self.configure_lab(self.lab_config_file)
                Logger.info("Configured the lab in scheduler")
        except Exception as ex:
            Logger.warning(f"Lab configuration failed: {ex}\n{traceback.format_exc()}")

    def is_connected_to_scheduler(self) -> bool:
        """
        Tests and returns whether a scheduler server is connected. If the server does not respond,
        the client is dropped.

        :return: True if a client exists and the server answered
        """
        # read the attribute once: other threads may reset it concurrently
        client = self._scheduler_client
        if client is None:
            return False
        try:
            client.SiLAService.ServerName.get()
            return True
        except SilaConnectionError:
            # do not discard a client that another thread has connected in the meantime
            if self._scheduler_client is client:
                self._scheduler_client = None
            Logger.warning("The scheduler seems to have gone offline")
            return False

    def scheduler_heart_beat(self):
        """
        Endless loop (run in a daemon thread) that frequently checks whether the scheduler server is
        still online and tries to connect to one while no client is set.
        """
        counter = 0
        while True:
            time.sleep(3)
            try:
                # check whether the server to an existing client is still online
                self.is_connected_to_scheduler()
                # find a scheduler server if necessary
                if self._scheduler_client is None:
                    counter += 1
                    Logger.info(f"connection attempts: {counter}")
                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore")
                        # Any warnings issued here will be suppressed
                        self.try_scheduler_connection()
            except Exception as ex:
                # an uncaught exception would end this thread and no reconnection would happen anymore
                Logger.error(f"{ex}\n{traceback.format_exc()}")

    def hold_rescheduling(self):
        """Pauses automatic rescheduling until ``continue_rescheduling`` is called."""
        self._hold_rescheduling = True

    def continue_rescheduling(self):
        """Resumes automatic rescheduling."""
        self._hold_rescheduling = False

    def mark_schedule_suboptimal(self):
        """Requests a new schedule attempt if AUTOMATIC_RESCHEDULING is active."""
        if AUTOMATIC_RESCHEDULING:
            self._schedule_optimal = False

    def mark_schedule_invalid(self, enforce: Optional[bool] = None):
        """
        Sets the schedule to invalid and thereby triggers a rescheduling. If AUTOMATIC_RESCHEDULING is deactivated
        this will not happen unless the parameter enforce is set (which for example happens after a process is started).

        :param enforce: whether to mark the schedule invalid; defaults to the value AUTOMATIC_RESCHEDULING
            has at call time
        """
        # resolve at call time, so changing AUTOMATIC_RESCHEDULING at runtime takes effect
        # (consistent with mark_schedule_suboptimal)
        if enforce is None:
            enforce = AUTOMATIC_RESCHEDULING
        if enforce:
            self._schedule_valid = False

    def is_rescheduling(self):
        """Whether a scheduling attempt is currently running."""
        return self._rescheduling

    def schedule_executable(self) -> bool:
        """The current schedule may be executed if it is valid and no rescheduling is running."""
        if not self._schedule_valid:
            return False
        if self._rescheduling:
            return False
        return True