"""
Contains an implementation of ProcessReader for PythonLab processes
"""
import importlib
import typer
from pathlib import Path
from laborchestrator.logging_manager import StandardLogger as Logger
import networkx as nx
from laborchestrator.structures import (
SMProcess, ProcessStep, Variable, ContainerInfo, MoveStep, IfNode, Computation, UsedDevice, SchedulingInstance
)
from pythonlab.process import PLProcess
from pythonlab.pythonlab_reader import PLProcessReader
from laborchestrator.process_reader import ProcessReader
from laborchestrator.pythonlab_process_finder import ProcessFinder
import sys
from typing import List, Dict, Any, Optional
class PythonLabReader(ProcessReader):
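    """Implementation of ProcessReader for PythonLab processes."""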
def __init__(self):
        super().__init__()
@staticmethod
def costs_from_prio(prio: float):
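        """
        Translates a priority into waiting costs. Each unit of priority halves the cost:
        costs_from_prio(0) == 100, costs_from_prio(1) == 50, costs_from_prio(-1) == 200.
        """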
return 100 * 2 ** -prio
@staticmethod
def preprocess_wfg(plp: PLProcess):
"""Some regularisation, translation of priorities into waiting costs and filling of missing values."""
g = plp.workflow
        # add small waiting costs for regularisation
for edge in g.edges.values():
if 'wait_cost' in edge:
edge['wait_cost'] += 1
# set the container priorities to the process priority if they have no individual priority
for cont in plp.labware_resources:
if cont.priority is None:
cont.priority = plp.priority
# add waiting costs for prioritised containers
        for cont, node in zip(plp.labware_resources, plp.labware_nodes.values()):
            g.nodes[node]['wait_to_start_costs'] = PythonLabReader.costs_from_prio(cont.priority) / 2
            for u, v in nx.dfs_edges(g, node):
                if 'wait_cost' in g[u][v]:
                    g[u][v]['wait_cost'] += PythonLabReader.costs_from_prio(cont.priority)
@staticmethod
    def read_process(process: Optional[PLProcess] = None, file_path: Optional[str] = None,
                     name: Optional[str] = None, src: Optional[str] = None, **kwargs) -> SMProcess:
"""
        The main entry point: takes a PythonLab process and derives an SMProcess from it. The information is
        stored in Job and ContainerInfo objects from the structures module.
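
        Example (a minimal sketch, assuming a PythonLab process file on disk)::

            smp = PythonLabReader.read_process(file_path="my_process.py")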
"""
        if not any([process, file_path, src]):
            raise ValueError("Specify at least one of process, file_path or src")
if process:
process = PLProcessReader.parse_process_from_instance(process)
elif file_path:
process = PLProcessReader.parse_process_from_file_path(file_path)
else:
process = PLProcessReader.parse_process_from_source_code(src)
# take the class name if none is given
if name is None:
name = type(process).__name__
smp = SMProcess(name)
PythonLabReader.preprocess_wfg(process)
PythonLabReader.relabel_nodes(process, name)
g = process.workflow
smp.steps = PythonLabReader.read_jobs(g)
smp.containers = PythonLabReader.read_containers(g, process)
smp.variables = PythonLabReader.read_variables(g)
smp.computations = PythonLabReader.read_computations(g)
smp.if_nodes = PythonLabReader.read_if_nodes(g)
PythonLabReader.read_precedences(smp, g)
ProcessReader.adjust_opacities(smp)
smp.update_reagent_opacity()
PythonLabReader.read_wait_cons(smp, g)
PythonLabReader.fill_transfer_times(smp)
return smp
@staticmethod
def read_jobs(g: nx.DiGraph):
"""
        Extracts all information on jobs in a workflow. The attributes of the jobs are assumed to be in the
        data dictionary that networkx provides for each node.
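
        Example node data (a sketch; ``Incubator`` is a placeholder device class)::

            {'type': 'operation', 'fct': 'incubate', 'name': 'incubate plate',
             'device_type': Incubator, 'cont_names': ['plate_1'], 'duration': 60}
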
:param g: The graph containing the whole workflow
:return: A list of all jobs described in the graph
"""
jobs = []
topo = list(nx.topological_sort(g))
for idx, data in g.nodes(data=True):
if data['type'] == 'operation':
# these are the preferred devices for this job
if 'executor' in data:
executors = {type(resource): resource.name for resource in data['executor']}
else:
executors = {}
main_device = data['device_type']
                preferred_main = executors.get(main_device)
kwargs = dict(
name=idx,
cont_names=data['cont_names'],
function=data['fct'],
used_devices=[UsedDevice(main_device, tag='main', preferred=preferred_main)],
                    duration=data.get('duration', 1),  # might not be set yet
label=data['name']
)
if 'reagents' in data:
kwargs['cont_names'].extend(labware.name for labware in data['reagents'])
if 'wait_to_start_costs' in data:
kwargs['wait_to_start_costs'] = data['wait_to_start_costs']
                # store all remaining special data in a data dictionary
                kwargs['data'] = {key: value for key, value in data.items() if key not in kwargs}
if data['fct'] == 'move':
# try to find where the container is before that move
origin = PythonLabReader.find_origin(idx, data, topo, g)
if origin:
kwargs['used_devices'].append(origin)
                    # figure out where exactly the container is going
if 'position' in data:
kwargs['pref_dest_pos'] = data['position']
                    preferred_target = executors.get(data['target'])
if "target_name" in data and not preferred_target:
preferred_target = data['target_name']
kwargs['used_devices'].append(UsedDevice(data['target'], tag='target', preferred=preferred_target))
jobs.append(MoveStep(**kwargs))
else:
jobs.append(ProcessStep(**kwargs))
return jobs
@staticmethod
def find_origin(idx, data: Dict[str, Any], topo_sort: List[str], g: nx.DiGraph) -> Optional[UsedDevice]:
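        """
        Walks backwards from a move step through the topologically sorted graph to find the device the
        container sits on before the move: the target of the container's previous move or, failing
        that, the container's origin.
        """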
        # walk the topological prefix backwards, so the closest predecessors come first
        ancestry = reversed(topo_sort[:topo_sort.index(idx)])
        # assuming only one container can be moved at a time
cont_name = data["cont_names"][0]
for m in ancestry:
data2 = g.nodes[m]
if data2['type'] == 'operation' and \
data2['fct'] == 'move' and \
cont_name in data2['cont_names']:
data['source'] = data2['target']
pref_source = data2['target_name']
Logger.debug(f"source of step {idx} set to {data['source']}|{pref_source}.")
return UsedDevice(data['source'], tag='origin', preferred=pref_source)
if data2['type'] == 'container' and \
data2['name'] == cont_name:
data['source'] = data2['origin_type']
pref_source = data2['origin']
Logger.debug(f"source of step {idx} set to {data['source']}|{pref_source}.")
return UsedDevice(data['source'], tag='origin', preferred=pref_source)
return None
@staticmethod
def read_containers(g: nx.DiGraph, p: PLProcess):
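        """Builds a ContainerInfo for every labware node of the process, copying origin, lid and labware-type data."""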
containers = []
for idx in p.labware_nodes.values():
data = g.nodes[idx]
containers.append(ContainerInfo(
name=data['name'],
current_device=data['origin'],
current_pos=data['origin_pos'],
start_device=UsedDevice(device_type=data['origin_type'], name=data['origin'], preferred=data['origin']),
lidded=data['lidded'],
labware_type=data.get("plate_type", ""),
                filled=data.get("filled", True),  # it's an optional parameter in PythonLab
is_reagent="is_reagent" in data,
))
return containers
@staticmethod
def read_variables(g: nx.DiGraph):
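        """Collects all variable nodes of the graph as Variable objects."""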
variables = []
for idx, data in g.nodes(data=True):
if data['type'] == 'variable':
kwargs = dict(
name=idx,
var_name=data['var_name']
)
if 'var_type' in data:
kwargs['var_type'] = data['var_type']
variables.append(Variable(**kwargs))
return variables
@staticmethod
def read_computations(g: nx.DiGraph):
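        """Collects all computation nodes of the graph as Computation objects."""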
computations = []
for idx, data in g.nodes(data=True):
if data['type'] == 'computation':
kwargs = dict(
name=idx,
evaluation=data['function'],
var_name=data['var_name']
)
computations.append(Computation(**kwargs))
return computations
@staticmethod
def read_if_nodes(g: nx.DiGraph):
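        """Collects all if-nodes; the 'sub_tree' edge attribute marks which successors form the true branch."""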
if_nodes = []
for idx, data in g.nodes(data=True):
if data['type'] == 'if_node':
                out = [v for _, v in g.out_edges(idx)]
true_tree = [v for v in out if g[idx][v]['sub_tree']]
false_tree = [v for v in out if not g[idx][v]['sub_tree']]
kwargs = dict(
name=idx,
evaluation=data['function'],
true_tree=true_tree,
false_tree=false_tree,
)
if_nodes.append(IfNode(**kwargs))
return if_nodes
@staticmethod
def fill_transfer_times(smp):
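        """
        Sets a heuristic duration for every move step (presumably in seconds): a flat 40 per
        transfer plus 30 if the container is lidded.
        """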
for job in smp.steps:
if isinstance(job, MoveStep):
job.duration = 40
if 'lidded' in job.data:
job.duration += 30
@staticmethod
def relabel_nodes(p, prefix):
"""
        Prefixes all node labels with '[ProcessName]_' (this makes them unique in the schedule if the
        process name is unique).
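
        For example, a node 'job0' of a process named 'PCR' is relabelled to 'PCR_job0'.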
"""
label_map = {old: f"{prefix}_{old}" for old in list(p.workflow.nodes) + [r.name for r in p.labware_resources]}
nx.relabel_nodes(p.workflow, label_map, copy=False)
new_container_map = dict()
for name, node in p.labware_nodes.items():
nx_node = p.workflow.nodes[label_map[node]]
nx_node['name'] = prefix + '_' + nx_node['name']
new_container_map[nx_node['name']] = label_map[node]
# also change references to containers
for n, data in p.workflow.nodes(data=True):
if 'cont_names' in data:
data['cont_names'] = [label_map[name] for name in data['cont_names']]
for resource in p.labware_resources:
resource._name = label_map[resource.name]
# restore the links between name and label in the PLProcess accordingly
p.labware_nodes = new_container_map
@staticmethod
def read_precedences(smp: SMProcess, g: nx.DiGraph):
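        """
        Copies the precedence relations from the graph edges into the prior lists of all operable
        nodes (steps, if-nodes, variables and computations) and marks nodes without predecessors
        as start nodes.
        """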
        operable = {node.name: node for node in smp.steps + smp.if_nodes + smp.variables + smp.computations}
        for u, v in g.edges():
            if u in operable and v in operable:
                operable[v].prior.append(u)
for job in operable.values():
job.is_start = len(job.prior) == 0
@staticmethod
def read_wait_cons(smp: SMProcess, g: nx.DiGraph):
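        """Copies minimum/maximum waiting times and waiting costs from the graph edges onto the jobs."""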
job_by_id = {job.name: job for job in smp.steps}
# iterate through edges and copy the values into Job-class
for u, v, data in g.edges(data=True):
if v in job_by_id:
job = job_by_id[v]
if 'max_wait' in data:
                    # we cannot yet handle a maximum waiting time to start
if not job.is_start:
job.max_wait[u] = data['max_wait']
if 'wait_cost' in data:
job.wait_cost[u] = data['wait_cost']
if "min_wait" in data:
job.min_wait[u] = data['min_wait']
        for n, data in g.nodes(data=True):
            if n in job_by_id and 'wait_to_start_cost' in data:
                job_by_id[n].wait_to_start_costs += data['wait_to_start_cost']
@staticmethod
def get_available_processes(file_dir: str) -> List[Any]:
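        """
        Makes the directory containing file_dir importable, imports it as a module and returns the
        PythonLab processes that ProcessFinder discovers in it.
        """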
file_path = Path(file_dir).resolve()
parent_dir = str(file_path.parent.parent)
dir_name = file_path.parent.name
sys.path.append(parent_dir)
module = importlib.import_module(dir_name)
return ProcessFinder.get_processes(module)
def read_process_test(
process: Path = typer.Option(..., "-p", "--process", help="PythonLab Process"),
show_wfg: bool = typer.Option(False, "--show-wfg", help="Visualize the WFG of the process"),
):
"""
This is intended for testing the import and reading of a PythonLab process.
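
    Example (the script name is hypothetical; the options match the signature above)::

        read-process-test -p my_process.py --show-wfg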
"""
with process.open("r") as reader:
source_code = reader.read()
print("Successfully read file")
    smp = PythonLabReader.read_process(src=source_code)
    print("process created")
    if show_wfg:
        jssp = SchedulingInstance()
        jssp.add_process(smp)
fig = jssp.visualize_wfg()
fig.render("wfg", view=True)
def try_to_read_process():
typer.run(read_process_test)