PingPong
PingPong protocol
This tutorial provides the steps necessary to introduce a simple ping-pong protocol between the Controller and Agent. Besides being an informative exercise, this protocol may be useful in characterizing the Controller’s messaging reachability and latency to one or more running Agents in your control plane network.
Our ping-pong protocol requirements are as follows:
Allow an external client to contact the Controller using a
pingPongRequestmessageThe request message will contain the following information:
A list of destinations (Agent IDs)
The number of ping-pong iterations
The client will asynchronously receive a
pingPongRecordmessage with the results for each destination specified in the requestThe number of successful ping-pong attempts
The averaged round-trip-time information
The Controller will maintain a
pingPongRecordin its database for the last request made to each Agent
With the requirements stated, you will exercise the following control plane integrations to implement this functionality:
Define a protocol schema for the Message Bus
Create a ProtocolPlugin for the Controller that handles external client requests, sends pings to the Agents, and tracks the protocol results.
Create a CommandInterpreter for the Agent that responds to pings from the Controller.
Create a test client to make
pingPingRequestsand display results.
The complete ping-pong tutorial source code may be found here
Protocol schema
Three types of messages are required for our ping-pong protocol:
The request and response from the client to Controller
The
pingandpongthat are communicated between the Controller and AgentsThe summary record that is communicated back to the client.
Below is the pingPongRequest definition. This protocol message is intended use is as an RPC in the control plane, so it contains the required fields cmd, agentId, and payload. The payload contains the request-specific fields including type (a string), destinations (an array of strings), and iterations (an integer).
PingPongRequest:
title: pingPongRequest
type: object
required:
- cmd
- agentId
- payload
properties:
cmd:
type: string
agentId:
type: string
payload:
type: object
required:
- type
- destinations
- iterations
properties:
type:
type: string
destinations:
type: array
minItems: 1
items:
type: string
iterations:
type: number
The pingPongRecord will serve two purposes by defining 1) the structure of data to save for each Agent’s ping request, and 2) the message format for the summary record to communicate back to the requesting client. Our record contains an id field to identify the Agent, phase to indicate start/done/failure states, start and end timestamps to time the request, and a number of statistic fields for summary reporting.
PingPongRecord:
title: pingPongRecord
type: object
required:
- id
- phase
properties:
id:
type: string
phase:
type: string
start_ts:
type: number
end_ts:
type: number
result:
type: string
iterations:
type: number
rtt_min:
type: number
rtt_max:
type: number
rtt_avg:
type: number
rtt_mdev:
type: number
successes:
type: number
The ping and pong messages are ommitted for brevity but may be found in the complete ping-pong YAML located within the schema/ sub-directory.
Controller ProtocolPlugin
Now that we have a complete schema definition, we can begin to implement the protocol implementation for the Controller. Inside of pingpong/__init__.py we define an instance of the abstract ProtocolPlugin class. The PingPongProtocol will adhere to this abstract interface and allow for the registration of handlers that match any received messages (i.e. “commands”) defined in our ping-pong schema.
import logging
from quantnet_controller.common.plugin import ProtocolPlugin, PluginType
from quantnet_controller.common.request import RequestManager, RequestType
from quantnet_mq import Code
from quantnet_mq.schema.models import pingpong, Status as responseStatus
from pingponger import PingPonger
logger = logging.getLogger(__name__)
class PingPongProtocol(ProtocolPlugin):
def __init__(self, context):
super().__init__("pingpong", PluginType.PROTOCOL, context)
self._client_commands = [
("pingpong.ping", None, "quantnet_mq.schema.models.pingpong.ping"),
]
self._server_commands = [
("pingpong", self.handle_pingpong, "quantnet_mq.schema.models.pingpong.pingPongRequest"),
]
self._msg_commands = list()
self.ctx = context
# Initialize RequestManager for protocol-type requests
self.request_manager = RequestManager(
context, plugin_schema=pingpong.pingPongRequest, request_type=RequestType.PROTOCOL
)
self._pingponger = PingPonger(config=context.config, rpcclient=context.rpcclient, msgclient=context.msgclient)
async def handle_pingpong(self, request):
"""Handle pingpong request."""
logger.info(f"Received pingpong request: {request.serialize()}")
# Create plugin-specific payload object
payload = pingpong.pingPongRequest(**request)
rc = Code.OK
if payload.payload.type == "ping":
try:
# Create Request object with custom function
req = self.request_manager.new_request(payload=payload, parameters={}, func=self._pingponger.pingpong)
# Execute immediately without queueing
rc = await self.request_manager.noSchedule(req, blocking=True)
except Exception as e:
logger.error(f"Could not schedule pingpong request: {e}")
import traceback
traceback.print_exc()
rc = Code.FAILED
else:
logger.error(f"Unknown pingpong request type {payload.payload.type}")
rc = Code.FAILED
return pingpong.pingPongResponse(
status=responseStatus(code=rc.value, value=Code(rc).name), token=payload.payload.token
)
The Controller plugin imports a PingPonger class that implements the ping-pong protocol logic and record management. Whenever a Controller with this plugin loaded receives a pingPongRequest it will invoke the handle_pingpong() method to begin the ping-pong protocol. The plugin also registers a ping command as a valid RPC call conforming to the specified ping-pong protocol schema.
Agent Command Interpreter
Our most straighforward code module for this tutorial is at the Agent side. Here we implement an instance of the abstract CMDInterpreter class to handle ping commands received from the Controller. A handle_ping() method is defined to process the ping requests. The extent of the handler processing is to construct a pong response that simply encodes a timestamp and a message that lists all of the hardware drivers loaded within the running Agent.
import time
import logging
from quantnet_mq.schema.models import pingpong, Status as responseStatus
from quantnet_mq import Code
from quantnet_agent.hal.HAL import CMDInterpreter
log = logging.getLogger(__name__)
class PingPong(CMDInterpreter):
def __init__(self, hal):
super().__init__(hal)
async def handle_ping(self, request):
log.info("Received ping request: %s", request.serialize())
device_configured = (
f"This agent is configured with the following drivers: {' '.join([i for i in self.hal.devs])}"
)
return pingpong.pong(timestamp=time.time(),
message=device_configured)
def get_commands(self):
commands = {"pingpong.ping": [self.handle_ping,
"quantnet_mq.schema.models.pingpong.ping"]}
return commands
Client program
Finally, we are able to implement a testing client to drive our new ping-pong protocol implementation.
import os
import sys
import json
import asyncio
from quantnet_mq.rpcclient import RPCClient
from quantnet_mq.msgserver import MsgServer
from quantnet_mq.schema.models import Schema
class MyPingPonger():
def __init__(self, destinations=list(), iters=5):
self._dests = destinations
self._iters = iters
self._pending = len(self._dests)
async def start_pingpong(self, client):
msg = {"type": "ping", "destinations": self._dests, "iterations": self._iters}
ret = await client.call("pingpong", msg, timeout=20.0)
ret = json.loads(ret)
return ret
def handle_pong(self, msg):
from quantnet_mq.schema.models import pingpong
res = pingpong.pingPongRecord(**json.loads(msg))
print(f"--- {res.id} ping statistics ---")
print(f"{res.iterations} requests made, {res.successes} received, time {(res.end_ts-res.start_ts)*1e3:.0f}ms")
if res.successes:
rtt_min = float(res.rtt_min)
rtt_avg = float(res.rtt_avg)
rtt_max = float(res.rtt_max)
rtt_mdev = float(res.rtt_mdev)
print(f"rtt min/avg/max/mdev {rtt_min:.3f}/{rtt_avg:.3f}/{rtt_max:.3f}/{rtt_mdev:.3f} ms")
self._pending -= 1
async def main(self):
# Setup RPC client with our PingPong schema
Schema.load_schema("../schema/pingpong.yaml", ns="pingpong")
client = RPCClient("pingpong-client", host=os.getenv("HOST", "localhost"))
client.set_handler("pingpong", None, "quantnet_mq.schema.models.pingpong.pingPongRequest")
await client.start()
# Subscibe to pong topic
mclient = MsgServer(host=os.getenv("HOST", "localhost"))
mclient.subscribe("pong", self.handle_pong)
await mclient.start()
# Begin pingpong request
res = await self.start_pingpong(client)
# Wait for pong responses (as received at controller)
while (self._pending):
await asyncio.sleep(1)
if __name__ == "__main__":
dests = ["agent1", "agent2"]
asyncio.run(MyPingPonger(dests, iters=5).main())
Our test_pingpong.py is an asyncio application that makes use of the Message Bus RPC and Messaging facilities to invoke the ping-pong protocol at the Controller and handle the pingPongRecords returned, respectively. Note that the test program must load the ping-pong schema file so that it can make use of the generated schema models.
Using the Quick Start environment, an example execution of the client with agent1 and agent2 specified as destinations where agent1 is online but not agent2:
$ docker exec -ti controller bash
root@e038e8ec4071:/# cd /quant-net-plugins/plugins/pingpong/
root@e038e8ec4071:/quant-net-plugins/plugins/pingpong# HOST=broker python3 test_pingpong.py
--- LBNL-Q ping statistics ---
5 requests made, 5 received, time 5107ms
rtt min/avg/max/mdev 2.704/19.802/44.927/22.162 ms
message: Agent LBNL-Q is configured with the following drivers: exp_framework, lightsource, epc, polarimeter, egp, messaging
--- UCB-Q ping statistics ---
5 requests made, 5 received, time 5109ms
rtt min/avg/max/mdev 1.964/19.341/44.573/22.769 ms
message: Agent UCB-Q is configured with the following drivers: exp_framework, lightsource, epc, polarimeter, egp, messaging
--- agent2 ping statistics ---
5 requests made, 0 received, time 10009ms