# Source code for laborchestrator.pythonlab_process_finder

"""
This module provides the ProcessFinder.
A tool to get a list of all available PythonLab processes in a specified directory or module.
"""
import os
import importlib
from importlib import util as import_util
import pkgutil
import traceback
from typing import NamedTuple, List, Optional
from types import ModuleType
import logging

from pythonlab.process import PLProcess

# Directory that contains this module file (everything before the final path component of __file__).
default_dir = os.path.split(__file__)[0]


class ImportableProcess(NamedTuple):
    """
    Lightweight reference to a process that can be created from a loaded module.

    The process itself is obtained by looking up ``name`` on ``module`` and calling the result
    without arguments.
    """

    # Module object that holds the process factory/class.
    module: ModuleType
    # Attribute name of the process factory/class inside ``module``.
    name: str
    # Optional file path; presumably the file the process originates from (TODO confirm with callers).
    file_path: Optional[str] = None
class ProcessFinder:
    """
    Static helpers to discover and instantiate PythonLab processes.

    Within this class, a "process" is any module attribute that can be called without arguments
    and whose return value is an instance of :class:`PLProcess`.
    """

    @staticmethod
    def to_process_id(ip: ImportableProcess) -> str:
        """
        In case two processes have the same name, this is a readable, unique string to distinguish them

        :param ip: The importable process to build the id for.
        :return: ``<module name>.<attribute name>``
        """
        return f"{ip.module.__name__}.{ip.name}"

    @staticmethod
    def create_process_from_id(process_id: str, pck: ModuleType) -> Optional[PLProcess]:
        """
        Search ``pck`` for the process with the given id and instantiate it.

        :param process_id: Id as produced by :meth:`to_process_id`.
        :param pck: Package to search (it is reloaded by :meth:`get_processes`).
        :return: The new process, or None if no process matches or any error occurred
            (errors are logged, not raised).
        """
        try:
            for candidate in ProcessFinder.get_processes(pck):
                if ProcessFinder.to_process_id(candidate) == process_id:
                    return ProcessFinder.create_process(candidate)
        except Exception as ex:
            # format_exc() returns the traceback as text; print_exc() would return None
            # and put the literal "None" into the message.
            logging.getLogger(__name__).error(
                "Could not import %s: %s\n %s", process_id, ex, traceback.format_exc()
            )
        return None

    @staticmethod
    def get_processes(pck: ModuleType) -> List[ImportableProcess]:
        """
        Reload ``pck`` and list all processes found in its direct submodules.

        :param pck: Package to search; must provide ``__path__``.
        :return: One entry per discovered process.
        """
        importlib.reload(pck)
        return ProcessFinder._find_processes(pck)

    @staticmethod
    def _find_processes(pck: ModuleType) -> List[ImportableProcess]:
        """
        Import (and reload) every direct submodule of ``pck`` and collect the processes in it.

        Absolute module names are built as ``pck.__name__ + "." + name`` so that import_module
        works without further modification of the name.
        s. https://packaging.python.org/guides/creating-and-discovering-plugins/

        Sub-packages are imported and reloaded as well, but they are neither scanned for
        processes nor descended into. Submodules that fail to import are skipped.

        :param pck: Package whose ``__path__`` is scanned.
        :return: One entry per discovered process.
        """
        log = logging.getLogger(__name__)
        processes = []
        for _finder, name, ispkg in pkgutil.iter_modules(pck.__path__):
            mod_name = pck.__name__ + "." + name
            try:
                submodule = importlib.import_module(mod_name)
                importlib.reload(submodule)
                attr_names = [] if ispkg else dir(submodule)
            except Exception:
                # Best effort: one broken module must not prevent discovery of the others.
                log.debug("Skipping %s: import failed", mod_name, exc_info=True)
                continue
            for attr in attr_names:
                if ProcessFinder._yields_process(submodule, attr):
                    processes.append(ImportableProcess(submodule, attr))
        return processes

    @staticmethod
    def importable_processes_from_string(src: str, debug_printouts: bool = False) -> List[ImportableProcess]:
        """
        Execute ``src`` in a fresh temporary module and list the processes it defines.

        :param src: Python source code to execute.
        :param debug_printouts: If True, print a traceback for every PLProcess subclass with a
            ``process`` attribute that could not be instantiated.
        :return: One entry per discovered process; all of them reference the temporary module.
        :raises Exception: Whatever executing ``src`` raises (e.g. SyntaxError).
        """
        module_name = "tmp_module"
        spec = import_util.spec_from_loader(module_name, loader=None)
        tmp_module = import_util.module_from_spec(spec)
        # NOTE(security): exec runs arbitrary code; only pass source from trusted origins.
        exec(src, tmp_module.__dict__)
        return [
            ImportableProcess(tmp_module, attr_name)
            for attr_name in dir(tmp_module)
            if ProcessFinder._yields_process(tmp_module, attr_name, debug_printouts)
        ]

    @staticmethod
    def create_process(importable_process: ImportableProcess) -> PLProcess:
        """
        Instantiate the process referenced by ``importable_process``.

        :param importable_process: Reference to the module attribute to call.
        :return: The newly created process.
        :raises AssertionError: If calling the attribute does not return a PLProcess.
        """
        process = getattr(importable_process.module, importable_process.name)()
        # Raised explicitly (instead of using ``assert``) so the check survives ``python -O``.
        if not isinstance(process, PLProcess):
            raise AssertionError(
                f"{ProcessFinder.to_process_id(importable_process)} did not create a PLProcess"
            )
        return process

    @staticmethod
    def _yields_process(module: ModuleType, attr_name: str, debug_printouts: bool = False) -> bool:
        """
        Check whether calling ``module.<attr_name>`` without arguments returns a PLProcess.

        :param module: Module to look the attribute up on.
        :param attr_name: Name of the attribute to probe.
        :param debug_printouts: If True, print the traceback when a PLProcess subclass with a
            ``process`` attribute fails to instantiate.
        :return: True if the call succeeded and returned a PLProcess, otherwise False.
        """
        try:
            candidate = getattr(module, attr_name)
        except Exception:
            return False
        if not callable(candidate):
            # Most module attributes are not callable at all; nothing to probe.
            return False
        try:
            return isinstance(candidate(), PLProcess)
        except (Exception, SystemExit):
            # Many callables need arguments or fail for other reasons; that is expected.
            # SystemExit is included because modules may re-export exit helpers;
            # KeyboardInterrupt is deliberately allowed to propagate.
            if (
                debug_printouts
                # issubclass() raises TypeError for non-class callables, so check for a class first.
                and isinstance(candidate, type)
                and issubclass(candidate, PLProcess)
                and hasattr(candidate, "process")
            ):
                print(f"Tried to import and instantiate {attr_name}.")
                print(traceback.format_exc())
            return False