Source code for laborchestrator.pythonlab_process_finder
"""
This module provides the ProcessFinder.
A tool to get a list of all available PythonLab processes in a specified directory or module.
"""
import os
import importlib
from importlib import util as import_util
import pkgutil
import traceback
from typing import NamedTuple, List, Optional
from types import ModuleType
import logging
from pythonlab.process import PLProcess
# Directory that contains this module file.
# NOTE(review): not referenced anywhere in the code visible here - presumably the
# default location to search for processes; confirm against callers.
default_dir = os.path.dirname(__file__)
[docs]
class ImportableProcess(NamedTuple):
    """
    Lightweight reference to a process that can be instantiated on demand.

    Instead of holding a process instance, this tuple stores where the process
    can be found: the module object and the name of the attribute inside that
    module. ``getattr(module, name)()`` is expected to produce the process.
    """
    # Module object that contains the process attribute.
    module: ModuleType
    # Name of the attribute in ``module`` that creates the process when called
    # without arguments.
    name: str
    # NOTE(review): presumably the path of the file the process was loaded from;
    # nothing in the code visible here ever sets it - confirm against callers.
    file_path: Optional[str] = None
[docs]
class ProcessFinder:
    """
    Static helpers to discover and instantiate PLProcess definitions.

    A process is every module attribute that can be called without arguments
    and returns a ``PLProcess`` instance. Discovered processes are described by
    ``ImportableProcess`` tuples and are only instantiated on request.
    """

    @staticmethod
    def to_process_id(ip: "ImportableProcess") -> str:
        """
        In case two processes have the same name, this is a readable, unique string to distinguish them

        :param ip: Reference to the process (module object and attribute name).
        :return: The module's ``__name__`` and the attribute name joined by a dot.
        """
        return f"{ip.module.__name__}.{ip.name}"

    @staticmethod
    def create_process_from_id(process_id: str, pck: ModuleType) -> "Optional[PLProcess]":
        """
        Search a package for the process with the given id and instantiate it.

        :param process_id: Id as produced by :meth:`to_process_id`.
        :param pck: Package to search (see :meth:`get_processes`).
        :return: A new process instance, or None if no process with that id was
            found or if searching/instantiating raised an exception (the
            exception is logged, not propagated).
        """
        try:
            for candidate in ProcessFinder.get_processes(pck):
                if ProcessFinder.to_process_id(candidate) == process_id:
                    return ProcessFinder.create_process(candidate)
        except Exception as ex:
            # format_exc() returns the traceback as text; print_exc() would
            # return None and only write to stderr.
            logging.getLogger(__name__).error(
                f"Could not import {process_id}: {ex}\n {traceback.format_exc()}"
            )
        return None

    @staticmethod
    def get_processes(pck: ModuleType) -> "List[ImportableProcess]":
        """
        Reload a package and list all processes found in its direct submodules.

        :param pck: Package to search. It is reloaded first, so changes made
            since the last import are picked up.
        :return: One entry per discovered process.
        """
        importlib.reload(pck)
        return ProcessFinder._find_processes(pck)

    @staticmethod
    def _find_processes(pck: ModuleType) -> "List[ImportableProcess]":
        """
        Import every direct submodule of a package and collect its processes.

        Each entry found by ``pkgutil.iter_modules`` is imported under its
        absolute name (package name + "." + module name) and reloaded.
        Sub-packages are imported but NOT searched, i.e. the search is not
        recursive. Modules that fail to import are skipped.

        s. https://packaging.python.org/guides/creating-and-discovering-plugins/

        :param pck: Package whose ``__path__`` is scanned.
        :return: One entry per attribute that yields a PLProcess when called.
        """
        processes = []
        for _finder, name, ispkg in pkgutil.iter_modules(pck.__path__):
            mod_name = pck.__name__ + "." + name
            try:
                submodule = importlib.import_module(mod_name)
                importlib.reload(submodule)
            except Exception:
                # best effort: a broken module must not abort the whole search
                logging.getLogger(__name__).debug(
                    "Skipping module %s: import failed", mod_name, exc_info=True
                )
                continue
            if ispkg:
                continue
            for attr in dir(submodule):
                if ProcessFinder._yields_process(submodule, attr):
                    processes.append(ImportableProcess(submodule, attr))
        return processes

    @staticmethod
    def _yields_process(module: ModuleType, attr_name: str) -> bool:
        """Return True if calling the named module attribute without arguments gives a PLProcess."""
        try:
            candidate = getattr(module, attr_name)
            return callable(candidate) and isinstance(candidate(), PLProcess)
        except (Exception, SystemExit):
            # Arbitrary callables are invoked here, so failures are expected
            # (missing arguments, imported functions such as sys.exit, ...).
            # KeyboardInterrupt is deliberately not swallowed.
            return False

    @staticmethod
    def importable_processes_from_string(src: str, debug_printouts: bool = False) -> "List[ImportableProcess]":
        """
        Execute source code in a temporary module and list the processes it defines.

        :param src: Python source code. NOTE(security): it is passed to
            ``exec`` and therefore runs with full privileges - only pass
            trusted code.
        :param debug_printouts: If True, print the traceback for every PLProcess
            subclass with a ``process`` attribute that could not be instantiated.
        :return: One entry per attribute that yields a PLProcess when called.
            All entries reference the same temporary module named "tmp_module".
        :raises Exception: Whatever executing ``src`` raises is propagated.
        """
        found_processes = []
        module_name = "tmp_module"
        spec = import_util.spec_from_loader(module_name, loader=None)
        tmp_module = import_util.module_from_spec(spec)
        exec(src, tmp_module.__dict__)
        for attr_name in dir(tmp_module):
            attr = getattr(tmp_module, attr_name, None)
            if not callable(attr):
                continue
            try:
                if isinstance(attr(), PLProcess):
                    found_processes.append(ImportableProcess(tmp_module, attr_name))
            except (Exception, SystemExit):
                # Most callables are simply not processes; only report classes
                # that look like a process. issubclass() needs a class as first
                # argument, so plain functions have to be filtered out first.
                if (
                    debug_printouts
                    and isinstance(attr, type)
                    and issubclass(attr, PLProcess)
                    and hasattr(attr, "process")
                ):
                    print(f"Tried to import and instantiate {attr_name}.")
                    traceback.print_exc()
        return found_processes

    @staticmethod
    def create_process(importable_process: "ImportableProcess") -> "PLProcess":
        """
        Instantiate the process an ImportableProcess refers to.

        :param importable_process: Reference to the process.
        :return: The object returned by calling the referenced attribute.
        :raises AssertionError: If that object is not a PLProcess.
        """
        process = getattr(importable_process.module, importable_process.name)()
        # raised explicitly so the check also runs when asserts are disabled (-O)
        if not isinstance(process, PLProcess):
            raise AssertionError(
                f"{ProcessFinder.to_process_id(importable_process)} did not create a PLProcess"
            )
        return process