import traceback
from dash import dcc, html, no_update
import time
from laborchestrator.logging_manager import StandardLogger as Logger
from dash.dependencies import State, Input, Output
from dash_extensions.enrich import MultiplexerTransform, DashProxy
from .traffic_light import TrafficLight, register_traffic_light_callbacks
import dash_interactive_graphviz
from threading import Thread
from laborchestrator.structures import ProcessStep, Variable, StepStatus
from laborchestrator.pythonlab_process_finder import ProcessFinder, ImportableProcess
from laborchestrator.orchestrator_interface import OrchestratorInterface
from laborchestrator.orchestrator_implementation import Orchestrator
import sys
import os
from importlib import reload
tests_dir = os.path.join(os.path.dirname(__file__), '..', 'tests')
sys.path.append(tests_dir)
import test_data
def to_process_id(ip: ImportableProcess) -> str:
    """Build the identifier of an importable process as ``<module name>.<process name>``."""
    module_name = ip.module.__name__
    return "{}.{}".format(module_name, ip.name)
# Shared CSS style for the control blocks of the GUI: each block is a
# vertical flex column whose children are centered.
block_style = dict(
    display='flex',
    flexDirection='column',  # stack children vertically
    alignItems='center',  # center children on the cross axis of the block
    justifyContent='center',  # optional: center content within the block
    verticalAlign='top',  # align the block with the rest of the app
)
def create_timestamp() -> str:
    """Return the current local time formatted as ``YYYYMMDD_HHMMSS``.

    The result is used as a suffix for exported file names.
    """
    # strftime zero-pads every field, which replaces the manual
    # attribute lookup and rjust() padding.
    return time.strftime("%Y%m%d_%H%M%S", time.localtime())
class SMDashApp:
    """Dash web interface for an orchestrator.

    The page shows a Gantt chart and the workflow graph (WFG) of the scheduled
    processes and offers buttons to add, start, stop, remove and (re)schedule
    processes, to export the current state and to recover from errors.
    The Dash server runs in a daemon thread that is started by :meth:`run`.
    """

    # sm_interface: OrchestratorInterface
    sm_interface: Orchestrator  # bit hacky, but makes it easier to program. This GUI will be replaced anyway
    app: DashProxy
    p: Thread  # thread running the Dash server
    selected_operation: str  # id of the node last selected in the workflow graph

    @property
    def importable_processes(self):
        """Return the processes that ``ProcessFinder`` finds in the process module.

        The module is reloaded on every access, so changes in the process apply.
        """
        reload(self.process_module)
        return ProcessFinder.get_processes(self.process_module)

    def __init__(self, sm_interface: OrchestratorInterface, port=8050, process_module=test_data):
        """Create the Dash app, its layout and all callbacks (the server is not started).

        :param sm_interface: orchestrator that is observed and controlled by the GUI
        :param port: port the Dash server will listen on once :meth:`run` is called
        :param process_module: module that is searched for importable processes
        """
        self.sm_interface = sm_interface
        self.process_module = process_module
        self.port = port
        self.test_view = False
        self.process_to_add = None
        # maps the name of an added process to the name of the importable process it was created from
        self.process_type = {}
        self.app = DashProxy("Orchestrator webinterface", prevent_initial_callbacks=True,
                             suppress_callback_exceptions=True, transforms=[MultiplexerTransform()])
        self.p = self._new_server_thread()
        self.selected_operation = ""
        self.app.layout = self._build_layout()
        register_traffic_light_callbacks(self.app, "traffic_light")
        self._register_callbacks()

    def _new_server_thread(self) -> Thread:
        """Create (but do not start) a daemon thread that runs the Dash server."""
        return Thread(target=self.app.run, daemon=True, kwargs=dict(debug=False, port=self.port))

    def _build_layout(self):
        """Assemble the component tree of the page."""
        update_block = html.Div(children=[
            dcc.Checklist(id='stop_gantt', options=[{'label': 'Update Gantt-Chart', 'value': 'yes'}],
                          value=['yes']),
            dcc.Checklist(id='stop_wfg', options=[{'label': 'Update WFG', 'value': 'yes'}], value=['yes']),
            html.Div(children=[
                html.Button('Recover error', id='action', n_clicks=0),
                dcc.Checklist(id='repeat', options=[{'label': 'Repeat?', 'value': 'yes'}], value=[],
                              style={'display': 'inline-block'}),
            ]),
            html.Button('Run Pre Check', id='pre_check', n_clicks=0),
            html.Button("Export current WFG", id='export', n_clicks=0),
            html.Button(children=["Export current", html.Br(), "Scheduling Problem"],
                        id='export_problem', n_clicks=0),
        ], style=block_style)

        add_process_block = html.Div(children=[
            html.Div(id='delay_label', children="Delay in minutes:"),
            dcc.Textarea(id="delay", value='0', style={'width': 100, 'height': 20}),
            dcc.Dropdown(options=[{'value': to_process_id(ip), 'label': ip.name}
                                  for ip in self.importable_processes],
                         id='process_select', style={"width": 250}),
            html.Button("Add process", id='add_process', n_clicks=0),
        ], style=block_style)

        process_control_block = html.Div(children=[
            dcc.Dropdown(options=[], id='processes', style={"width": 250}),
            html.Button('Stop Process', id='stop_process', n_clicks=0),
            html.Button('Start Process', id='continue_process', n_clicks=0),
            html.Button('Remove Process', id='remove_process', n_clicks=0),
            html.Button('Schedule Process', id='schedule_process', n_clicks=0),
            html.Button('Reschedule', id='reschedule', n_clicks=0),
        ], style=block_style)

        labware_block = html.Div(children=[
            html.Button("Add Containers to DB", id='add_to_db', n_clicks=0),
            html.Button("Mark Containers removed", id='remove_from_db', n_clicks=0),
        ], style=block_style)

        schedule_time_block = html.Div(children=[
            html.Div(id='schedule_time_label',
                     children=["Schedule ", html.Br(), "computing time ", html.Br(), " in seconds:"]),
            dcc.Textarea(value=self.sm_interface.get_parameter("scheduling_time"), id="schedule_time",
                         style={'width': 30, 'height': 20}),
            html.Button("Get", id="schedule_time_get", n_clicks=0),
            html.Button("Set", id="schedule_time_set", n_clicks=0),
        ], style=block_style)

        controls = html.Div(
            children=[
                update_block,
                add_process_block,
                process_control_block,
                labware_block,
                schedule_time_block,
                TrafficLight("traffic_light", size=40),
            ],
            style={
                'display': 'flex',  # place blocks side by side
                'alignItems': 'flex-start',  # align blocks at the top
            },
        )

        return html.Div(children=[
            html.H1(children='Status of the scheduled processes'),
            html.Div(children="\nA web application to observe, schedule and manipulate processes "
                              "running with the pythonlaborchestrator.\n"),
            dcc.Graph(
                id='gantt',
                figure=""
            ),
            controls,
            # The emergency stop button is currently disabled. Its callback is still registered
            # (tolerated because of suppress_callback_exceptions=True):
            # html.Button("Emergency STOP", id='emergency_stop', n_clicks=0,
            #             style={'background-color': 'red', 'font-size': '55px', 'width': '300px',
            #                    'height': '150px', 'vertical-align': 'top', 'display': 'inline-block'}),
            dash_interactive_graphviz.DashInteractiveGraphviz(
                id="wfg", engine='dot',
                dot_source="digraph {\n\thello -> world\n}"
            ),
            html.Div(id='node', children='No node selected'),
            html.Div(id='visu', children=None),
            dcc.Interval(
                id='interval-component',
                interval=500,
                n_intervals=0
            ),
        ])

    def _register_callbacks(self):
        """Register all Dash callbacks of the GUI on ``self.app``."""

        @self.app.callback(
            Input(component_id='export_problem', component_property='n_clicks'),
            Output(component_id='export_problem', component_property='n_clicks')
        )
        def export_current_scheduling_problem(n_clicks):
            """Export the current scheduling problem to ``jssp_<timestamp>.json``."""
            timestamp = create_timestamp()
            filename = f"jssp_{timestamp}.json"
            self.sm_interface.export_current_scheduling_problem(filename)
            # do not overwrite the click counter with None
            return no_update

        @self.app.callback(
            Input(component_id='process_select', component_property='value'),
            Output(component_id='process_select', component_property='options')
        )
        def refresh_dropdown_menu(_value):
            """Refresh the options according to what's in the file system."""
            return [{'value': to_process_id(ip), 'label': ip.name} for ip in self.importable_processes]

        @self.app.callback(
            Input(component_id='remove_from_db', component_property='n_clicks'),
            State(component_id='processes', component_property='value'),
            Output(component_id='remove_from_db', component_property='n_clicks')
        )
        def remove_containers_from_db(n_clicks, value):
            """Mark the containers of the selected process as removed."""
            # todo this is not, how it should be done. use interface methods instead.
            # NOTE(review): assumes process_by_name is a dict — confirm
            process = self.sm_interface.jssp.process_by_name.get(value)
            if process is None:
                Logger.warning(f"no process named {value}, no containers removed")
                return no_update
            labware_to_remove = [c.name for c in process.containers]
            self.sm_interface.remove_labware(labware_ids=labware_to_remove)
            return no_update

        @self.app.callback(
            Input(component_id='add_to_db', component_property='n_clicks'),
            State(component_id='processes', component_property='value'),
            Output(component_id='add_to_db', component_property='n_clicks'),
        )
        def add_containers_to_db(n_clicks, value):
            """Add the labware reported missing for the selected process."""
            present, missing = self.sm_interface.check_labware_presence([value])
            Logger.info(f"found: {present}\nmissing: {missing}")
            self.sm_interface.add_labware(missing)
            return no_update

        @self.app.callback(
            Input(component_id='export', component_property='n_clicks'),
            State(component_id='gantt', component_property='figure'),
            State(component_id='wfg', component_property='dot_source'),
            Output(component_id='export', component_property='n_clicks'),
        )
        def export_state(n_clicks, gantt, wfg):
            """Write the displayed WFG and the current schedule to timestamped text files."""
            timestamp = create_timestamp()
            with open(f"WFG_{timestamp}.txt", "w") as writer:
                writer.write(wfg)
            # the schedule is taken fresh from the orchestrator, not from the displayed figure
            with open(f"Schedule_{timestamp}.txt", "w") as writer:
                writer.write(self.sm_interface.gantt_chart_scheduled_processes.to_json())
            return no_update

        @self.app.callback(
            Input(component_id='add_process', component_property='n_clicks'),
            Output(component_id='process_select', component_property='title'),
            State(component_id='process_select', component_property='value'),
            State(component_id='delay', component_property='value'),
        )
        def add_process(n_clicks, value, delay):
            """Create the process selected in the dropdown and add it to the orchestrator."""
            Logger.info(f"add {value} with {delay} minutes delay ")
            try:
                delay = int(delay)
            except (TypeError, ValueError):
                Logger.warning(f"could not translate delay {delay} to integer")
                delay = 0
            for ip in self.importable_processes:
                # use the same id function that produced the dropdown values
                p_id = to_process_id(ip)
                if p_id == value:
                    process = ProcessFinder.create_process(ip)
                    name = self.sm_interface.add_process(process_object=process, delay=delay)
                    self.process_type[name] = p_id.split('.')[-1]
                    break
            return 'chosen'

        @self.app.callback(
            Input('interval-component', 'n_intervals'),
            Output(component_id='processes', component_property='options')
        )
        def update_active_processes(n_intervals):
            """List the processes known to the orchestrator, labeled with their process type."""
            return [
                dict(value=process.name,
                     label=f"{process.name} ({self.process_type.get(process.name, '')})")
                for process in self.sm_interface.processes
            ]

        @self.app.callback(
            Input(component_id='continue_process', component_property='n_clicks'),
            State(component_id='processes', component_property='value'),
            Output(component_id='processes', component_property='value'),
        )
        def start_process(n_clicks, value):
            """Start the selected process."""
            self.sm_interface.start_processes([value])
            return no_update

        @self.app.callback(
            Input(component_id='stop_process', component_property='n_clicks'),
            State(component_id='processes', component_property='value'),
            Output(component_id='processes', component_property='value'),
        )
        def stop_process(n_clicks, value):
            """Stop the selected process."""
            self.sm_interface.stop_processes([value])
            return no_update

        @self.app.callback(
            Input(component_id='remove_process', component_property='n_clicks'),
            State(component_id='processes', component_property='value'),
            Output(component_id='processes', component_property='value'),
        )
        def remove_process(n_clicks, value):
            """Remove the selected process."""
            self.sm_interface.remove_processes([value])
            return no_update

        @self.app.callback(
            Input(component_id='schedule_process', component_property='n_clicks'),
            State(component_id='processes', component_property='value'),
            Output(component_id='processes', component_property='value'),
        )
        def schedule_process(n_clicks, value):
            """Add the selected process to the schedule; failures are logged, not raised."""
            try:
                self.sm_interface.add_to_schedule(value)
            except Exception:
                # format_exc() returns the traceback as text (print_exc() returns None)
                Logger.exception(f"Scheduling of process {value} failed:\n{traceback.format_exc()}")
            return no_update

        @self.app.callback(
            Input(component_id='pre_check', component_property='n_clicks'),
            State(component_id='processes', component_property='value'),
            Output(component_id='node', component_property='children'),
        )
        def pre_check_process(n_clicks, value):
            """Run the prerequisite check for the selected process and display the report."""
            try:
                process = self.sm_interface.jssp.process_by_name[value]
                ready, report = self.sm_interface.worker.check_prerequisites(process)
                return f"Ready to start:{ready}\n\n{report}"
            except Exception as ex:
                print(ex)
                traceback.print_exc()
                return no_update

        @self.app.callback(
            Input(component_id='reschedule', component_property='n_clicks'),
            Output(component_id='reschedule', component_property='n_clicks'),
        )
        def reschedule(n_clicks):
            """Mark the current schedule as invalid."""
            # this is not using a proper interface method
            self.sm_interface.schedule_manager.mark_schedule_invalid(enforce=True)
            # do not overwrite the click counter with None
            return no_update

        @self.app.callback(
            Input(component_id='schedule_time_get', component_property='n_clicks'),
            Output(component_id='schedule_time', component_property='value'),
        )
        def get_schedule_time(n_clicks):
            """Show the orchestrator's current "scheduling_time" parameter in the text field."""
            return self.sm_interface.get_parameter("scheduling_time")

        @self.app.callback(
            Input(component_id='schedule_time_set', component_property='n_clicks'),
            State(component_id="schedule_time", component_property="value"),
            Output(component_id='schedule_time_get', component_property='n_clicks'),
        )
        def set_schedule_time(n_clicks, value):
            """Set the orchestrator's "scheduling_time" parameter to the text field's content."""
            print(f"Setting scheduling time to {value}")
            self.sm_interface.set_parameter("scheduling_time", value)
            return no_update

        @self.app.callback(
            Input(component_id="emergency_stop", component_property='n_clicks'),
            Output(component_id="emergency_stop", component_property='n_clicks'),
        )
        def emergency_stop(n_clicks):
            """Placeholder for the emergency stop button, which is currently not in the layout."""
            print("this is just a test")
            return no_update

        @self.app.callback(
            Output(component_id='node', component_property='children'),
            Input(component_id='wfg', component_property='selected_node'),
        )
        def select_operation(selected_node):
            """Remember the node selected in the WFG and display a description of it."""
            self.selected_operation = selected_node
            jssp = self.sm_interface.jssp
            if selected_node in jssp.operable:
                op = jssp.operable[selected_node]
                if isinstance(op, Variable) and op.result is not None:
                    print('at this point we could visualize the variables result')
                return str(op)
            # Not an operable node: search the workflow graphs of all processes instead of
            # only a process named 'P1'.
            # NOTE(review): assumes process_by_name is a dict and wfg.nodes supports
            # `in` and item access like a networkx graph — confirm
            for process in jssp.process_by_name.values():
                nodes = process.wfg.nodes
                if selected_node not in nodes:
                    continue
                nx_attr = nodes[selected_node]
                if nx_attr.get('type') == 'container':
                    return str(jssp.container_info_by_name[nx_attr['name']])
            return 'no operation selected'

        @self.app.callback(
            Output(component_id='visu', component_property='children'),
            Input(component_id='wfg', component_property='selected_node'),
        )
        def select_operation2(selected_node):
            """Placeholder for visualizing the result of a selected variable; shows nothing yet."""
            var = self.sm_interface.get_operable_node(selected_node)
            if isinstance(var, Variable) and var.result is not None and hasattr(var.result, "Data"):
                # todo visualize var.result.Data here
                pass
            return None

        @self.app.callback(
            Output(component_id='node', component_property='children'),
            Input(component_id='action', component_property='n_clicks'),
            State(component_id='repeat', component_property='value')
        )
        def manipulate(n_clicks, value):
            """Tell the orchestrator that the error of the selected operation is resolved."""
            if not self.selected_operation:
                return 'no operation selected'
            job = self.sm_interface.get_operable_node(self.selected_operation)
            repeat = 'yes' in value
            if job.status != StepStatus.ERROR:
                # NOTE(review): only warns; the recovery below is still executed — confirm intended
                print("Selected Operation is not in Error-State")
                Logger.warning("Selected Operation is not in Error-State")
            self.sm_interface.error_resolved(self.selected_operation, repeat_operation=repeat)
            return f"Recovered {self.selected_operation}, repeat: {repeat}"

        @self.app.callback(
            [Output(component_id='gantt', component_property='figure'),
             Output(component_id='traffic_light-active', component_property="data")],
            Input(component_id='interval-component', component_property='n_intervals'),
            State(component_id='stop_gantt', component_property='value')
        )
        def refresh_gantt(n_intervals, value):
            """Refresh the Gantt chart and the traffic light on every second interval tick."""
            unchanged = [no_update, no_update]
            if 'yes' not in value or n_intervals % 2 != 0:
                return unchanged
            try:
                gantt = self.sm_interface.gantt_chart_scheduled_processes
                return [gantt, self.sm_interface.schedule_manager.schedule_quality.value]
            except Exception:
                # best effort: keep the last chart; this runs periodically, so failures are not logged
                return unchanged

        @self.app.callback(
            Output(component_id='wfg', component_property='dot_source'),
            Input('interval-component', 'n_intervals'),
            State(component_id='stop_wfg', component_property='value')
        )
        def refresh_wfg(n_intervals, value):
            """Refresh the workflow graph on every fifth interval tick."""
            # todo update only when the status of a job changed instead of periodically
            try:
                if 'yes' not in value or n_intervals % 5 != 0:
                    return no_update
                return str(self.sm_interface.workflow_graph_scheduled_processes)
            except Exception as ex:
                print(ex)
                traceback.print_exc()
                print("wfg-update failed")
                return no_update

    def run(self):
        """Start the Dash server in its background thread.

        Does nothing but log a warning if the server is already running.
        """
        import logging

        if self.p.is_alive():
            # A running server cannot be stopped (see stop()), and starting a started
            # thread again raises RuntimeError, so keep the running server.
            Logger.warning('Server is already running.')
            return
        if self.p.ident is not None:
            # the thread ran before and terminated; Thread objects can only be started once
            self.p = self._new_server_thread()
        logging.getLogger('werkzeug').setLevel(logging.ERROR)
        self.p.start()

    def stop(self):
        """Stopping the server is not implemented; only prints a notice."""
        print("Sorry, I don't know, how to stop")