dragon.ai.agent.core.sub_agent.SubAgent
- class SubAgent[source]
Bases:
DragonAgentA stateless sub-agent that processes tasks via LLM reasoning + tools.
All per-task state (upstream context, tool history, results) is stored in the shared Dragon Distributed Dictionary — the agent itself holds nothing between tasks.
Behavior is fully determined by
AgentConfig.role, which becomes the LLM system prompt persona.Execution modes:
Persistent listen loop — call
listen()(inherited fromDragonAgent). The agent blocks on its input queue, processes each incoming message, and writes results back to DDict. This is the primary path used by Dragon Batch dispatchers.Direct call — call
process()directly for testing or non-batch workflows.
Methods
__init__(*args, **kwargs)Return all tools available to this agent, separated by source.
listen()Dispatch tasks from the input queue as concurrent asyncio Tasks.
process(task_id, header, accessor)Process a single task by reading upstream context and invoking the LLM.
- async process(task_id: str , header: DispatchHeader, accessor: DDictAccessor) dict [str , Any ][source]
Process a single task by reading upstream context and invoking the LLM.
Steps:
Read upstream results from DDict using the
task_id/agent_idreferences inheader.task.Invoke the LLM in a tool-call loop via
_invoke_llm_with_tools().Return the result dict.
Parameters
- task_id:
Unique identifier for this batch run / user request.
- header:
DispatchHeaderwith task description, serialized DDict handle, and optional completion event.- accessor:
DDictAccessorwrapping the shared Dragon Distributed Dictionary for this batch run.
Returns
- dict
{"response": <final_answer_string>}.
- async listen() None [source]
Dispatch tasks from the input queue as concurrent asyncio Tasks.
Flow:
Connect any pending MCP servers (deferred from
__init__).Await
receive()— blocks up to 1 s when queue is idle (accepted stall; seeDragonQueueProtocol).On message arrival, create an
asyncio.Task()for_handle_message()and immediately loop back to receive.Multiple tasks run concurrently — the event loop is free during all LLM, MCP, and tool I/O waits.
Loop exits cleanly when
self.shutdown_eventis set.
Worst-case shutdown latency is ~1 s (the queue-poll timeout).
- list_tools() dict [str , Any ][source]
Return all tools available to this agent, separated by source.
Returns a dict with two keys:
"registry"— local Python tools registered intool_registry. Each entry is an OpenAI-compatible schema dict as returned byto_schema()."mcp"— remote MCP tools owned by the internalToolDispatcher, grouped by server alias. Each alias maps to a list of OpenAI-compatible schema dicts. Empty{}when no dispatcher or no MCP servers are connected.
Example output:
{ "registry": [ {"type": "function", "function": {"name": "web_search", ...}}, {"type": "function", "function": {"name": "calculate_word_count", ...}}, ], "mcp": { "jupyter": [ {"type": "function", "function": {"name": "jupyter__create_notebook", ...}}, ], }, }