laborchestrator.engine package

Submodules

Module contents

class laborchestrator.engine.ScheduleManager(jssp: SchedulingInstance, db_client: StatusDBInterface | None = None)[source]

Bases: object

_get_schedule_from_scheduler(time_limit: float, J: Dict[str, ProcessStep]) Tuple[Schedule | None, str][source]

Requests a schedule from the scheduler server :param time_limit: limit for the computation time :param J: dictionary of jobs to schedule :return: A schedule if the scheduler found one, else None

_hold_rescheduling: bool
_manage_rescheduling()[source]
property _processes_to_schedule: list[str]
_reschedule(time_limit: float, try_hard: bool) None[source]

Handles the flags and result from a scheduling attempt :param time_limit: :param try_hard: :return:

_rescheduling: bool
_schedule_optimal: bool
_schedule_valid: bool
_scheduler_client: Client | None = None
configure_lab(yaml_file: str) bool[source]
continue_rescheduling()[source]
db_client: StatusDBInterface | None
extract_near_future(n_steps: int) Dict[str, ProcessStep][source]
hold_rescheduling()[source]
is_connected_to_scheduler() bool[source]

Tests and returns whether a scheduler server is connected :return:

is_rescheduling()[source]
jssp: SchedulingInstance
keep_horizon()[source]
mark_schedule_invalid(enforce: bool = True)[source]

Sets the schedule to invalid and thereby triggers a rescheduling. If AUTOMATIC_RESCHEDULING is deactivated this will not happen unless the parameter enforce is set (which for example happens after a process is started).

mark_schedule_suboptimal()[source]
schedule_executable() bool[source]
schedule_quality: ScheduleQuality = 4
property scheduler_client: Client | None
scheduler_heart_beat()[source]

Frequently checks if the scheduler server is still online while the scheduler client is not None.

time_limit_long = 5
time_limit_short = 2
try_scheduler_connection(timeout=5)[source]

Tries to find a scheduler in the network and establishes connection in both directions :return:

class laborchestrator.engine.WFGManager(jssp: SchedulingInstance, schedule_manager: ScheduleManager)[source]

Bases: object

_check_wfg()[source]
static _do_computation(computation: Computation, operable)[source]
_eval_if(if_node: IfNode, operable: Dict[str, Variable])[source]
static _set_variable(var: Variable, operable)[source]
_wfg_checker()[source]
complete_preferences()[source]

Writes the preferences of for used devices into origin and destination of adjacent MoveSteps (if uniquely determined). This method is just for convenience. :return:

jssp: SchedulingInstance
set_origins()[source]

The origin device types of MoveJobs are convenient to have, but might change at runtime. This method sets all the definite ones. :return:

class laborchestrator.engine.WorkerInterface(jssp: SchedulingInstance, schedule_manager: ScheduleManager, db_client: StatusDBInterface)[source]

Bases: object

_work()[source]
check_prerequisites(process: SMProcess) Tuple[bool, str][source]

This method will be called when a process is started (Not when it is resumed) :param process: The process object, that just started :return: A report (as string) of problems found

db_client: StatusDBInterface
determine_destination_position(step: MoveStep) int | None[source]

The position in the destination device is set at runtime according to free space. By default, it is the next free one or a given preference (if that is free) :param step: :return: index of the position or None if none is available

execute_process_step(step_id: str, device: str, device_kwargs: Dict[str, Any]) ClientObservableCommandInstance | ObservableProtocolHandler[source]

Gets called, when the time for step has come, all prerequisites for the step are fulfilled and when the assigned device has capacity. Overwrite it, to make something happen. :param device_kwargs: arguments to be forwarded to the server :param step_id: :param device: :return:

job_is_due(step: ProcessStep, assignment: ScheduledAssignment)[source]

checks whether the given job should be stated now :param step: the job to investigate :param assignment: the job’s scheduled assignment to time and device :return:

observation_handlers: Dict[str, ClientObservableCommandInstance | ObservableProtocolHandler]
process_step_finished(step_id: str, result: NamedTuple | None)[source]

Gets called, when the corresponding step finished. Overwrite it, to make something happen. :param step_id: :param result: Might be None :return:

simulate_process_step(step_id: str, device: str, device_kwargs: Dict[str, Any]) ClientObservableCommandInstance | ObservableProtocolHandler[source]
simulation_mode: bool = False
class laborchestrator.engine.WorkerObserver(wfg_manager: WFGManager, schedule_manager: ScheduleManager, jssp: SchedulingInstance, worker: WorkerInterface)[source]

Bases: object

_observe_protocol(step_id: str)[source]
is_delayed_significantly(job: ProcessStep) Tuple[bool, float][source]
jssp: SchedulingInstance
observe()[source]

Master thread :return:

observed_jobs: Set[str]
schedule_manager: ScheduleManager
wfg_manager: WFGManager
worker: WorkerInterface