Source code for laborchestrator.pythonlab_reader

"""
Contains an implementation of ProcessReader for PythonLab processes
"""
import importlib
import logging
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

import networkx as nx
import typer

from pythonlab.process import PLProcess
from pythonlab.pythonlab_reader import PLProcessReader

from laborchestrator.logging_manager import StandardLogger as Logger
from laborchestrator.process_reader import ProcessReader
from laborchestrator.pythonlab_process_finder import ProcessFinder
from laborchestrator.structures import (
    SMProcess, ProcessStep, Variable, ContainerInfo, MoveStep, IfNode, Computation, UsedDevice, SchedulingInstance
)


class PythonLabReader(ProcessReader):

    def __init__(self):
        super().__init__()

    @staticmethod
    def costs_from_prio(prio: float):
        return 100 * 2 ** -prio
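
    # Worked example (illustrative): costs scale exponentially with priority,
    # halving for each +1 step:
    #
    #   PythonLabReader.costs_from_prio(0)   -> 100
    #   PythonLabReader.costs_from_prio(1)   -> 50.0
    #   PythonLabReader.costs_from_prio(-1)  -> 200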

    @staticmethod
    def preprocess_wfg(plp: PLProcess):
        """Some regularisation, translation of priorities into waiting costs and filling of missing values."""
        g = plp.workflow
        # add small waiting costs for regularisation
        for edge in g.edges.values():
            if 'wait_cost' in edge:
                edge['wait_cost'] += 1
        # set the container priorities to the process priority if they have no individual priority
        for cont in plp.labware_resources:
            if cont.priority is None:
                cont.priority = plp.priority
        # add waiting costs for prioritised containers
        for cont, node in zip(plp.labware_resources, plp.labware_nodes.values()):
            for u, v in nx.dfs_edges(g, node):
                g.nodes[node]['wait_to_start_costs'] = PythonLabReader.costs_from_prio(cont.priority) / 2
                if 'wait_cost' in g[u][v]:
                    g[u][v]['wait_cost'] += PythonLabReader.costs_from_prio(cont.priority)
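
    # Illustrative effect, assuming plp.priority == 1 and no per-container
    # priorities: every container inherits priority 1, every edge reachable
    # from its labware node that already carries a 'wait_cost' gains
    # costs_from_prio(1) == 50.0 on top of the +1 regularisation, and the
    # labware node itself gets 'wait_to_start_costs' == 25.0 (half of it).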

    @staticmethod
    def read_process(process: PLProcess | None = None, file_path: str | None = None,
                     name: Optional[str] = None, src=None, **kwargs) -> SMProcess:
        """
        The main function. It takes a PythonLab process and derives an SMProcess from it.
        The information is stored in the Job and ContainerInfo classes from the structures module.
        """
        if not any([process, file_path, src]):
            logging.error("Specify at least one of process, file_path or src")
        if process:
            process = PLProcessReader.parse_process_from_instance(process)
        elif file_path:
            process = PLProcessReader.parse_process_from_file_path(file_path)
        else:
            process = PLProcessReader.parse_process_from_source_code(src)
        # take the class name if none is given
        if name is None:
            name = type(process).__name__
        smp = SMProcess(name)
        PythonLabReader.preprocess_wfg(process)
        PythonLabReader.relabel_nodes(process, name)
        g = process.workflow
        smp.steps = PythonLabReader.read_jobs(g)
        smp.containers = PythonLabReader.read_containers(g, process)
        smp.variables = PythonLabReader.read_variables(g)
        smp.computations = PythonLabReader.read_computations(g)
        smp.if_nodes = PythonLabReader.read_if_nodes(g)
        PythonLabReader.read_precedences(smp, g)
        ProcessReader.adjust_opacities(smp)
        smp.update_reagent_opacity()
        PythonLabReader.read_wait_cons(smp, g)
        PythonLabReader.fill_transfer_times(smp)
        return smp
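
    # Typical usage (illustrative; the file name is hypothetical):
    #
    #   smp = PythonLabReader.read_process(file_path="my_process.py")
    #   smp = PythonLabReader.read_process(src=source_code_string)
    #   smp = PythonLabReader.read_process(process=plprocess_instance)
    #
    # At least one of process, file_path or src must be given; process takes
    # precedence over file_path, which takes precedence over src.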

    @staticmethod
    def read_jobs(g: nx.DiGraph):
        """
        Extracts all information on jobs in a workflow. The attributes of the jobs are assumed
        to be in the data dictionary which networkx provides for each node.

        :param g: The graph containing the whole workflow
        :return: A list of all jobs described in the graph
        """
        jobs = []
        topo = list(nx.topological_sort(g))
        for idx, data in g.nodes(data=True):
            if data['type'] == 'operation':
                # these are the preferred devices for this job
                if 'executor' in data:
                    executors = {type(resource): resource.name for resource in data['executor']}
                else:
                    executors = {}
                main_device = data['device_type']
                preferred_main = executors[main_device] if main_device in executors else None
                kwargs = dict(
                    name=idx,
                    cont_names=data['cont_names'],
                    function=data['fct'],
                    used_devices=[UsedDevice(main_device, tag='main', preferred=preferred_main)],
                    duration=data['duration'] if 'duration' in data else 1,  # might not be set yet
                    label=data['name'],
                )
                if 'reagents' in data:
                    kwargs['cont_names'].extend(labware.name for labware in data['reagents'])
                if 'wait_to_start_costs' in data:
                    kwargs['wait_to_start_costs'] = data['wait_to_start_costs']
                # we save all special data in a data dictionary
                kwargs['data'] = {key: value for key, value in data.items() if key not in kwargs}
                if data['fct'] == 'move':
                    # try to find where the container is before that move
                    origin = PythonLabReader.find_origin(idx, data, topo, g)
                    if origin:
                        kwargs['used_devices'].append(origin)
                    # figure out where exactly the container is going
                    if 'position' in data:
                        kwargs['pref_dest_pos'] = data['position']
                    preferred_target = executors[data['target']] if data['target'] in executors else None
                    if "target_name" in data and not preferred_target:
                        preferred_target = data['target_name']
                    kwargs['used_devices'].append(UsedDevice(data['target'], tag='target', preferred=preferred_target))
                    jobs.append(MoveStep(**kwargs))
                else:
                    jobs.append(ProcessStep(**kwargs))
        return jobs
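
    # For illustration, a minimal 'operation' node as read_jobs() expects it.
    # The keys are the ones looked up above; the values are made up, and
    # IncubatorType stands in for a device class from the resource model:
    #
    #   g.add_node("step_1", type="operation", fct="incubate",
    #              name="Incubate 30 min", cont_names=["plate_1"],
    #              device_type=IncubatorType, duration=1800)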

    @staticmethod
    def find_origin(idx, data: Dict[str, Any], topo_sort: List[str], g: nx.DiGraph) -> Optional[UsedDevice]:
        # walk the nodes before the current node in reverse topological order (closest first)
        ancestry = reversed(topo_sort[:topo_sort.index(idx)])
        # assuming we can only move one container at a time
        cont_name = data["cont_names"][0]
        for m in ancestry:
            data2 = g.nodes[m]
            if data2['type'] == 'operation' and data2['fct'] == 'move' and cont_name in data2['cont_names']:
                data['source'] = data2['target']
                pref_source = data2['target_name']
                Logger.debug(f"source of step {idx} set to {data['source']}|{pref_source}.")
                return UsedDevice(data['source'], tag='origin', preferred=pref_source)
            if data2['type'] == 'container' and data2['name'] == cont_name:
                data['source'] = data2['origin_type']
                pref_source = data2['origin']
                Logger.debug(f"source of step {idx} set to {data['source']}|{pref_source}.")
                return UsedDevice(data['source'], tag='origin', preferred=pref_source)
        return None

    @staticmethod
    def read_containers(g: nx.DiGraph, p: PLProcess):
        containers = []
        for idx in p.labware_nodes.values():
            data = g.nodes[idx]
            containers.append(ContainerInfo(
                name=data['name'],
                current_device=data['origin'],
                current_pos=data['origin_pos'],
                start_device=UsedDevice(device_type=data['origin_type'], name=data['origin'],
                                        preferred=data['origin']),
                lidded=data['lidded'],
                labware_type=data.get("plate_type", ""),
                filled=data.get("filled", True),  # an optional parameter in PythonLab
                is_reagent="is_reagent" in data,
            ))
        return containers

    @staticmethod
    def read_variables(g: nx.DiGraph):
        variables = []
        for idx, data in g.nodes(data=True):
            if data['type'] == 'variable':
                kwargs = dict(
                    name=idx,
                    var_name=data['var_name'],
                )
                if 'var_type' in data:
                    kwargs['var_type'] = data['var_type']
                variables.append(Variable(**kwargs))
        return variables

    @staticmethod
    def read_computations(g: nx.DiGraph):
        computations = []
        for idx, data in g.nodes(data=True):
            if data['type'] == 'computation':
                computations.append(Computation(
                    name=idx,
                    evaluation=data['function'],
                    var_name=data['var_name'],
                ))
        return computations

    @staticmethod
    def read_if_nodes(g: nx.DiGraph):
        if_nodes = []
        for idx, data in g.nodes(data=True):
            if data['type'] == 'if_node':
                out = [v for _, v in g.out_edges(idx)]
                true_tree = [v for v in out if g[idx][v]['sub_tree']]
                false_tree = [v for v in out if not g[idx][v]['sub_tree']]
                if_nodes.append(IfNode(
                    name=idx,
                    evaluation=data['function'],
                    true_tree=true_tree,
                    false_tree=false_tree,
                ))
        return if_nodes

    @staticmethod
    def fill_transfer_times(smp):
        for job in smp.steps:
            if isinstance(job, MoveStep):
                job.duration = 40
                if 'lidded' in job.data:
                    job.duration += 30

    @staticmethod
    def relabel_nodes(p, prefix):
        """
        Adds '[ProcessName]_' to all node labels.
        (This makes them unique in the schedule, provided the process name is unique.)
        """
        label_map = {old: f"{prefix}_{old}"
                     for old in list(p.workflow.nodes) + [r.name for r in p.labware_resources]}
        nx.relabel_nodes(p.workflow, label_map, copy=False)
        new_container_map = dict()
        for name, node in p.labware_nodes.items():
            nx_node = p.workflow.nodes[label_map[node]]
            nx_node['name'] = prefix + '_' + nx_node['name']
            new_container_map[nx_node['name']] = label_map[node]
        # also change references to containers
        for n, data in p.workflow.nodes(data=True):
            if 'cont_names' in data:
                data['cont_names'] = [label_map[name] for name in data['cont_names']]
        for resource in p.labware_resources:
            resource._name = label_map[resource.name]
        # restore the links between name and label in the PLProcess accordingly
        p.labware_nodes = new_container_map
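
    # Example (illustrative): for a process named "PCR", workflow node "step_1"
    # becomes "PCR_step_1" and the container "plate_1" becomes "PCR_plate_1",
    # both in the graph and in the labware resources / labware_nodes mapping.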

    @staticmethod
    def read_precedences(smp: SMProcess, g: nx.DiGraph):
        operable = {node.name: node for node in smp.steps + smp.if_nodes + smp.variables + smp.computations}
        for u, v, data in g.edges(data=True):
            if u in operable and v in operable:
                operable[v].prior.append(u)
        for job in operable.values():
            job.is_start = len(job.prior) == 0

    @staticmethod
    def read_wait_cons(smp: SMProcess, g: nx.DiGraph):
        job_by_id = {job.name: job for job in smp.steps}
        # iterate through the edges and copy the values into the Job class
        for u, v, data in g.edges(data=True):
            if v in job_by_id:
                job = job_by_id[v]
                if 'max_wait' in data:
                    # we cannot yet handle a maximum waiting time to start
                    if not job.is_start:
                        job.max_wait[u] = data['max_wait']
                if 'wait_cost' in data:
                    job.wait_cost[u] = data['wait_cost']
                if 'min_wait' in data:
                    job.min_wait[u] = data['min_wait']
        for n, data in g.nodes(data=True):
            if 'wait_to_start_cost' in data:
                job_by_id[n].wait_to_start_costs += data['wait_to_start_cost']

    @staticmethod
    def get_available_processes(file_dir: str) -> List[Any]:
        file_path = Path(file_dir).resolve()
        parent_dir = str(file_path.parent.parent)
        dir_name = file_path.parent.name
        sys.path.append(parent_dir)
        module = importlib.import_module(dir_name)
        return ProcessFinder.get_processes(module)
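
# Illustrative usage (the path is hypothetical): pass a file inside the package
# directory that contains the process definitions; the package's parent
# directory is put on sys.path, the package is imported under its directory
# name, and ProcessFinder scans it:
#
#   processes = PythonLabReader.get_available_processes("/path/to/protocols/run.py")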

def read_process_test(
    process: Path = typer.Option(..., "-p", "--process", help="PythonLab Process"),
    show_wfg: bool = typer.Option(False, "--show-wfg", help="Visualize the WFG of the process"),
):
    """
    This is intended for testing the import and reading of a PythonLab process.
    """
    with process.open("r") as reader:
        source_code = reader.read()
    print("Successfully read file")
    process = PythonLabReader.read_process(src=source_code)
    print("process created")
    if show_wfg:
        jssp = SchedulingInstance()
        jssp.add_process(process)
        fig = jssp.visualize_wfg()
        fig.render("wfg", view=True)

def try_to_read_process():
    typer.run(read_process_test)
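
# Example invocation, assuming an entry point (here hypothetically named
# `read-process-test`) is wired to try_to_read_process:
#
#   read-process-test -p my_process.py --show-wfg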