dragon.ai.agent.core.sub_agent.SubAgent

class SubAgent

Bases: DragonAgent

A stateless sub-agent that processes tasks through LLM reasoning and tool calls.

All per-task state (upstream context, tool history, results) is stored in the shared Dragon Distributed Dictionary — the agent itself holds nothing between tasks.

Behavior is fully determined by AgentConfig.role, which becomes the LLM system prompt persona.

Execution modes:

  • Persistent listen loop — call listen() (inherited from DragonAgent). The agent blocks on its input queue, processes each incoming message, and writes results back to DDict. This is the primary path used by Dragon Batch dispatchers.

  • Direct call — call process() directly for testing or non-batch workflows.

__init__(*args, **kwargs) -> None

Methods

__init__(*args, **kwargs)

list_tools()

Return all tools available to this agent, separated by source.

listen()

Dispatch tasks from the input queue as concurrent asyncio Tasks.

process(task_id, header, accessor)

Process a single task by reading upstream context and invoking the LLM.

async process(task_id: str, header: DispatchHeader, accessor: DDictAccessor) -> dict[str, Any]

Process a single task by reading upstream context and invoking the LLM.

Steps:

  1. Read upstream results from DDict using the task_id / agent_id references in header.task.

  2. Invoke the LLM in a tool-call loop via _invoke_llm_with_tools().

  3. Return the result dict.
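The three steps above can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's implementation: a plain dict stands in for the shared DDict, `fake_llm` stands in for the model, the message format is simplified, and `process_sketch`, `TOOLS`, and `max_rounds` are hypothetical names introduced only for this example.

```python
import asyncio
from typing import Any, Callable

# Hypothetical local tools; the real agent resolves tools through its own registry.
TOOLS: dict[str, Callable[..., Any]] = {
    "calculate_word_count": lambda text: len(text.split()),
}


async def fake_llm(messages: list[dict[str, Any]]) -> dict[str, Any]:
    """Stand-in model: request one tool call, then answer from its result."""
    tool_results = [m for m in messages if m["role"] == "tool"]
    if not tool_results:
        upstream = next(m["content"] for m in messages if m["role"] == "user")
        return {"tool_call": {"name": "calculate_word_count", "args": {"text": upstream}}}
    return {"content": f"The upstream text has {tool_results[-1]['content']} words."}


async def process_sketch(
    task_id: str,
    upstream_keys: list[str],
    store: dict[Any, Any],
    role: str,
    max_rounds: int = 5,
) -> dict[str, Any]:
    # Step 1: read upstream results from the shared store (a dict stands in for DDict).
    upstream = " ".join(str(store[(task_id, key)]) for key in upstream_keys)
    messages: list[dict[str, Any]] = [
        {"role": "system", "content": role},  # the role acts as the system prompt persona
        {"role": "user", "content": upstream},
    ]
    # Step 2: tool-call loop; run requested tools until the model gives a final answer.
    for _ in range(max_rounds):
        reply = await fake_llm(messages)
        call = reply.get("tool_call")
        if call is None:
            # Step 3: return the result dict.
            return {"response": reply["content"]}
        result = TOOLS[call["name"]](**call["args"])
        messages.append({"role": "tool", "content": str(result)})
    raise RuntimeError("tool-call loop did not converge")


if __name__ == "__main__":
    demo_store = {("run-1", "agent-a"): "alpha beta", ("run-1", "agent-b"): "gamma"}
    print(asyncio.run(process_sketch("run-1", ["agent-a", "agent-b"], demo_store, "You count words.")))
```

Because every input comes from the store and the arguments, the function keeps no state between calls, which mirrors the stateless design described for SubAgent.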

Parameters

task_id:

Unique identifier for this batch run / user request.

header:

DispatchHeader with task description, serialized DDict handle, and optional completion event.

accessor:

DDictAccessor wrapping the shared Dragon Distributed Dictionary for this batch run.

Returns

dict

{"response": <final_answer_string>}.

async listen() -> None

Dispatch tasks from the input queue as concurrent asyncio Tasks.

Flow:

  1. Connect any pending MCP servers (deferred from __init__).

  2. Await receive(), which blocks for up to 1 s when the queue is idle (an accepted stall; see DragonQueueProtocol).

  3. On message arrival, schedule _handle_message() as an asyncio.Task and immediately loop back to receive().

  4. Multiple tasks run concurrently — the event loop is free during all LLM, MCP, and tool I/O waits.

  5. Loop exits cleanly when self.shutdown_event is set.

Worst-case shutdown latency is ~1 s (the queue-poll timeout).
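The polling pattern described in the flow above can be illustrated with plain asyncio. This is a sketch under assumptions, not the actual listen() code: an `asyncio.Queue` stands in for the Dragon input queue, `handle_message` is a hypothetical stand-in for _handle_message(), and waiting for in-flight tasks on exit is a choice made for this example only.

```python
import asyncio

POLL_TIMEOUT_S = 1.0  # mirrors the 1 s queue-poll timeout described above


async def handle_message(message: str, results: dict[str, str]) -> None:
    """Hypothetical stand-in for _handle_message(): simulate I/O-bound work."""
    await asyncio.sleep(0.05)  # stands in for LLM, MCP, or tool I/O
    results[message] = f"done:{message}"


async def listen_loop(
    queue: asyncio.Queue,
    shutdown_event: asyncio.Event,
    results: dict[str, str],
    poll_timeout: float = POLL_TIMEOUT_S,
) -> None:
    in_flight: set[asyncio.Task] = set()
    while not shutdown_event.is_set():
        try:
            # Bounded wait: an idle queue costs at most poll_timeout per iteration.
            message = await asyncio.wait_for(queue.get(), timeout=poll_timeout)
        except asyncio.TimeoutError:
            continue  # nothing arrived; re-check the shutdown flag
        # Schedule the handler and loop straight back to the queue.
        task = asyncio.create_task(handle_message(message, results))
        in_flight.add(task)  # keep a reference so the task is not garbage-collected
        task.add_done_callback(in_flight.discard)
    if in_flight:
        # Assumption of this sketch: let in-flight handlers finish before returning.
        await asyncio.gather(*in_flight)


async def demo() -> dict[str, str]:
    queue: asyncio.Queue = asyncio.Queue()
    shutdown_event = asyncio.Event()
    results: dict[str, str] = {}
    for name in ("a", "b", "c"):
        queue.put_nowait(name)
    listener = asyncio.create_task(
        listen_loop(queue, shutdown_event, results, poll_timeout=0.1)
    )
    await asyncio.sleep(0.3)  # give the handlers time to run concurrently
    shutdown_event.set()      # the loop notices within one poll timeout
    await listener
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The shutdown flag is only checked between polls, so the poll timeout bounds how long the loop takes to notice a shutdown request. That is the source of the roughly 1 s worst-case latency noted above.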

list_tools() -> dict[str, Any]

Return all tools available to this agent, separated by source.

Returns a dict with two keys:

  • "registry" — local Python tools registered in tool_registry. Each entry is an OpenAI-compatible schema dict as returned by to_schema().

  • "mcp" — remote MCP tools owned by the internal ToolDispatcher, grouped by server alias. Each alias maps to a list of OpenAI-compatible schema dicts. Empty {} when no dispatcher or no MCP servers are connected.

Example output:

{
    "registry": [
        {"type": "function", "function": {"name": "web_search", ...}},
        {"type": "function", "function": {"name": "calculate_word_count", ...}},
    ],
    "mcp": {
        "jupyter": [
            {"type": "function", "function": {"name": "jupyter__create_notebook", ...}},
        ],
    },
}
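A caller might flatten this structure into tool names per source, for example to log what an agent can use. The sketch below works on a literal dict shaped like the example output; only the "name" field is kept because the remaining schema fields are elided in the example, and `tool_names_by_source` is a hypothetical helper, not part of the library.

```python
from typing import Any

# Literal shaped like the example output; schema fields other than "name" are omitted.
tools: dict[str, Any] = {
    "registry": [
        {"type": "function", "function": {"name": "web_search"}},
        {"type": "function", "function": {"name": "calculate_word_count"}},
    ],
    "mcp": {
        "jupyter": [
            {"type": "function", "function": {"name": "jupyter__create_notebook"}},
        ],
    },
}


def tool_names_by_source(tools: dict[str, Any]) -> dict[str, list[str]]:
    """Map each source ("registry" or "mcp:<alias>") to its tool names."""
    names: dict[str, list[str]] = {
        "registry": [schema["function"]["name"] for schema in tools.get("registry", [])]
    }
    # "mcp" may be an empty dict when no MCP servers are connected.
    for alias, schemas in tools.get("mcp", {}).items():
        names[f"mcp:{alias}"] = [schema["function"]["name"] for schema in schemas]
    return names


if __name__ == "__main__":
    for source, tool_names in tool_names_by_source(tools).items():
        print(source, tool_names)
```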