Source code for quantnet_agent.hal.HAL

import logging
import ast
import os
import importlib
import sys
from abc import ABC, abstractmethod
import traceback
from pydantic import TypeAdapter

log = logging.getLogger(__name__)
driver_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "driver")


def get_driver_module(classname):
    for filename in os.listdir(driver_path):
        f_path = os.path.join(driver_path, filename)
        if not os.path.isfile(f_path):
            continue
        with open(f_path) as file:
            node = ast.parse(file.read())
            classes = [n for n in node.body if isinstance(n, ast.ClassDef)]
            for class_ in classes:
                if classname == class_.name:
                    try:
                        spec = importlib.util.spec_from_file_location(
                            os.path.splitext(filename)[0], os.path.join(driver_path, filename)
                        )
                        module = importlib.util.module_from_spec(spec)
                        sys.modules["module.name"] = module
                        spec.loader.exec_module(module)
                        mod = getattr(module, classname)
                        return mod
                    except Exception:
                        log.error(f"Cannot load driver class {class_.name}")
                        log.error(traceback.format_exc())
                        return
    log.error(f"Cannot find class {classname} from plugin dir")


class HardwareAbstractionLayer:
    def __init__(self, config,  msgclient):

        self.devs = {}
        self._msgclient = msgclient
        self._config = config

        for device, property in config.devices.items():
            enabled = property.get("enabled")
            if enabled and not TypeAdapter(bool).validate_python(enabled):
                continue
            if device in self.devs:
                log.error(f"Duplicated Device {device}")
                continue
            dev_cls = get_driver_module(property["driver"])
            if dev_cls is None:
                continue
            try:
                dev = dev_cls(property,
                              config.node_file,
                              config.mq_broker_host,
                              config.mq_broker_port)
            except Exception as e:
                log.error(f"Cannot instantiate driver class {dev_cls}: {e}")
                continue
            self.devs[device] = dev


[docs] class Interpreter(ABC): """Abstract base class for the Agent Interpreter module. Example:: i = Interpreter(hal) :param hal: :doc:`HAL </hal>` object created by :class:`~quantnet_agent.hal.node`. """ def __init__(self, hal): self.hal = hal
[docs] class CoreInterpreter(Interpreter): """Abstract base class for the built-in Agent Interpreter module (e.g., :class:`~quantnet_agent.hal.interpreter.scheduler`). Example:: i = CoreInterpreter(node) :param node: :class:`~quantnet_agent.hal.node` object created by :class:`~quantnet_agent.service.node`. """ def __init__(self, node) -> None: self.node = node
[docs] @abstractmethod def get_commands(self): """Returns a dictionary of RPC handlers for a built-in interpreter to register with the RPC server in the controller. The dictionary keys are the names of RPC endpoints, and the values are lists composed of an RPC handler pointer and a schema model corresponding to the RPC. Example:: def get_commands(self): commands = { "scheduler.getSchedule": [self.get_schedule, "quantnet_mq.schema.models.scheduler.getSchedule"] } return commands """ pass
[docs] class CMDInterpreter(Interpreter): """Abstract base class for the Agent command Interpreter module (e.g., Link calibration module :class:`~quantnet_agent.hal.interpreter.calibration`). Example:: i = CMDInterpreter(hal) :param hal: :doc:`HAL </hal>` object created by :class:`~quantnet_agent.hal.node`. """ def __init__(self, hal) -> None: super().__init__(hal)
[docs] @abstractmethod def get_commands(self): """Returns a dictionary of RPC handlers for a command interpreter to register with the RPC server in the controller. The dictionary keys are the names of RPC endpoints, and the values are lists composed of an RPC handler pointer and a schema model corresponding to the RPC. Example:: def get_commands(self): commands = { "calibration.calibration": [self.measure, "quantnet_mq.schema.models.calibration.calibration"] } return commands """ pass
[docs] class ScheduleableInterpreter(CMDInterpreter): """Abstract base class for a schedulable Agent command Interpreter module (e.g., :class:`~quantnet_agent.hal.interpreter.ExperimentFramework`). Example:: i = ScheduleableInterpreter(hal) :param hal: :doc:`HAL </hal>` object created by :class:`~quantnet_agent.hal.node`. """ def __init__(self, hal) -> None: super().__init__(hal)
[docs] @abstractmethod def get_commands(self): """Returns a dictionary of RPC handlers for a schedulable command interpreter to register with the RPC server in the controller. The dictionary keys are the names of RPC endpoints, and the values are lists composed of an RPC handler pointer and a schema model corresponding to the RPC. Example:: def get_commands(self): commands = { "experiment.getState": [self.get_state, "quantnet_mq.schema.models.experiment.getState"] } return commands """ pass
[docs] @abstractmethod def get_schedulable_commands(self): """Returns a dictionary of RPC handlers for a schedulable command interpreter to register with the RPC server in the two-level scheduler. The RPC endpoints returned by this function are used by the agent scheduler instead of the agent node, enabling the scheduler to allocate a handler to an available timeslot when requested by the global scheduler. Keys in the dictionary are the names of the RPC endpoints, and the values are lists composed of an RPC handler pointer, a schema model corresponding to the RPC, and a schema model for the response. Example:: def get_schedulable_commands(self): commands = { "experiment.submit": [ self.submit, "quantnet_mq.schema.models.experiment.submit", experiment.submitResponse, ] } return commands """ pass
[docs] class LocalTaskInterpreter(Interpreter): """Abstract base class for a local Agent command Interpreter module (e.g., local device calibration). Example:: i = LocalTaskInterpreter(hal) :param hal: :doc:`HAL </hal>` object created by :class:`~quantnet_agent.hal.node`. """ def __init__(self, hal): super().__init__(hal)
[docs] @abstractmethod def run(self, *args, **kwargs): """Execute a local task (e.g., device calibration). :param args: Parameters to use for running the local task. :type args: list :param kwargs: Keyword parameter to use for running the local task. :type kwargs: dict """ pass
[docs] @abstractmethod def stop(self): """Stops a local task started with the `run` method.""" pass
[docs] @abstractmethod async def receive(self, *args, **kwargs): """Receive result of the `run` method.""" pass