import asyncio
import json
import logging
import os
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Dict, Optional, List
from quantnet_mq import Code
from quantnet_mq.schema.models import Status, experiment
from quantnet_controller.common.request_translator import RequestTranslator
from quantnet_controller.common.utils import generate_uuid
from quantnet_controller.core import AbstractDatabase as DB
logger = logging.getLogger(__name__)
class RequestType(Enum):
EXPERIMENT = "experiment"
CALIBRATION = "calibration"
SIMULATION = "simulation"
PROTOCOL = "protocol" # For generic protocol requests
@dataclass
class RequestParameter:
"""Quant‑Net RequestParameter"""
exp_name: Optional[str] = None
path: Optional[List[str]] = field(default_factory=list)
exp_params: Optional[Dict[str, Any]] = field(default_factory=dict)
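# Illustrative sketch (not part of the module): how a plugin might build a
# RequestParameter for an experiment run. The experiment name and parameter
# keys below are hypothetical, not values defined by this module.
#
#   params = RequestParameter(
#       exp_name="ramsey_scan",           # hypothetical experiment name
#       path=["results", "node_a"],       # hypothetical storage path segments
#       exp_params={"shots": 100},        # hypothetical experiment parameters
#   )
#   assert asdict(params)["exp_params"]["shots"] == 100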
@dataclass
class Request:
"""First-class object to track all requests in the system."""
    # Constructor parameters
request_type: RequestType
parameters: Dict[str, Any] = field(default_factory=dict)
rid: Optional[str] = None
# Auto-generated fields (not in constructor)
id: str = field(init=False)
type: str = field(init=False) # Store enum value as string
created_at: float = field(init=False) # Store as timestamp
updated_at: float = field(init=False) # Store as timestamp
status: Status = field(init=False) # Use Status from quantnet_mq
result: Dict[str, Any] = field(default_factory=dict, init=False)
error: Optional[str] = field(default=None, init=False)
def __post_init__(self):
"""Initialize auto-generated fields after dataclass init."""
self.id = self.rid if self.rid is not None else generate_uuid()
self.type = self.request_type.value if isinstance(self.request_type, Enum) else str(self.request_type)
now = datetime.now(timezone.utc)
self.created_at = now.timestamp()
self.updated_at = self.created_at
self.status = Status(code=Code.OK.value, value=Code.OK.name, message="Request created, not yet started")
# Store func as private attribute (not part of dataclass)
self._func = None
self._payload = None
def add_result(self, key, value):
"""
Add a result entry directly to the result dictionary.
Args:
key: Result key (e.g., agent_id, 'error', 'metadata')
value: Result value
"""
self.result[key] = value
def update_status(self, code: Code, error=None):
"""
Update the request's status and optional error information.
:param status: New status string (e.g., ``created``, ``queued``, ``executing``,
``completed``, ``failed``)
:type status: str
:param error: Optional error message to record when status is ``failed``
:type error: str | None
:returns: None
"""
self.updated_at = datetime.now(timezone.utc).timestamp()
# Create status object from Code using quantnet_mq Status
self.status = Status(
code=code.value, value=code.name, reason=error if error else None, message=error if error else None
)
if error:
self.error = error
# Store errors in a flat list without nesting
if "errors" not in self.result:
self.result["errors"] = []
self.result["errors"].append(
{"timestamp": datetime.fromtimestamp(self.updated_at, tz=timezone.utc).isoformat(), "error": str(error)}
)
def to_dict(self):
"""Convert Request to dictionary using native dataclass serialization."""
data = asdict(self)
        # Remove constructor-only fields that shouldn't be serialized
        data.pop("request_type", None)
        data.pop("rid", None)
        # Convert the Status object to a dict using its serialize method
        data["status"] = json.loads(self.status.serialize())
return data
@property
def func(self) -> Optional[Callable]:
"""Get the callable function."""
return getattr(self, "_func", None)
@func.setter
def func(self, value: Optional[Callable]) -> None:
"""Set the callable function."""
self._func = value
    @property
    def payload(self):
        """Get the request payload."""
        return getattr(self, "_payload", None)
    @payload.setter
    def payload(self, value):
        """Set the request payload."""
        self._payload = value
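# Illustrative sketch (assumption, not part of the module): typical lifecycle of a
# Request object used on its own, outside of RequestManager.
#
#   req = Request(request_type=RequestType.PROTOCOL, parameters={"rate": 10})
#   req.add_result("agent_id", "agent-1")        # "agent-1" is a made-up value
#   req.update_status(Code.OK)                   # mark as accepted/running
#   req.update_status(Code.FAILED, "link down")  # records error under result["errors"]
#   record = req.to_dict()                       # plain dict suitable for the DB handler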
class RequestManager:
"""
Manage :class:`Request` objects using a singleton registry per plugin configuration.
This class provides global request tracking while allowing each plugin to maintain
its own request type and optional experiment translator.
:cvar _instances: Mapping of ``plugin_key`` to ``RequestManager`` instances.
:type _instances: dict[str, RequestManager]
:cvar _lock: Asyncio lock protecting singleton creation.
:type _lock: asyncio.Lock
:cvar _shared_db_handler: Shared database handler for all managers.
:type _shared_db_handler: Any
:cvar _shared_active_requests: In‑memory store of active :class:`Request` objects.
:type _shared_active_requests: dict[str, Request]
"""
_instances = {} # Plugin-specific instances: {plugin_key: RequestManager}
_lock = asyncio.Lock()
_shared_db_handler = None # Shared DB handler for all requests
_shared_active_requests = {} # Shared in-memory tracking: {rid: Request}
def __new__(cls, ctx, plugin_schema=None, request_type=RequestType.PROTOCOL, dbname=None, exp_def_path=None):
"""
Create or retrieve the singleton :class:`RequestManager` instance for a given
plugin configuration.
:param ctx: Execution context passed from the server or client.
:type ctx: Any
:param plugin_schema: Optional plugin schema class used to differentiate managers.
:type plugin_schema: type | None
:param request_type: Type of request this manager will handle.
:type request_type: RequestType
:param dbname: Unused placeholder for compatibility with older APIs.
:type dbname: str | None
:param exp_def_path: Optional path to an experiment definition file.
:type exp_def_path: str | None
:returns: The singleton :class:`RequestManager` instance associated with the
``plugin_schema`` and ``request_type``.
:rtype: RequestManager
"""
# Create a unique key for this plugin configuration
plugin_key = f"{plugin_schema.__name__ if plugin_schema else 'default'}_{request_type.value}"
if plugin_key not in cls._instances:
instance = super().__new__(cls)
cls._instances[plugin_key] = instance
instance._initialized = False
return cls._instances[plugin_key]
def __init__(self, ctx, plugin_schema=None, request_type=RequestType.PROTOCOL, dbname=None, exp_def_path=None):
"""
        Initialise the :class:`RequestManager` for the specified plugin configuration.
        This method is idempotent; subsequent calls for the same configuration
        leave the already-initialised instance unchanged.
:param ctx: Execution context.
:type ctx: Any
:param plugin_schema: Optional plugin schema class.
:type plugin_schema: type | None
:param request_type: The request type this manager will service.
:type request_type: RequestType
:param dbname: Placeholder for legacy database name arguments.
:type dbname: str | None
:param exp_def_path: Path to experiment definition file, if required.
:type exp_def_path: str | None
:returns: ``None``
"""
# Only initialize once per plugin configuration
if self._initialized:
return
self.ctx = ctx
self.plugin_schema = plugin_schema
self.request_type = request_type
self._request_queue = asyncio.Queue()
# Initialize shared DB handler once (class-level)
if RequestManager._shared_db_handler is None:
RequestManager._shared_db_handler = DB().handler("Requests")
# Create indices on id and type for fast queries
# RequestManager._shared_db_handler.create_index("id")
# RequestManager._shared_db_handler.create_index("type")
# Use shared DB handler and active requests
self.db_handler = RequestManager._shared_db_handler
self._active_requests = RequestManager._shared_active_requests
# Initialize translator for experiment-type requests
        if request_type in (RequestType.EXPERIMENT, RequestType.CALIBRATION):
if exp_def_path is None:
import inspect
# Get the caller's frame to find exp_def_path
caller_frame = inspect.currentframe().f_back
caller_file = caller_frame.f_globals.get("__file__")
if caller_file:
caller_dir = os.path.dirname(os.path.abspath(caller_file))
exp_def_path = os.path.join(caller_dir, "experiment.py")
self.translator = RequestTranslator(self.ctx, experiment)
if exp_def_path and os.path.exists(exp_def_path):
self.translator.load_exp_def(exp_def_path)
logger.info(f"Loaded experiment definition from {exp_def_path}")
else:
logger.warning(f"Experiment definition not found at {exp_def_path}")
else:
self.translator = None
self._initialized = True
logger.info(
f"Initialized RequestManager for {self.plugin_schema.__name__ if self.plugin_schema else 'default'}"
f" with type {self.request_type.value}"
)
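    # Illustrative sketch (assumption): constructing a manager from a plugin.
    # ``MyPluginSchema`` and ``ctx`` are placeholders supplied by the plugin, not
    # names defined in this module. Because of the singleton registry, a second
    # call with the same schema and request type yields the same instance.
    #
    #   mgr = RequestManager(ctx, plugin_schema=MyPluginSchema,
    #                        request_type=RequestType.EXPERIMENT)
    #   same = RequestManager(ctx, plugin_schema=MyPluginSchema,
    #                         request_type=RequestType.EXPERIMENT)
    #   assert mgr is same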
@classmethod
def get_instance(cls, plugin_schema=None, request_type=RequestType.PROTOCOL):
"""
Retrieve a previously‑created :class:`RequestManager` instance for a given
plugin configuration.
:param plugin_schema: Optional plugin schema class used to differentiate managers.
:type plugin_schema: type | None
:param request_type: The request type this manager handles.
:type request_type: RequestType
:returns: The matching :class:`RequestManager` instance, or ``None`` if it has
not been instantiated yet.
:rtype: RequestManager | None
"""
plugin_key = f"{plugin_schema.__name__ if plugin_schema else 'default'}_{request_type.value}"
return cls._instances.get(plugin_key)
@classmethod
def get_all_active_requests(cls):
"""
Return a mapping of all currently active :class:`Request` objects across
every plugin manager.
:returns: Dictionary where the key is the request ID and the value is the
corresponding :class:`Request` instance.
:rtype: dict[str, Request]
"""
return cls._shared_active_requests
def new_request(self, payload, parameters=None, rid=None, func=None):
"""
Create and register a new :class:`~quantnet_controller.common.request.Request`.
:param payload: Plugin‑defined request schema (e.g., ``spgRequest`` instance)
containing plugin‑specific data such as nodes, rate, duration, …
:type payload: Any
        :param parameters: Optional execution parameters for the request type
            (e.g., ``exp_name``, file system ``path`` for experiments)
        :type parameters: RequestParameter | None
        :param rid: Optional request identifier; a UUID will be generated if omitted
        :type rid: str | None
        :param func: Optional custom async callable for ``PROTOCOL`` requests,
            invoked with the request payload: ``async def func(payload) -> Code``.
        :type func: Callable[[Any], Awaitable[Code]] | None
:returns: The newly created :class:`Request` instance, already stored in the
in‑memory registry and persisted to the database.
:rtype: Request
"""
        request = Request(
            request_type=self.request_type,
            parameters=asdict(parameters) if parameters is not None else {},
            rid=rid,
        )
request.func = func # Set func as private attribute after creation
request.payload = payload
# Store in shared memory and DB
self._active_requests[request.id] = request
self.db_handler.add(request.to_dict())
logger.info(f"Created new request {request.id} of type {self.request_type}")
return request
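    # Illustrative sketch (assumption): creating an experiment request with a manager
    # ``mgr``. ``my_payload`` stands in for a plugin-defined schema instance (e.g. an
    # ``spgRequest``), and the RequestParameter values are made up.
    #
    #   req = mgr.new_request(
    #       payload=my_payload,                               # hypothetical plugin payload
    #       parameters=RequestParameter(exp_name="ramsey_scan"),
    #   )
    #   print(req.id, req.status.value)                       # e.g. "<uuid> OK"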
async def get_request(self, rid, include_result=False, raw=False):
"""
Retrieve a :class:`Request` instance by its identifier.
:param rid: Unique request identifier
:type rid: str
:param include_result: When ``True`` and the request has completed, also fetch
the associated experiment result
:type include_result: bool
        :param raw: When ``True``, return the raw database record instead of a :class:`Request` object
        :type raw: bool
:returns: The matching :class:`Request` object, or ``None`` if the identifier is
unknown
:rtype: Request | None
"""
# Check in-memory first (shared across all instances)
if rid in self._active_requests:
req = self._active_requests[rid]
else:
# Fall back to DB
record = self.db_handler.get({"id": rid})
if record:
if raw:
return record
# Reconstruct Request from DB
req = Request(
request_type=RequestType(record["type"]),
parameters=record.get("parameters", {}),
rid=record["id"],
)
req.payload = record.get("payload")
# Restore status object
if isinstance(record["status"], dict):
req.status = Status(
code=record["status"]["code"],
value=record["status"]["value"],
reason=record["status"].get("reason"),
message=record["status"].get("message"),
)
else:
# Fallback for old format
req.status = Status(code=Code.UNKNOWN.value, value=Code.UNKNOWN.name)
req.result = record["result"] if include_result else {}
req.error = record.get("error")
req.created_at = record["created_at"]
req.updated_at = record["updated_at"]
# Add back to shared memory
self._active_requests[rid] = req
else:
return None
return req
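    # Illustrative sketch (assumption): looking a request up again later. When the
    # request is no longer in memory it is reconstructed from the database record.
    #
    #   req = await mgr.get_request(req.id, include_result=True)
    #   raw = await mgr.get_request(req.id, raw=True)   # raw DB record (only on DB fallback)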
    async def find_requests(self, raw=False, filter=None, **kwargs):
        """
        Locate requests that satisfy the provided filter criteria.
        :param raw: When ``True``, return the raw database records instead of :class:`Request` objects
        :type raw: bool
        :param filter: Mapping of field names to desired values
            (e.g., ``{"type": "experiment"}``)
        :type filter: dict | None
        :param kwargs: Additional keyword arguments forwarded to the database handler's ``find``
        :returns: List of matching :class:`Request` instances, or raw records when ``raw`` is ``True``
        :rtype: list[Request]
        """
        if filter:
            kwargs["filter"] = filter
records = self.db_handler.find(**kwargs)
if raw:
return records
requests = []
for record in records:
# Check if already in memory
rid = record["id"]
if rid in self._active_requests:
requests.append(self._active_requests[rid])
else:
request = Request(
request_type=RequestType(record["type"]),
parameters=record.get("parameters", {}),
rid=rid,
)
request.payload = record.get("payload")
# Restore status object
if isinstance(record["status"], dict):
request.status = Status(
code=record["status"]["code"],
value=record["status"]["value"],
reason=record["status"].get("reason"),
message=record["status"].get("message"),
)
else:
# Fallback for old format
request.status = Status(code=Code.UNKNOWN.value, value=Code.UNKNOWN.name)
request.result = record.get("result", {})
request.error = record.get("error")
request.created_at = record["created_at"]
request.updated_at = record["updated_at"]
requests.append(request)
return requests
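    # Illustrative sketch (assumption): querying persisted requests by field values.
    # The exact filter syntax is whatever the underlying DB handler's ``find`` accepts;
    # a simple equality mapping is shown here.
    #
    #   experiments = await mgr.find_requests(filter={"type": "experiment"})
    #   records = await mgr.find_requests(raw=True, filter={"id": req.id})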
def del_request(self, rid):
"""
Remove a request from both in‑memory tracking and persistent storage.
:param rid: Unique request identifier
:type rid: str
:returns: ``True`` if the request was successfully deleted, ``False`` otherwise
:rtype: bool
"""
# Remove from shared memory
if rid in self._active_requests:
del self._active_requests[rid]
# Remove from DB
result = self.db_handler.delete({"id": rid})
logger.info(f"Deleted request {rid}")
return result > 0
async def noSchedule(self, request, blocking=True):
"""
Execute a request immediately without placing it on the queue.
:param request: The :class:`Request` to be executed
:type request: Request
        :param blocking: If ``True`` the coroutine waits for the request to finish;
            if ``False`` an ``asyncio.Task`` running the request is returned instead
        :type blocking: bool
        :returns: Execution result code when blocking, otherwise the scheduled ``asyncio.Task``
        :rtype: Code | asyncio.Task
        """
request.update_status(Code.OK)
self.db_handler.upsert({"id": request.id}, request.to_dict())
if blocking:
return await self._execute_request(request)
else:
            # Wrap in a task so the coroutine is actually scheduled without being awaited here
            fut = asyncio.create_task(self._execute_request(request))
logger.info(f"Request {request.id} added to non-blocking execution")
return fut
async def schedule(self, request, blocking=True):
"""
Enqueue a request for later execution.
:param request: The :class:`Request` to be queued
:type request: Request
:param blocking: ``True`` to wait for the request to complete,
``False`` to obtain a ``Task`` that can be awaited later
:type blocking: bool
:returns: ``Code`` on blocking execution, otherwise an ``asyncio.Task`` instance
:rtype: Code | asyncio.Task
"""
request.update_status(Code.QUEUED)
self.db_handler.upsert({"id": request.id}, request.to_dict())
await self._request_queue.put(request)
logger.info(f"Request {request.id} added to queue")
# Create a future that will be resolved when this specific request completes
request_future = asyncio.Future()
# Wrapper that processes queue and resolves the future for this request
async def execution_wrapper():
try:
# Process the queue (this will execute our request among others)
await self.process_queue()
# Wait for this specific request to complete
while request.status.code == Code.QUEUED.value:
await asyncio.sleep(0.1) # Poll every 100ms
# Use the status code directly
result = Code(request.status.code)
request_future.set_result(result)
return result
except Exception as e:
logger.error(f"Execution wrapper failed for request {request.id}: {e}")
request_future.set_exception(e)
raise
# Create task for the wrapper
task = asyncio.create_task(execution_wrapper())
if blocking:
# Wait for completion and return result
return await task
else:
# Return the task so caller can await it later
return task
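    # Illustrative sketch (assumption): blocking vs. non-blocking scheduling,
    # called from within an async function.
    #
    #   rc = await mgr.schedule(req)                    # waits; returns a Code
    #   task = await mgr.schedule(req2, blocking=False)
    #   ...                                             # do other work
    #   rc2 = await task                                # collect the result later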
async def _execute_request(self, request):
"""
Dispatch execution logic based on the manager's request type.
:param request: The :class:`Request` to be executed
:type request: Request
:returns: Normalised execution result code
:rtype: Code
"""
request.update_status(Code.OK)
self.db_handler.upsert({"id": request.id}, request.to_dict())
try:
            if self.request_type in (RequestType.EXPERIMENT, RequestType.CALIBRATION) and self.translator:
# Merge payload data into parameters for experiment execution
exec_params = request.parameters.copy()
exec_params["id"] = request.id
# Execute experiment with result callback
# Uses request.add_result to ensure flat structure
rc = await self.translator.start_experiment(exec_params, handle_result=request.add_result)
elif self.request_type == RequestType.PROTOCOL and request.func:
# Execute custom protocol function
logger.info(f"Executing custom function for protocol request {request.id}")
rc = await request.func(request.payload)
else:
# No execution logic defined, just mark as completed
logger.info(f"No execution logic for request {request.id}, marking as completed")
rc = Code.OK
# Normalize return value to Code
rc = self._normalize_return_code(rc)
if rc != Code.OK:
request.update_status(Code.FAILED, f"Execution failed with code {rc}")
else:
request.update_status(Code.OK)
self.db_handler.upsert({"id": request.id}, request.to_dict())
return rc
except Exception as e:
logger.error(f"Request {request.id} execution failed: {e}")
import traceback
traceback.print_exc()
request.update_status(Code.FAILED, str(e))
self.db_handler.upsert({"id": request.id}, request.to_dict())
return Code.FAILED
def _normalize_return_code(self, rc):
"""
Convert a heterogeneous return value into a :class:`quantnet_mq.Code` enum.
:param rc: Value returned by the user‑provided execution function. May be a
:class:`Code` instance, ``int``, ``bool``, ``str`` or ``None``.
:type rc: Any
:returns: Corresponding :class:`Code` enum member
:rtype: Code
"""
# Already a Code object
if isinstance(rc, Code):
return rc
# Boolean: True = OK, False = FAILED
if isinstance(rc, bool):
return Code.OK if rc else Code.FAILED
# Integer: 0 = OK, non-zero = FAILED
if isinstance(rc, int):
return Code.OK if rc == 0 else Code.FAILED
# String: try to match Code enum values
if isinstance(rc, str):
try:
return Code[rc.upper()]
except (KeyError, AttributeError):
# If string doesn't match any Code, treat as FAILED
logger.warning(f"Unknown return code string '{rc}', treating as FAILED")
return Code.FAILED
# None: treat as OK (function completed without explicit return)
if rc is None:
return Code.OK
# Any other type: log warning and treat as OK if truthy, FAILED if falsy
logger.warning(f"Unexpected return type {type(rc).__name__}: {rc}, converting to Code")
return Code.OK if rc else Code.FAILED
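    # Return-code normalization at a glance (behaviour of _normalize_return_code):
    #   Code.FAILED -> Code.FAILED     True   -> Code.OK        False -> Code.FAILED
    #   0           -> Code.OK         1      -> Code.FAILED    None  -> Code.OK
    #   "ok"/"OK"   -> Code.OK         "bogus" -> Code.FAILED (with a warning)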
async def process_queue(self):
"""
Consume all pending requests from the internal queue and execute them sequentially.
:returns: ``Code.OK`` when the queue has been fully processed
:rtype: Code
"""
while not self._request_queue.empty():
request = await self._request_queue.get()
logger.info(f"Processing queued request {request.id}")
await self._execute_request(request)
return Code.OK
async def get_experiment_result(self, id, timeout=60):
"""
Retrieve the result of an experiment request from the translator.
:param id: Identifier of the experiment request
:type id: str
:param timeout: Maximum time (seconds) to wait for the result
:type timeout: int
:returns: Experiment result object, or ``None`` if unavailable
:rtype: Any
"""
if self.translator:
return await self.translator.get_experiment_result(id, timeout)
return None
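# Illustrative end-to-end sketch (assumption, not part of the module): a PROTOCOL
# plugin wiring a custom coroutine into the request flow, run inside an async
# function. ``ctx``, ``my_payload`` and ``run_protocol`` are placeholders supplied
# by the plugin.
#
#   async def run_protocol(payload) -> Code:
#       ...                                   # plugin-specific work on the payload
#       return Code.OK
#
#   mgr = RequestManager(ctx, request_type=RequestType.PROTOCOL)
#   req = mgr.new_request(payload=my_payload, func=run_protocol)
#   rc = await mgr.schedule(req)              # Code.OK on success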