dragon.ai.agent.communication.dragon_comm.DragonQueueProtocol
- class DragonQueueProtocol[source]
Bases:
CommunicationProtocolCommunication protocol backed by Dragon Queue and Dragon Distributed Dictionary.
Each agent gets one instance of this class wrapping its own input queue. The agent calls
receive()to pull messages from its queue andpublish/read/deleteto interact with a shared DDict that is passed in per-call.Parameters
- serialized_input_queue:
A serialized Dragon queue handle representing this agent’s input queue. Obtained by serializing a
dragon.native.queue.Queueinstance.
Methods
__init__([queue])destroy()receive([timeout])Await a message on this agent's input queue without blocking the event loop.
send(message)Send message to the recipient's queue.
- send(message: Message) None [source]
Send message to the recipient’s queue.
Deserializes the target queue from
message.recipient_serialized_queueand puts the serialized message onto it.Note
Not called in the current dispatcher-based flow. Kept for future direct agent-to-agent or A2A communication scenarios.
- async receive(timeout: float | None = None) Message | None [source]
Await a message on this agent’s input queue without blocking the event loop.
dragon.native.queue.Queue.get()is a synchronous blocking call. Calling it directly on the event loop thread would freeze all other concurrent asyncio Tasks for the full duration of the timeout (up to 1 second per poll cycle when the queue is idle).asyncio.to_threadsubmits the blocking call to Python’s defaultThreadPoolExecutor. The pool reuses a persistent idle thread for repeated calls — no thread is created or destroyed per invocation, so overhead is negligible even when the queue is empty for extended periods (e.g., 30+ seconds between messages). The event loop remains free to run other Tasks (LLM calls, tool calls, etc.) while the pool thread waits on the Dragon queue.Returns
- Message or None
Deserialized
Message, orNoneon timeout.