Source code for laborchestrator.sila_server.feature_implementations.laborchestratorservice_impl
# Generated by sila2.code_generator; sila2.__version__: 0.10.3
from __future__ import annotations
import traceback
from typing import TYPE_CHECKING
from sila2.server import MetadataDict
from ..generated.laborchestratorservice import (
GetStatus_Responses,
LabOrchestratorServiceBase,
LoadLabConfiguration_Responses,
LoadLabConfigurationFile_Responses,
LoadProcess_Responses,
LoadProcessFile_Responses,
StartProcess_Responses,
StopProcess_Responses,
)
if TYPE_CHECKING:
from ..server import Server
[docs]
class LabOrchestratorServiceImpl(LabOrchestratorServiceBase):
def __init__(self, parent_server: Server) -> None:
super().__init__(parent_server=parent_server)
[docs]
def LoadProcess(self, LabProcessName: str, *, metadata: MetadataDict) -> LoadProcess_Responses:
raise NotImplementedError # TODO
[docs]
def LoadProcessFile(self, LabProcessFileName: str, *, metadata: MetadataDict) -> LoadProcessFile_Responses:
try:
process_name = self.parent_server.orchestrator.add_process(file_path=LabProcessFileName)
print(process_name)
return process_name
except Exception as ex:
print(ex, traceback.print_exc())
return "FAIL"
[docs]
def LoadLabConfiguration(
self, LabConfigurationName: str, *, metadata: MetadataDict
) -> LoadLabConfiguration_Responses:
raise NotImplementedError # TODO
[docs]
def LoadLabConfigurationFile(
self, LabConfigurationFileName: str, *, metadata: MetadataDict
) -> LoadLabConfigurationFile_Responses:
raise NotImplementedError # TODO
[docs]
def StartProcess(self, ProcessName: str, *, metadata: MetadataDict) -> StartProcess_Responses:
self.parent_server.orchestrator.start_processes([ProcessName])
[docs]
def StopProcess(self, ProcessName: str, *, metadata: MetadataDict) -> StopProcess_Responses:
raise NotImplementedError # TODO
[docs]
def GetStatus(self, ProcessName: str, *, metadata: MetadataDict) -> GetStatus_Responses:
status = self.parent_server.orchestrator.get_process_state(ProcessName)
response = status.name.title()
return response