Source code for laborchestrator.old_dash_app

import traceback

from dash import dcc, html, no_update
import time
from laborchestrator.logging_manager import StandardLogger as Logger
from dash.dependencies import State, Input, Output
from dash_extensions.enrich import MultiplexerTransform, DashProxy
from .traffic_light import TrafficLight, register_traffic_light_callbacks
import dash_interactive_graphviz
from threading import Thread
from laborchestrator.structures import ProcessStep, Variable, StepStatus
from laborchestrator.pythonlab_process_finder import ProcessFinder, ImportableProcess
from laborchestrator.orchestrator_interface import OrchestratorInterface
from laborchestrator.orchestrator_implementation import Orchestrator
import sys
import os
from importlib import reload

tests_dir = os.path.join(os.path.dirname(__file__), '..', 'tests')
sys.path.append(tests_dir)
import test_data


def to_process_id(ip: "ImportableProcess") -> str:
    """Return the identifier used for an importable process in the GUI.

    The identifier is the dotted name of the module that defines the process
    followed by the process name, i.e. ``"<module name>.<process name>"``.

    :param ip: object exposing ``module`` (with a ``__name__``) and ``name``.
    :return: the combined identifier string.
    """
    return f"{ip.module.__name__}.{ip.name}"
# Shared CSS style for the control blocks in the button bar of the layout.
block_style = {
    'display': 'flex',
    'flexDirection': 'column',   # Stack children vertically
    'alignItems': 'center',      # Center align vertically in the block
    'justifyContent': 'center',  # Optional: center align content within block
    'verticalAlign': 'top',      # Align block with rest of the app
}
def create_timestamp() -> str:
    """Return the current local time formatted as ``YYYYMMDD_HHMMSS``.

    The result is used as a suffix for exported file names.
    """
    # strftime zero-pads month, day, hour, minute and second to two digits,
    # which is exactly what the former manual rjust(2, '0') construction did.
    return time.strftime("%Y%m%d_%H%M%S", time.localtime())
class SMDashApp:
    """Dash web GUI to observe, schedule and manipulate orchestrator processes.

    The constructor builds the page layout and registers all callbacks on a
    ``DashProxy`` application. The server itself is only started by
    :meth:`run`, which launches it in a daemon thread.
    """
    # sm_interface: OrchestratorInterface
    sm_interface: Orchestrator  # bit hacky, but makes it easier to program. This GUI will be replaced anyway
    app: DashProxy
    p: Thread
    selected_operation: str

    @property
    def importable_processes(self):
        """Return the processes currently found in ``self.process_module``.

        The module is reloaded on every access, so changes in the process
        definitions apply without restarting the GUI.
        """
        reload(self.process_module)
        return ProcessFinder.get_processes(self.process_module)

    def __init__(self, sm_interface: OrchestratorInterface, port=8050, process_module=test_data):
        """Create the app, its layout and its callbacks (does not start the server).

        :param sm_interface: orchestrator the GUI reads from and controls.
        :param port: port passed to the Dash server when :meth:`run` is called.
        :param process_module: module that is searched for importable processes.
        """
        self.sm_interface = sm_interface
        self.process_module = process_module
        self.test_view = False
        self.process_to_add = None
        # maps the name of an added process to the short name of its type
        self.process_type = {}
        self.app = DashProxy("Orchestrator webinterface", prevent_initial_callbacks=True,
                             suppress_callback_exceptions=True,
                             transforms=[MultiplexerTransform()])
        self.p = Thread(target=self.app.run, daemon=True, kwargs=dict(debug=False, port=port))
        self.selected_operation = ""

        self.app.layout = self._build_layout()
        register_traffic_light_callbacks(self.app, "traffic_light")
        self._register_callbacks()

    def _process_options(self):
        """Build the dropdown options for all importable processes."""
        return [{'value': to_process_id(ip), 'label': ip.name} for ip in self.importable_processes]

    def _build_layout(self):
        """Assemble and return the root component of the page."""
        display_block = html.Div(children=[
            dcc.Checklist(id='stop_gantt', options=[{'label': 'Update Gantt-Chart', 'value': 'yes'}],
                          value=['yes']),
            dcc.Checklist(id='stop_wfg', options=[{'label': 'Update WFG', 'value': 'yes'}],
                          value=['yes']),
            html.Div(children=[
                html.Button('Recover error', id='action', n_clicks=0),
                dcc.Checklist(id='repeat', options=[{'label': 'Repeat?', 'value': 'yes'}], value=[],
                              style={'display': 'inline-block'}),
            ]),
            html.Button('Run Pre Check', id='pre_check', n_clicks=0),
            html.Button("Export current WFG", id='export', n_clicks=0),
            html.Button(children=["Export current", html.Br(), "Scheduling Problem"],
                        id='export_problem', n_clicks=0),
        ], style=block_style)

        add_process_block = html.Div(children=[
            html.Div(id='delay_label', children="Delay in minutes:"),
            dcc.Textarea(id="delay", value='0', style={'width': 100, 'height': 20}),
            dcc.Dropdown(options=self._process_options(), id='process_select', style={"width": 250}),
            html.Button("Add process", id='add_process', n_clicks=0),
        ], style=block_style)

        process_control_block = html.Div(children=[
            dcc.Dropdown(options=[], id='processes', style={"width": 250}),
            html.Button('Stop Process', id='stop_process', n_clicks=0),
            html.Button('Start Process', id='continue_process', n_clicks=0),
            html.Button('Remove Process', id='remove_process', n_clicks=0),
            html.Button('Schedule Process', id='schedule_process', n_clicks=0),
            html.Button('Reschedule', id='reschedule', n_clicks=0),
        ], style=block_style)

        database_block = html.Div(children=[
            html.Button("Add Containers to DB", id='add_to_db', n_clicks=0),
            html.Button("Mark Containers removed", id='remove_from_db', n_clicks=0),
        ], style=block_style)

        schedule_time_block = html.Div(children=[
            html.Div(id='schedule_time_label',
                     children=["Schedule ", html.Br(), "computing time ", html.Br(), " in seconds:"]),
            dcc.Textarea(value=self.sm_interface.get_parameter("scheduling_time"), id="schedule_time",
                         style={'width': 30, 'height': 20}),
            html.Button("Get", id="schedule_time_get", n_clicks=0),
            html.Button("Set", id="schedule_time_set", n_clicks=0),
        ], style=block_style)

        control_bar = html.Div(
            children=[
                display_block,
                add_process_block,
                process_control_block,
                database_block,
                schedule_time_block,
                TrafficLight("traffic_light", size=40),
            ],
            style={
                'display': 'flex',           # Place blocks side by side
                'alignItems': 'flex-start',  # Align blocks at the top
            },
        )

        return html.Div(children=[
            html.H1(children='Status of the scheduled processes'),
            html.Div(children='''
                A web application to observe, schedule and manipulate processes running with the pythonlaborchestrator.
            '''),
            dcc.Graph(id='gantt', figure=""),
            control_bar,
            # NOTE: the emergency stop button is disabled; the `stop` callback below still targets its id.
            # html.Button("Emergency STOP", id='emergency_stop', n_clicks=0,
            #             style={'background-color': 'red', 'font-size': '55px', 'width': '300px', 'height': '150px',
            #                    'vertical-align': 'top', 'display': 'inline-block'}),
            dash_interactive_graphviz.DashInteractiveGraphviz(
                id="wfg",
                engine='dot',
                dot_source="digraph {\n\thello -> world\n}"
            ),
            html.Div(id='node', children='No node selected'),
            html.Div(id='visu', children=None),
            dcc.Interval(id='interval-component', interval=500, n_intervals=0),
        ])

    def _register_callbacks(self):
        """Register all GUI callbacks on ``self.app``.

        Callbacks that only trigger an action still need an Output; they
        return ``no_update`` so the targeted property is left untouched.
        """

        @self.app.callback(
            Input(component_id='export_problem', component_property='n_clicks'),
            Output(component_id='export_problem', component_property='n_clicks')
        )
        def export_current_scheduling_problem(n_clicks):
            filename = f"jssp_{create_timestamp()}.json"
            self.sm_interface.export_current_scheduling_problem(filename)
            # returning None would overwrite the button's n_clicks
            return no_update

        @self.app.callback(
            Input(component_id='process_select', component_property='value'),
            Output(component_id='process_select', component_property='options')
        )
        def refresh_dropdown_menu(value):
            # refreshes the options according to what's in the file system
            return self._process_options()

        @self.app.callback(
            Input(component_id='remove_from_db', component_property='n_clicks'),
            State(component_id='processes', component_property='value'),
            Output(component_id='remove_from_db', component_property='n_clicks')
        )
        def remove_containers_from_db(n_clicks, value):
            # todo this is not, how it should be done. use interface methods instead.
            process = self.sm_interface.jssp.process_by_name[value]
            labware_to_remove = [c.name for c in process.containers]
            self.sm_interface.remove_labware(labware_ids=labware_to_remove)
            return no_update

        @self.app.callback(
            Input(component_id='add_to_db', component_property='n_clicks'),
            State(component_id='processes', component_property='value'),
            Output(component_id='add_to_db', component_property='n_clicks'),
        )
        def add_containers_to_db(n_clicks, value):
            present, missing = self.sm_interface.check_labware_presence([value])
            Logger.info(f"found: {present}\nmissing: {missing}")
            self.sm_interface.add_labware(missing)
            return no_update

        @self.app.callback(
            Input(component_id='export', component_property='n_clicks'),
            State(component_id='gantt', component_property='figure'),
            State(component_id='wfg', component_property='dot_source'),
            Output(component_id='export', component_property='n_clicks'),
        )
        def export_state(n_clicks, gantt, wfg):
            # writes the displayed workflow graph and the current schedule to the working directory
            timestamp = create_timestamp()
            with open(f"WFG_{timestamp}.txt", "w") as writer:
                writer.write(wfg)
            with open(f"Schedule_{timestamp}.txt", "w") as writer:
                writer.write(self.sm_interface.gantt_chart_scheduled_processes.to_json())
            return no_update

        @self.app.callback(
            Input(component_id='add_process', component_property='n_clicks'),
            Output(component_id='process_select', component_property='title'),
            State(component_id='process_select', component_property='value'),
            State(component_id='delay', component_property='value'),
        )
        def add_process(n_clicks, value, delay):
            Logger.info(f"add {value} with {delay} minutes delay ")
            try:
                delay = int(delay)
            except (TypeError, ValueError):
                Logger.warning(f"could not translate delay {delay} to integer")
                delay = 0
            for ip in self.importable_processes:
                # use the same id function that produced the dropdown values
                p_id = to_process_id(ip)
                if p_id == value:
                    process = ProcessFinder.create_process(ip)
                    name = self.sm_interface.add_process(process_object=process, delay=delay)
                    self.process_type[name] = p_id.split('.')[-1]
                    break
            return 'chosen'

        @self.app.callback(
            Input('interval-component', 'n_intervals'),
            Output(component_id='processes', component_property='options')
        )
        def update_active_processes(n_intervals):
            return [
                dict(value=process.name,
                     label=f"{process.name} ({self.process_type.get(process.name, '')})")
                for process in self.sm_interface.processes
            ]

        @self.app.callback(
            Input(component_id='continue_process', component_property='n_clicks'),
            State(component_id='processes', component_property='value'),
            Output(component_id='processes', component_property='value'),
        )
        def start_process(n_clicks, value):
            self.sm_interface.start_processes([value])
            return no_update

        @self.app.callback(
            Input(component_id='stop_process', component_property='n_clicks'),
            State(component_id='processes', component_property='value'),
            Output(component_id='processes', component_property='value'),
        )
        def stop_process(n_clicks, value):
            self.sm_interface.stop_processes([value])
            return no_update

        @self.app.callback(
            Input(component_id='remove_process', component_property='n_clicks'),
            State(component_id='processes', component_property='value'),
            Output(component_id='processes', component_property='value'),
        )
        def remove_process(n_clicks, value):
            self.sm_interface.remove_processes([value])
            return no_update

        @self.app.callback(
            Input(component_id='schedule_process', component_property='n_clicks'),
            State(component_id='processes', component_property='value'),
            Output(component_id='processes', component_property='value'),
        )
        def schedule_process(n_clicks, value):
            try:
                self.sm_interface.add_to_schedule(value)
            except Exception:
                # print_exc() returns None; format_exc() yields the text that should be logged
                Logger.exception(traceback.format_exc())
            return no_update

        @self.app.callback(
            Input(component_id='pre_check', component_property='n_clicks'),
            State(component_id='processes', component_property='value'),
            Output(component_id='node', component_property='children'),
        )
        def pre_check_process(n_clicks, value):
            try:
                process = self.sm_interface.jssp.process_by_name[value]
                ready, report = self.sm_interface.worker.check_prerequisites(process)
                return f"Ready to start:{ready}\n\n{report}"
            except Exception as ex:
                print(ex, traceback.format_exc())
            return no_update

        @self.app.callback(
            Input(component_id='reschedule', component_property='n_clicks'),
            Output(component_id='reschedule', component_property='n_clicks'),
        )
        def reschedule(n_clicks):
            # this is not using a proper interface method
            self.sm_interface.schedule_manager.mark_schedule_invalid(enforce=True)
            return no_update

        @self.app.callback(
            Input(component_id='schedule_time_get', component_property='n_clicks'),
            Output(component_id='schedule_time', component_property='value'),
        )
        def get_schedule_time(n_clicks):
            return self.sm_interface.get_parameter("scheduling_time")

        @self.app.callback(
            Input(component_id='schedule_time_set', component_property='n_clicks'),
            State(component_id="schedule_time", component_property="value"),
            Output(component_id='schedule_time_get', component_property='n_clicks'),
        )
        def set_schedule_time(n_clicks, value):
            print(f"Setting scheduling time to {value}")
            self.sm_interface.set_parameter("scheduling_time", value)
            return no_update

        @self.app.callback(
            Input(component_id="emergency_stop", component_property='n_clicks'),
            Output(component_id="emergency_stop", component_property='n_clicks'),
        )
        def stop(n_clicks):
            # the 'emergency_stop' button is currently commented out of the layout
            print("this is just a test")
            return no_update

        @self.app.callback(
            Output(component_id='node', component_property='children'),
            Input(component_id='wfg', component_property='selected_node'),
        )
        def select_operation(selected_node):
            # remembered for the 'Recover error' button
            self.selected_operation = selected_node
            jssp = self.sm_interface.jssp
            if selected_node in jssp.operable:
                op = jssp.operable[selected_node]
                if isinstance(op, Variable) and op.result is not None:
                    print('at this point we could visualize the variables result')
                return str(op)
            # Not an operable node: look it up in the workflow graph of every process
            # instead of only in a process named 'P1'.
            # NOTE(review): assumes process_by_name is a mapping and wfg.nodes supports
            # `in` and item access (networkx-like) -- confirm.
            for process in jssp.process_by_name.values():
                nodes = process.wfg.nodes
                if selected_node not in nodes:
                    continue
                nx_attr = nodes[selected_node]
                if nx_attr['type'] == 'container':
                    return str(jssp.container_info_by_name[nx_attr['name']])
            return 'no operation selected'

        @self.app.callback(
            Output(component_id='visu', component_property='children'),
            Input(component_id='wfg', component_property='selected_node'),
        )
        def select_operation2(selected_node):
            # placeholder: the data of a variable's result is read, but nothing is displayed yet
            var = self.sm_interface.get_operable_node(selected_node)
            if isinstance(var, Variable) and var.result is not None:
                if "Data" in dir(var.result):
                    _ = var.result.Data
            return None

        @self.app.callback(
            Output(component_id='node', component_property='children'),
            Input(component_id='action', component_property='n_clicks'),
            State(component_id='repeat', component_property='value')
        )
        def manipulate(n_clicks, value):
            job = self.sm_interface.get_operable_node(self.selected_operation)
            repeat = 'yes' in value
            if job.status != StepStatus.ERROR:
                # only warn; the recovery below is attempted regardless
                print("Selected Operation is not in Error-State")
                Logger.warning("Selected Operation is not in Error-State")
            self.sm_interface.error_resolved(self.selected_operation, repeat_operation=repeat)
            return f"Recovered {self.selected_operation}, repeat: {repeat}"

        @self.app.callback(
            [Output(component_id='gantt', component_property='figure'),
             Output(component_id='traffic_light-active', component_property="data")],
            Input(component_id='interval-component', component_property='n_intervals'),
            State(component_id='stop_gantt', component_property='value')
        )
        def refresh_gantt(n_intervals, value):
            # refresh on every second interval tick while the checkbox is ticked
            if 'yes' in value and n_intervals % 2 == 0:
                try:
                    gantt = self.sm_interface.gantt_chart_scheduled_processes
                    return [gantt, self.sm_interface.schedule_manager.schedule_quality.value]
                except Exception:
                    return [no_update, no_update]
            return no_update

        @self.app.callback(
            Output(component_id='wfg', component_property='dot_source'),
            Input('interval-component', 'n_intervals'),
            State(component_id='stop_wfg', component_property='value')
        )
        def refresh_wfg(n_intervals, value):
            try:
                if 'yes' not in value:
                    return no_update
                # refresh on every fifth interval tick
                if n_intervals % 5 == 0:
                    return str(self.sm_interface.workflow_graph_scheduled_processes)
                return no_update
            except Exception as ex:
                print(ex, traceback.format_exc())
                print("wfg-update failed")
                return no_update

    def run(self):
        """Start the Dash server in its background thread.

        If the server thread is already alive, a warning is logged and
        :meth:`stop` is called. When the thread is still alive afterwards the
        method returns, because starting the same thread object a second time
        raises ``RuntimeError``.
        """
        import logging
        if self.p.is_alive():
            Logger.warning('Server is already running. Restarting server')
            self.stop()
            if self.p.is_alive():
                return
        # silence the per-request log lines of the werkzeug server
        logging.getLogger('werkzeug').setLevel(logging.ERROR)
        self.p.start()

    def stop(self):
        """Placeholder: stopping the server is not implemented, only a message is printed."""
        print("Sorry, I don't know, how to stop")