dragon.ai.agent.communication.dragon_comm.DragonQueueProtocol

class DragonQueueProtocol[source]

Bases: CommunicationProtocol

Communication protocol backed by Dragon Queue and Dragon Distributed Dictionary.

Each agent gets one instance of this class wrapping its own input queue. The agent calls receive() to pull messages from its queue and publish / read / delete to interact with a shared DDict that is passed in per-call.

Parameters

serialized_input_queue:

A serialized Dragon queue handle representing this agent’s input queue. Obtained by serializing a dragon.native.queue.Queue instance.

__init__(queue: Queue = None) None [source]

Methods

__init__([queue])

destroy()

receive([timeout])

Await a message on this agent's input queue without blocking the event loop.

send(message)

Send message to the recipient's queue.

__init__(queue: Queue = None) None [source]
send(message: Message) None [source]

Send message to the recipient’s queue.

Deserializes the target queue from message.recipient_serialized_queue and puts the serialized message onto it.

Note

Not called in the current dispatcher-based flow. Kept for future direct agent-to-agent or A2A communication scenarios.

async receive(timeout: float | None = None) Message | None [source]

Await a message on this agent’s input queue without blocking the event loop.

dragon.native.queue.Queue.get() is a synchronous blocking call. Calling it directly on the event loop thread would freeze all other concurrent asyncio Tasks for the full duration of the timeout (up to 1 second per poll cycle when the queue is idle).

asyncio.to_thread submits the blocking call to Python’s default ThreadPoolExecutor. The pool reuses a persistent idle thread for repeated calls — no thread is created or destroyed per invocation, so overhead is negligible even when the queue is empty for extended periods (e.g., 30+ seconds between messages). The event loop remains free to run other Tasks (LLM calls, tool calls, etc.) while the pool thread waits on the Dragon queue.

Returns

Message or None

Deserialized Message, or None on timeout.

destroy()[source]