QUANT-NET Message Bus
The QUANT-NET Message Bus module provide RPC and Publish/Subscribe capabilities for the control plane, and the package is a common dependency for the Controller and Agent components. The quantnet_mq module provides the following functionalies:
Remote procedure call (RPC) client and server implementations
Publish/Subscribe (pub/sub) client and server implementations
Core schemas for the QUANT-NET Control Plane network data model
Auto-generated Python objects for all defined schema
Example Usage
RPC client
import asyncio
from quantnet_mq.rpcclient import RPCClient
async def main():
client = RPCClient("example_client")
client.set_handler("myRequest", None,
"quantnet_mq.schema.models.myns.myRequest")
await client.start()
# send a message and wait up to for a response 5s
msg = {"arg1": "value1", "arg2": 999.99}
req = await client.call("myRequest, msg, timeout=5.0)
print (req)
if __name__ == "__main__":
asyncio.run(main())
Pub/Sub receiver
import asyncio
from quantnet_mq.rpcclient import MsgServer
async def handle_msg(data):
print (data)
async def main():
client = MsgServer()
mclient.subscribe("mytopic", self.handle_msg)
await mclient.start()
# wait as long as needed here ...
if __name__ == "__main__":
asyncio.run(main())
Loading user-defined schema
With user-defined protocol plugins, it may often be necessary to dynamically load the necessary schema definitions.
Completing the
mynsRPC example above:
from quantnet_mq.rpcclient import RPCClient
from quantnet_mq.schema.models import Schema
async def main(self):
Schema.load_schema("../schema/example.yaml", ns="myns")
client = RPCClient("myRequest")
client.set_handler("myRequest", None,
"quantnet_mq.schema.models.myns.myRequest")
await client.start()
from quantnet_mq.schema.models import myns
# use namespaced objects ...
There is special handling for schema definitions that reference objects.yaml. This core schema packaged with quantnet_mq provides a set of common object types, including RPC response and Status formats, that may be useful. The source schema file contains further details.
For example, a user-defined schema may contain a reference of the form:
properties:
status:
$ref: "objects.yaml#/components/schemas/Status"
Working with auto-generated schema objects
It is straighforward to debug and test newly defined schema using the Python REPL.
Using the following example.yaml schema:
---
asyncapi: "2.6.0"
id: "urn:gov:quant-net"
info:
description: "Quant-Net Simple Example"
title: "RPC Example Endpoint"
version: "1.0.0"
components:
messages:
RequestMessage:
name: RequestMessage
messageId: example.request
payload:
"$ref": "#/components/schemas/RequestMessage"
schemas:
RequestMessage:
title: myRequest
type: object
required:
- cmd
- agentId
- payload
properties:
cmd:
type: string
agentId:
type: string
payload:
type: object
required:
- arg1
- arg2
properties:
arg1:
type: string
arg2:
type: number
We may then load and use the schema objects directly using quantnet_mq:
>>> from quantnet_mq.schema.models import Schema
>>> from quantnet_mq.schema import models
>>> Schema.load_schema("example.yaml", ns="myns")
>>> models.myns
<module 'myns'>
>>> req = models.myns.myRequest()
>>> try:
... req.payload = {"arg1": "Hello", "arg2": "World"}
... except Exception as e:
... print (e)
...
World is neither an integer nor a float
while setting 'arg2' in payload_<anonymous>
>>>
>>> req.payload = {"arg1": "Hello", "arg2": 99.99}
>>> req.agentId = "agent1"
>>> req.cmd = "myRequest"
>>> req.serialize()
'{"cmd": "myRequest", "agentId": "agent1", "payload": {"arg1": "Hello", "arg2": 99.99}, "agentID": "agent1", "command": "myRequest"}'
>>>