Dragon AI Agent Framework — User Guide

This guide covers everything you need to build, configure, and run multi-agent AI pipelines on HPC clusters using the Dragon AI Agent Framework. It is written for first-time users. By the end you will understand every available API, every configuration option, and enough architecture to debug performance issues.

For the internal architecture details (Dragon primitives, source-level code paths), see Agent Framework — Developer Guide. For the auto-generated API reference, see Agent Framework.

Overview — What the Framework Does

The Dragon AI Agent Framework runs LLM-powered multi-agent workflows on HPC clusters. You define:

  1. Agents — persistent processes with a role (system prompt), tools, and an LLM backend.

  2. A pipeline — a directed acyclic graph (DAG) of tasks, each assigned to an agent.

  3. An orchestrator — the entry point that creates shared state, builds the DAG, dispatches tasks to agents, and collects results.

At runtime:

  • The framework creates a shared DDict (Distributed Dictionary) called the Scoreboard that stores all task state, results, and observability data.

  • The framework builds a Dragon Batch DAG in which each node is a lightweight dispatcher closure that puts a message on the target agent’s input Queue and blocks on a completion Event.

  • Each agent runs an asyncio event loop, picks up messages from its Queue, runs an LLM tool-calling loop, writes results to the Scoreboard, and signals the dispatcher.

  • Agents are stateless: they hold no per-task state between tasks, and all context lives in the Scoreboard DDict.

┌─ Orchestrator Process ──────────────────────────────────────────────┐
│  DDict (Scoreboard)  ←→  Dragon Batch DAG  ←→  HITL TCP Bridge      │
│                           │    │    │           Trace TCP Bridge    │
└───────────────────────────┼────┼────┼───────────────────────────────┘
                            │    │    │
             ┌──────────────┘    │    └──────────────┐
             ▼                   ▼                   ▼
┌─ Agent Process ───┐  ┌─ Agent Process ───┐  ┌─ Agent Process ───┐
│  Input Queue      │  │  Input Queue      │  │  Input Queue      │
│  LLM Proxy        │  │  LLM Proxy        │  │  LLM Proxy        │
│  Tool Registry    │  │  Tool Registry    │  │  Tool Registry    │
│  MCP Clients      │  │  MCP Clients      │  │  MCP Clients      │
│  asyncio loop     │  │  asyncio loop     │  │  asyncio loop     │
└───────────────────┘  └───────────────────┘  └───────────────────┘

Step 1 — Set Up the Inference Backend

Before creating agents you need an LLM inference service. The Inference class manages the vLLM backend with multi-GPU tensor parallelism, request batching, and dynamic worker management.

Listing 83 Inference pipeline setup
from dragon.native.queue import Queue
from dragon.ai.inference.inference_utils import Inference
from dragon.ai.inference.config import (
    InferenceConfig, ModelConfig, HardwareConfig, BatchingConfig,
    GuardrailsConfig, DynamicWorkerConfig,
)

inference_queue = Queue()

inference = Inference(
    config=InferenceConfig(
        model=ModelConfig(
            model_name="meta-llama/Llama-3.1-70B-Instruct",
            hf_token="hf_...",
            tp_size=4,              # tensor parallelism across 4 GPUs
            max_tokens=4096,
            max_model_len=8192,
        ),
        hardware=HardwareConfig(
            num_gpus=4,
            num_nodes=1,
        ),
        batching=BatchingConfig(
            enabled=True,
            batch_wait_seconds=0.1,
            max_batch_size=32,
        ),
        guardrails=GuardrailsConfig(enabled=False),
        dynamic_worker=DynamicWorkerConfig(enabled=False),
        flask_secret_key="",
        run_type="agent",
        token="",
    ),
    input_queue=inference_queue,
)
inference.initialize()

The inference_queue is the shared Queue that agents will use to send LLM requests. Multiple agents can share the same queue — the inference backend handles batching and routing internally.
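To make the roles of batch_wait_seconds and max_batch_size concrete, here is a minimal sketch of time-and-size bounded batching over a shared queue. It uses only the standard library; collect_batch and the request dicts are hypothetical illustrations, not the framework's internal batching code.

```python
import queue
import time


def collect_batch(requests: queue.Queue, batch_wait_seconds: float = 0.1,
                  max_batch_size: int = 32) -> list:
    """Drain up to max_batch_size requests, waiting at most batch_wait_seconds."""
    batch = []
    deadline = time.monotonic() + batch_wait_seconds
    while len(batch) < max_batch_size:
        # Never pass a negative timeout; 0.0 still returns an item if one is queued.
        remaining = max(0.0, deadline - time.monotonic())
        try:
            batch.append(requests.get(timeout=remaining))
        except queue.Empty:
            break  # the wait window closed before the batch filled up
    return batch


# Three agents share one request queue (hypothetical request format).
shared = queue.Queue()
for agent_id in ("planner", "runner", "analyzer"):
    shared.put({"agent_id": agent_id, "prompt": f"hello from {agent_id}"})

batch = collect_batch(shared, batch_wait_seconds=0.05, max_batch_size=2)
print([request["agent_id"] for request in batch])
```

A larger batch_wait_seconds tends to produce fuller batches at the cost of added latency for the first request in each batch.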

InferenceConfig Reference

| Sub-config | Key fields |
| --- | --- |
| ModelConfig | model_name (str), hf_token (str), tp_size (int, tensor parallelism), max_tokens (int, max new tokens), dtype (str, default "bfloat16"), top_k (int, default 50), top_p (float, default 0.95), system_prompt (list[str]) |
| HardwareConfig | num_nodes (int, -1 = auto), num_gpus (int, -1 = all), num_inf_workers_per_cpu (int, default 4), node_offset (int, default 0; skip first N nodes) |
| BatchingConfig | enabled (bool, default True), batch_type ("dynamic" or "pre-batch"), batch_wait_seconds (float, default 0.1), max_batch_size (int, default 60) |
| GuardrailsConfig | enabled (bool, default True), prompt_guard_model (str), prompt_guard_sensitivity (float, 0.0–1.0) |
| DynamicWorkerConfig | enabled (bool, default True), min_active_workers_per_cpu (int), spin_down_threshold_seconds (int), spin_up_threshold_seconds (int), spin_up_prompt_threshold (int) |

Separate Summarizer Model

For memory management with the SUMMARIZE strategy (see MemoryConfig Reference), you can run a second, smaller model on a separate GPU partition:

Listing 84 Summarizer on a second node
summarizer_queue = Queue()
summarizer_inference = Inference(
    config=InferenceConfig(
        model=ModelConfig(
            model_name="meta-llama/Llama-3.1-8B-Instruct",
            hf_token="hf_...", tp_size=1, max_tokens=2048,
        ),
        hardware=HardwareConfig(num_gpus=1, num_nodes=1, node_offset=1),
        batching=BatchingConfig(enabled=True, batch_wait_seconds=0.05),
        guardrails=GuardrailsConfig(enabled=False),
        dynamic_worker=DynamicWorkerConfig(enabled=False),
        flask_secret_key="", run_type="agent", token="",
    ),
    input_queue=summarizer_queue,
)
summarizer_inference.initialize()

Step 2 — Define Tools

Tools are callable functions that agents invoke during their reasoning loop. The LLM decides which tool to call and with what arguments based on the tool’s name, description, and parameter schema.

The ToolRegistry holds all tools for a single agent.

Registration Methods

There are three ways to register tools:

1. @registry.tool decorator (recommended for new code):

from dragon.ai.agent.tools.registry import ToolRegistry

registry = ToolRegistry()

@registry.tool
def search_database(query: str, max_results: int = 10) -> dict:
    """Search the experiment database for matching records.

    :param query: The search query string.
    :param max_results: Maximum number of results to return.
    """
    # ... perform search ...
    records = []  # placeholder: replace with the real search results
    return {"results": records}

The decorator wraps the function in a FunctionTool and registers it automatically. The tool name is fn.__name__, the description is the first line of the docstring, and parameters are derived from type hints and :param: docstring entries.
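The derivation rules above (name from fn.__name__, description from the first docstring line, parameters from type hints and :param: entries) can be sketched with the standard library. This is an illustration only: describe_tool and its type mapping are hypothetical and are not the framework's FunctionTool implementation.

```python
import inspect
import re

# Hypothetical mapping from Python annotations to JSON Schema type names.
_JSON_TYPES = {str: "string", int: "integer", float: "number",
               bool: "boolean", dict: "object", list: "array"}


def describe_tool(fn) -> dict:
    """Build an OpenAI-style function schema from a signature and docstring."""
    doc = inspect.getdoc(fn) or ""
    description = doc.splitlines()[0] if doc else ""
    # Collect ":param name: text" entries from the docstring.
    param_docs = dict(re.findall(r":param (\w+):\s*(.+)", doc))

    properties, required = {}, []
    for name, param in inspect.signature(fn).parameters.items():
        properties[name] = {
            "type": _JSON_TYPES.get(param.annotation, "string"),
            "description": param_docs.get(name, ""),
        }
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default value means the argument is required

    return {
        "type": "function",
        "function": {
            "name": fn.__name__,
            "description": description,
            "parameters": {"type": "object", "properties": properties,
                           "required": required},
        },
    }


def search_database(query: str, max_results: int = 10) -> dict:
    """Search the experiment database for matching records.

    :param query: The search query string.
    :param max_results: Maximum number of results to return.
    """
    return {"results": []}


schema = describe_tool(search_database)
print(schema["function"]["parameters"]["required"])
```

Because the schema is what the LLM sees, a precise first docstring line and accurate :param: entries matter more than the function body when the model chooses a tool.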

2. registry.register(callable) (for existing functions):

from tools.analyzer import analyze_convergence
registry.register(analyze_convergence)

3. Subclass BaseTool (for full control over schema):

from dragon.ai.agent.tools.base import BaseTool

class RunSimulation(BaseTool):
    name = "run_simulation"
    description = "Run a Monte Carlo simulation with the given parameters."

    def run(self, input: dict) -> dict:
        n_samples = input["n_samples"]
        # ... run simulation ...
        result = None  # placeholder: replace with the real simulation output
        return {"status": "complete", "result": result}

    def to_schema(self) -> dict:
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": {
                    "type": "object",
                    "properties": {
                        "n_samples": {"type": "integer", "description": "Number of samples"},
                    },
                    "required": ["n_samples"],
                },
            },
        }

registry.register(RunSimulation())

Async Tools

Async callables are detected automatically. The dispatcher awaits them instead of calling synchronously:

import aiohttp

@registry.tool
async def fetch_remote_data(url: str, timeout: float = 30.0) -> dict:
    """Fetch data from a remote HTTP endpoint."""
    # aiohttp expects a ClientTimeout object rather than a bare float
    client_timeout = aiohttp.ClientTimeout(total=timeout)
    async with aiohttp.ClientSession(timeout=client_timeout) as session:
        async with session.get(url) as resp:
            return await resp.json()

Async tools yield the event loop while waiting for I/O, allowing other concurrent tasks on the same agent to make progress.
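The automatic detection described above can be approximated with inspect.iscoroutinefunction. The sketch below is a hypothetical call_tool helper, not the framework's dispatcher: it awaits coroutine functions and calls plain functions inline.

```python
import asyncio
import inspect


async def call_tool(fn, **kwargs):
    """Await coroutine functions; call plain functions synchronously."""
    if inspect.iscoroutinefunction(fn):
        return await fn(**kwargs)
    return fn(**kwargs)


def add(a: int, b: int) -> int:
    return a + b


async def slow_echo(text: str, delay: float = 0.01) -> str:
    await asyncio.sleep(delay)  # stands in for network I/O
    return text


async def main():
    # The two async calls overlap while each one sleeps; the sync call runs inline.
    echoed = await asyncio.gather(
        call_tool(slow_echo, text="a"),
        call_tool(slow_echo, text="b"),
    )
    total = await call_tool(add, a=17, b=25)
    return echoed, total


if __name__ == "__main__":
    print(asyncio.run(main()))
```

A synchronous tool that blocks for a long time holds the event loop for its whole duration, which is one reason to prefer async tools for I/O-heavy work.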

ToolRegistry API

| Method | Description |
| --- | --- |
| register(tool) | Register a BaseTool instance or a plain callable (auto-wrapped in FunctionTool). Overwrites existing tools with the same name. |
| unregister(name) | Remove a tool by name. No-op if not present. |
| get(name) | Return the tool registered under name. Raises KeyError if not found. |
| has(name) | Return True if a tool with name is registered. |
| list_tools() | Return a list of OpenAI-compatible tool/function schema dicts. |
| tool_names() | Return a sorted list of registered tool names. |
| tool(fn) | Decorator that wraps fn as a FunctionTool and registers it. |

Step 3 — Configure Agents

Each agent needs an AgentConfig that defines its identity, role, and connections.

Listing 85 AgentConfig example
from dragon.ai.agent.config.agent_config import AgentConfig
from dragon.ai.agent.config.memory_config import MemoryConfig, MemoryStrategy

planner_config = AgentConfig(
    agent_id="planner",
    name="Experiment Planner",
    role="You are an experiment planner. Given a research question, "
         "design a set of experiments to answer it.",
    inference_queue=inference_queue,
    summarizer_inference_queue=summarizer_queue,   # optional
    memory=MemoryConfig(
        strategy=MemoryStrategy.SUMMARIZE,
        max_kept_turns=8,
        summarize_after_turns=6,
    ),
    max_tool_call_iterations=15,
    max_concurrent_requests=16,
    approval_filter=lambda name, args: name == "propose_experiment",
)

AgentConfig Reference

| Field | Default | Description |
| --- | --- | --- |
| agent_id | (required) | Unique identifier within the pipeline. |
| name | (required) | Human-readable display name. |
| role | (required) | System prompt: the LLM’s persona for this agent. |
| inference_queue | None | Dragon Queue feeding the inference backend. When set, a DragonQueueLLMProxy is auto-created. |
| input_queue | None | The agent’s input Queue, obtained through the reply queue handshake during agent startup (see Step 5). Do not create this Queue yourself. |
| max_concurrent_requests | None (=32) | Maximum concurrent in-flight LLM requests per agent. Controls the size of the ResponseQueuePool. |
| summarizer_inference_queue | None | Separate inference queue for a dedicated summarizer model. Used when memory.strategy=SUMMARIZE. |
| node_affinity | "" | Placement hint (hostname or node index). Informational only; actual placement is controlled by Policy during process launch. |
| approval_filter | None | HITL gating predicate: (tool_name: str, tool_args: dict) -> bool. When set, tool calls matching the filter are paused for human approval before execution. |
| max_tool_call_iterations | 20 | Maximum LLM iterations in the tool-calling loop. Each iteration is one LLM call that may produce a tool request or final answer. Increase for agents with many tool calls or HITL feedback rounds. |
| memory | None | Memory management config. None = keep everything (no pruning). See MemoryConfig Reference. |

MemoryConfig Reference

Controls how the agent manages its conversation history during the tool-calling loop. Without memory management, the message list grows without bound and can exceed the model’s context window.

from dragon.ai.agent.config.memory_config import MemoryConfig, MemoryStrategy

# No pruning (default: keeps everything)
AgentConfig(..., memory=None)

# Sliding window: drop old turns
AgentConfig(..., memory=MemoryConfig(
    strategy=MemoryStrategy.SLIDING_WINDOW,
    max_kept_turns=8,
))

# Summarize old turns via LLM
AgentConfig(..., memory=MemoryConfig(
    strategy=MemoryStrategy.SUMMARIZE,
    max_kept_turns=8,
    summarize_after_turns=6,
))

MemoryStrategy enum:

| Value | Behavior |
| --- | --- |
| FULL | Keep every message. Same as memory=None. Simple but risks context overflow on long tasks. |
| SLIDING_WINDOW | Drop older turns beyond max_kept_turns. Inserts a synthetic note: [Memory: N earlier tool-call pairs were removed]. |
| SUMMARIZE | When summarize_after_turns pruneable turns have accumulated, call the LLM to condense them into a rolling summary. Prior summaries are incrementally updated, not regenerated. |

MemoryConfig fields:

| Field | Default | Description |
| --- | --- | --- |
| strategy | SLIDING_WINDOW | Memory management strategy. |
| max_kept_turns | 8 | Number of recent turn-pairs to keep in full. A turn-pair is one assistant tool-call request + its tool result(s). |
| max_tool_result_chars | 5000 | Maximum characters per tool result content field. |
| summarize_after_turns | 6 | (SUMMARIZE only) Trigger summarization when this many pruneable turns have accumulated. |
| summarizer_max_tool_chars | None | Maximum chars per tool-result when building the summarizer input. None = no truncation. |
| summarizer_max_content_chars | None | Maximum chars per non-tool message when building the summarizer input. None = no truncation. |
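As an illustration of the SLIDING_WINDOW behavior, the sketch below prunes an OpenAI-style message list down to the most recent turn-pairs and inserts the synthetic note. The message layout, the note's "system" role, and the prune_sliding_window name are assumptions made for this example; the framework's own pruning code may differ.

```python
def prune_sliding_window(messages: list, max_kept_turns: int = 8) -> list:
    """Keep leading non-tool messages plus the last max_kept_turns turn-pairs.

    Assumes the conversation is: system/user messages first, then tool turn-pairs.
    """
    head, turns = [], []
    for msg in messages:
        if msg["role"] == "assistant" and msg.get("tool_calls"):
            turns.append([msg])       # an assistant tool-call request opens a turn-pair
        elif msg["role"] == "tool" and turns:
            turns[-1].append(msg)     # tool results belong to the latest request
        else:
            head.append(msg)          # system / user / plain assistant text

    dropped = max(0, len(turns) - max_kept_turns)
    if dropped == 0:
        return list(messages)

    note = {  # assumption: the note is injected as a system message
        "role": "system",
        "content": f"[Memory: {dropped} earlier tool-call pairs were removed]",
    }
    kept = [msg for turn in turns[dropped:] for msg in turn]
    return head + [note] + kept


history = [
    {"role": "system", "content": "You are a planner."},
    {"role": "user", "content": "Design experiments."},
]
for i in range(3):
    history.append({"role": "assistant", "tool_calls": [{"id": f"call_{i}"}]})
    history.append({"role": "tool", "tool_call_id": f"call_{i}", "content": f"result {i}"})

pruned = prune_sliding_window(history, max_kept_turns=2)
print(pruned[2]["content"])
```

Pruning whole turn-pairs, rather than individual messages, keeps every remaining tool result next to the request that produced it.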

MCPServerConfig Reference

Connect agents to remote MCP (Model Context Protocol) servers for additional tools:

from dragon.ai.agent.config.agent_config import MCPServerConfig

mcp_servers = [
    MCPServerConfig(
        url="http://mcp-server:8080/mcp",
        alias="jupyter",
        token="my-api-token",
    ),
]

Tools from MCP servers are automatically discovered at agent startup and exposed with scoped names: {alias}__{tool_name} (e.g., jupyter__create_notebook).

| Field | Default | Description |
| --- | --- | --- |
| url | (required) | Full HTTP/HTTPS URL of the MCP server endpoint. |
| alias | (required) | Short unique label used as the tool name prefix. |
| token | None | Bearer token for authentication. |
| max_retries | 3 | Connection attempts before raising ConnectionError. |
| retry_delay | 0.5 | Seconds between retry attempts. |
| timeout | 5.0 | Per-attempt connection timeout in seconds. |
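The interplay of max_retries, retry_delay, and timeout can be sketched as a plain retry loop. connect_with_retries and the connect callable below are hypothetical stand-ins for whatever the MCP client does internally; only the three parameter names and the final ConnectionError come from the table above.

```python
import time


def connect_with_retries(connect, max_retries: int = 3,
                         retry_delay: float = 0.5, timeout: float = 5.0):
    """Call connect(timeout=...) up to max_retries times, then raise ConnectionError."""
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            return connect(timeout=timeout)
        except OSError as exc:  # OSError covers ConnectionError and TimeoutError
            last_error = exc
            if attempt < max_retries:
                time.sleep(retry_delay)  # pause only between attempts
    raise ConnectionError(f"gave up after {max_retries} attempts") from last_error


# A fake server that refuses the first two attempts.
attempts = []


def flaky_connect(timeout: float) -> str:
    attempts.append(timeout)
    if len(attempts) < 3:
        raise ConnectionRefusedError("server not ready")
    return "connected"


session = connect_with_retries(flaky_connect, max_retries=3, retry_delay=0.01)
print(session, len(attempts))
```

Under these assumptions the worst-case startup delay per server is roughly max_retries × timeout plus (max_retries − 1) × retry_delay, which is worth keeping in mind when agents connect to several servers.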

Step 4 — Build a Pipeline

A Pipeline defines the DAG topology. Each PipelineNode specifies which agent handles it and which nodes it depends on.

Listing 86 Multi-agent pipeline DAG
from dragon.ai.agent.config.pipeline import Pipeline, PipelineNode

pipeline = Pipeline(nodes=[
    PipelineNode(
        agent_id="planner",
        task_description="Design experiments to test the hypothesis.",
    ),
    PipelineNode(
        agent_id="runner",
        task_description="Execute the planned experiments.",
        depends_on=["planner"],
    ),
    PipelineNode(
        agent_id="analyzer",
        task_description="Analyze the experiment results.",
        depends_on=["runner"],
    ),
    PipelineNode(
        agent_id="reporter",
        task_description="Write a summary report.",
        depends_on=["analyzer"],
    ),
])

Nodes are topologically sorted automatically. Nodes with no dependencies run first; nodes whose dependencies are all satisfied run in parallel.
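To see which nodes become runnable together, the following sketch groups a dependency map into "waves" using the standard library's graphlib. It is only a model of the ordering rule: execution_waves is a hypothetical helper, and the real scheduling is done by Dragon Batch, which need not proceed in lockstep waves.

```python
from graphlib import TopologicalSorter


def execution_waves(depends_on: dict) -> list:
    """Group node ids into waves; every node in a wave has all dependencies met."""
    sorter = TopologicalSorter(depends_on)  # maps node -> list of predecessors
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # nodes whose dependencies are all done
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves({
    "planner": [],
    "runner_a": ["planner"],
    "runner_b": ["planner"],
    "reporter": ["runner_a", "runner_b"],
})
print(waves)
```

In this hypothetical diamond-shaped pipeline, runner_a and runner_b share a wave and can run in parallel, while reporter waits for both.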

PipelineNode Reference

| Field | Default | Description |
| --- | --- | --- |
| agent_id | (required) | Unique node identifier (also the agent id for agent-backed nodes). |
| task_description | "" | Human-readable task sent to the agent. Ignored for function nodes. |
| fn | None | Optional callable for function nodes (no LLM). Must accept (*upstream_results: TaskResult) -> TaskResult. |
| depends_on | [] | List of agent_id values whose results must be available first. |

Validation: Pipeline.__post_init__ rejects duplicate agent_id values, undefined depends_on references, and cycles.
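The three checks can be sketched in a few lines of standard-library Python. validate_nodes below works on plain (agent_id, depends_on) pairs and is a hypothetical illustration, not the code in Pipeline.__post_init__; the error messages are invented for the example.

```python
from graphlib import CycleError, TopologicalSorter


def validate_nodes(nodes: list) -> None:
    """Validate (agent_id, depends_on) pairs; raise ValueError on any problem."""
    ids = [agent_id for agent_id, _ in nodes]
    duplicates = sorted({i for i in ids if ids.count(i) > 1})
    if duplicates:
        raise ValueError(f"duplicate agent_id values: {duplicates}")

    known = set(ids)
    for agent_id, deps in nodes:
        missing = sorted(set(deps) - known)
        if missing:
            raise ValueError(f"{agent_id} depends on undefined nodes: {missing}")

    try:
        TopologicalSorter(dict(nodes)).prepare()  # raises CycleError on a cycle
    except CycleError as exc:
        raise ValueError(f"pipeline contains a cycle: {exc.args[1]}") from exc


def check(nodes: list) -> str:
    """Return "ok" or the validation error message."""
    try:
        validate_nodes(nodes)
        return "ok"
    except ValueError as exc:
        return str(exc)


print(check([("planner", []), ("runner", ["planner"])]))
print(check([("a", ["b"]), ("b", ["a"])]))
```

Failing at construction time means a malformed DAG is reported before any agent process or inference backend has been asked to do work.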

Function Nodes

Not every node needs an LLM. A PipelineNode with fn=<callable> is wired directly into the Batch graph:

Listing 87 Function node for deterministic post-processing
from dragon.data.ddict import DDict
from dragon.ai.agent.config.pipeline import TaskResult, TaskStatus
from dragon.ai.agent.ddict import DDictAccessor
from dragon.ai.agent.config import DISPATCH_ID_KEY, RESULT_KEY

def save_report(*upstreams: TaskResult) -> TaskResult:
    """Write the report to disk; no LLM needed."""
    ddict = DDict.attach(upstreams[0].serialized_ddict)
    accessor = DDictAccessor(ddict, agent_id="reporter",
                             task_id=upstreams[0].task_id)
    dispatch_id = accessor.get(DISPATCH_ID_KEY.format(
        task_id=upstreams[0].task_id, agent_id="reporter"))
    result = accessor.get(RESULT_KEY.format(
        task_id=upstreams[0].task_id, agent_id="reporter",
        dispatch_id=dispatch_id))
    with open("report.md", "w") as f:
        f.write(result.get("response", ""))
    ddict.detach()
    return TaskResult(
        task_id=upstreams[0].task_id, agent_id="save_report",
        status=TaskStatus.DONE,
        serialized_ddict=upstreams[0].serialized_ddict)

pipeline = Pipeline(nodes=[
    PipelineNode(agent_id="reporter",
                 task_description="Write the report."),
    PipelineNode(agent_id="save_report", fn=save_report,
                 depends_on=["reporter"]),
])

TaskResult and TaskStatus

TaskResult is the lightweight token passed between Batch nodes:

| Field | Description |
| --- | --- |
| task_id | Pipeline run identifier. |
| agent_id | Identifier of the agent that produced this result. |
| status | TaskStatus enum value. |
| serialized_ddict | Serialized DDict handle; function nodes use this to attach. |
| metadata | Arbitrary extra data (dict). |

TaskStatus enum values: READY, PROCESSING, WAITING (HITL), DONE, ERROR.

Step 5 — Start Agent Processes

Agents must be started as persistent Dragon Processes before running the orchestrator.

Listing 88 Launching agent processes with node placement
from dragon.native.process import Process
from dragon.native.queue import Queue
from dragon.native.event import Event
from dragon.infrastructure.policy import Policy
from dragon.infrastructure.facts import HOST_NAME
from dragon.native.machine import System, Node

from dragon.ai.agent.core.sub_agent import create_sub_agent

# Discover cluster topology (this example assumes at least two nodes)
all_nodes = System().nodes
head_node = Node(all_nodes[0]).hostname
compute_node = Node(all_nodes[1]).hostname

shutdown_event = Event()

# Launch each agent
configs_by_id = {}  # agent_id -> AgentConfig
agents = {}  # agent_id -> (Process, reply_queue)
for config, tools, mcps in [
    (planner_config, planner_tools, None),
    (runner_config, runner_tools, None),
    (analyzer_config, analyzer_tools, None),
    (reporter_config, reporter_tools, mcp_servers),
]:
    reply_q = Queue()
    p = Process(
        target=create_sub_agent,
        args=(config, tools, mcps, shutdown_event, reply_q),
        policy=Policy(placement=HOST_NAME, host_name=compute_node),
    )
    p.start()
    configs_by_id[config.agent_id] = config
    agents[config.agent_id] = (p, reply_q)

# Collect input queues via reply queue handshake
for agent_id, (proc, reply_q) in agents.items():
    # Store the queue on the config so the orchestrator can find it
    configs_by_id[agent_id].input_queue = reply_q.get(timeout=60)

The reply queue handshake is how the parent discovers each agent’s input queue: the agent creates a Queue inside its own process, serializes it, and puts it on the reply queue.
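The handshake pattern itself is independent of Dragon. The sketch below reproduces it with standard-library threads and queues as an analogy: the worker creates its own input queue and hands it back through the reply queue. agent_main and the message format are hypothetical; real agents run in separate Dragon processes and serialize a Dragon Queue instead.

```python
import queue
import threading


def agent_main(reply_queue: queue.Queue, shutdown: threading.Event) -> None:
    input_queue = queue.Queue()   # created inside the agent itself
    reply_queue.put(input_queue)  # handshake: tell the parent where to send work
    while not shutdown.is_set():
        try:
            task, done = input_queue.get(timeout=0.05)
        except queue.Empty:
            continue              # nothing to do yet; re-check the shutdown flag
        done.put(f"handled {task}")


shutdown = threading.Event()
reply_q = queue.Queue()
worker = threading.Thread(target=agent_main, args=(reply_q, shutdown))
worker.start()

agent_input = reply_q.get(timeout=5)  # parent discovers the agent's input queue
results = queue.Queue()
agent_input.put(("task-1", results))
outcome = results.get(timeout=5)

shutdown.set()
worker.join(timeout=5)
print(outcome)
```

The timeout on reply_q.get matters in practice: if an agent fails during startup, the parent gets an exception after the timeout instead of hanging indefinitely.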

create_sub_agent Parameters

| Parameter | Description |
| --- | --- |
| config | AgentConfig: agent identity, role, inference queue. |
| tool_registry | ToolRegistry: local tools. None for no tools. |
| mcp_servers | list[MCPServerConfig]: remote MCP server connections. None for no MCP. |
| shutdown_event | dragon.native.event.Event: set by the parent to stop the agent. |
| reply_queue | dragon.native.queue.Queue: the agent puts its input queue here after construction so the parent can retrieve it. |

Step 6 — Configure and Run the Orchestrator

OrchestratorConfig Reference

| Field | Default | Description |
| --- | --- | --- |
| agents | [] | List of AgentConfig instances. Duplicate agent_id values are rejected. |
| poll_interval | 0.5 | Seconds between queue polls (used for HITL bridge). |
| poll_timeout | 120.0 | Maximum seconds to wait per agent task. The dispatcher blocks on completion_event.wait(timeout=poll_timeout). Increase for long-running tasks. |
| tracing | False | Enable span-based observability. True creates per-task DictTracingProcessor instances, writes per-event DDict keys, and starts a TCP bridge for live viewing. |
| ddict_kwargs | {} | Passed to DDict(**kwargs). Tune managers_per_node, n_nodes, total_mem for large pipelines. Defaults applied when keys are absent: managers_per_node=1, n_nodes=1, trace=False. |

Running a Pipeline

Listing 89 Full orchestrator setup and run
from dragon.ai.agent.config.agent_config import OrchestratorConfig
from dragon.ai.agent.orchestrator.orchestrator import DAGOrchestrator
from dragon.workflows.batch import Batch

orch_config = OrchestratorConfig(
    agents=[planner_config, runner_config, analyzer_config, reporter_config],
    poll_timeout=300.0,
    tracing=True,
    ddict_kwargs={"managers_per_node": 2, "total_mem": 1 << 30},
)

orchestrator = DAGOrchestrator(config=orch_config, pipeline=pipeline)

# Connection instructions are available immediately after construction
if orchestrator.hitl_address:
    host, port = orchestrator.hitl_address
    print(f"HITL client:  python -m dragon.ai.agent.hitl --tcp {host}:{port}")
if orchestrator.trace_address:
    host, port = orchestrator.trace_address
    print(f"Trace viewer: python -m dragon.ai.agent.observability --tcp {host}:{port} -i")

# Create the Batch before the try block so the finally clause can always close it
batch = Batch()
try:
    result = orchestrator.run(
        user_input="Test whether increasing learning rate improves convergence.",
        batch=batch,
    )
    print(result)
finally:
    orchestrator.destroy()
    batch.close()
    batch.join()
    # Shutdown agents
    shutdown_event.set()
    for proc, _ in agents.values():
        proc.join(timeout=30)
    inference.destroy()

Launch the script with the Dragon runtime:

dragon my_pipeline.py

DAGOrchestrator API

| Method / Property | Description |
| --- | --- |
| __init__(config, pipeline) | Creates shared DDict, starts HITL and trace TCP bridges. All properties below are available immediately after construction. |
| run(user_input, batch) | Execute the pipeline. Returns the terminal node’s result (single terminal) or a dict[agent_id, result] (multiple terminals). |
| destroy() | Stop TCP bridges, destroy HITL Queue and DDict. Safe to call multiple times. |
| hitl_address | (host, port) of the HITL TCP bridge, or None. |
| trace_address | (host, port) of the trace TCP bridge, or None. |
| ddict | The shared DDict instance. |
| serialized_ddict | Serialized DDict descriptor (str). External clients can attach with DDict.attach(orchestrator.serialized_ddict). |

What Happens Inside run()

Understanding the execution flow helps with debugging and performance analysis:

  1. Writes the user prompt to the DDict and seeds global state.

  2. Builds a Dragon Batch DAG — each agent-backed node becomes a dispatcher closure that:

    1. Attaches to DDict, creates a completion Event.

    2. Puts a Message(header=DispatchHeader(...)) on the agent’s input Queue.

    3. Blocks on completion_event.wait(timeout=poll_timeout) — zero polling.

    4. Reads status from DDict (DONE or ERROR).

    5. Returns a TaskResult that Dragon Batch passes to downstream nodes.

  3. Calls task_handle.get() on terminal nodes — blocks until done.

  4. Assembles results from per-agent DDict keys.

If an agent task fails, the dispatcher reads status=ERROR from DDict and raises RuntimeError, which Dragon Batch propagates to downstream nodes.
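The dispatcher steps above can be modelled with a thread, a queue, and an event. In this sketch a plain dict stands in for the Scoreboard DDict, and make_dispatcher, fake_agent, and the key layout are hypothetical; raising TimeoutError when the wait expires is also an assumption, since the text above only specifies the RuntimeError on status=ERROR.

```python
import queue
import threading


def make_dispatcher(agent_queue, scoreboard, agent_id, poll_timeout=120.0):
    """Return a closure that dispatches one task and blocks until it completes."""
    def dispatch(task_id):
        done = threading.Event()
        agent_queue.put({"task_id": task_id, "agent_id": agent_id, "done": done})
        if not done.wait(timeout=poll_timeout):  # blocks without polling
            raise TimeoutError(f"{agent_id} did not finish within {poll_timeout}s")
        status = scoreboard[(task_id, agent_id, "status")]
        if status == "error":
            raise RuntimeError(f"agent {agent_id} failed task {task_id}")
        return {"task_id": task_id, "agent_id": agent_id, "status": status}
    return dispatch


def fake_agent(agent_queue, scoreboard, fail=False):
    """Handle one message: write a status, then signal the dispatcher."""
    msg = agent_queue.get()
    scoreboard[(msg["task_id"], msg["agent_id"], "status")] = "error" if fail else "done"
    msg["done"].set()


def run_once(fail: bool) -> str:
    scoreboard, inbox = {}, queue.Queue()
    threading.Thread(target=fake_agent, args=(inbox, scoreboard, fail), daemon=True).start()
    dispatch = make_dispatcher(inbox, scoreboard, "planner", poll_timeout=5.0)
    try:
        return dispatch("run-1")["status"]
    except RuntimeError as exc:
        return f"raised: {exc}"


print(run_once(fail=False))
print(run_once(fail=True))
```

Note that the agent writes the status before setting the event, so the dispatcher never reads a stale status after it wakes up.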

Agent Concurrency

A single agent process can handle multiple tasks concurrently. Understanding the concurrency model is important for tuning performance and debugging.

How It Works

Each agent runs an asyncio event loop:

  1. SubAgent.listen() calls await comm.receive(timeout=1.0) — the blocking Queue.get() is offloaded to a thread via asyncio.to_thread() so the event loop stays free.

  2. On message arrival, the agent dispatches _handle_message(msg) as an independent asyncio.Task via asyncio.create_task() and immediately loops back to polling.

  3. Multiple tasks run concurrently — the event loop is free during all LLM, MCP, and tool I/O waits.

Listing 90 Concurrent task processing within a single agent
┌─ Agent Process (asyncio event loop) ──────────────────────────────┐
│                                                                   │
│  listen loop:  receive → create_task → receive → create_task → …  │
│                    │                      │                       │
│    Task A: attach DDict → LLM call ⏳ → tool call → LLM → done    │
│    Task B:              attach DDict → LLM call ⏳ → tool → done  │
│    Task C:                          attach DDict → LLM call ⏳ …  │
│                                                                   │
│  ⏳ = awaiting asyncio.to_thread(queue.get) — event loop is free  │
└───────────────────────────────────────────────────────────────────┘
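The listen loop pictured above can be sketched with asyncio and a standard-library queue. This is a simplified model, not SubAgent.listen(): handle_message, the message dicts, and the results dict (standing in for the DDict) are hypothetical. It also shows errors being contained at the task boundary and in-flight tasks being awaited on shutdown.

```python
import asyncio
import queue
import threading


async def handle_message(msg: dict, results: dict) -> None:
    try:
        await asyncio.sleep(0.01)  # stands in for LLM / tool I/O
        if msg.get("fail"):
            raise ValueError("tool blew up")
        results[msg["id"]] = "done"
    except Exception as exc:       # task boundary: record the error, never re-raise
        results[msg["id"]] = f"error: {exc}"


async def listen(inbox: queue.Queue, shutdown: threading.Event, results: dict) -> None:
    inflight = set()
    while not shutdown.is_set():
        try:
            # The blocking get() runs in a worker thread so the loop stays free.
            msg = await asyncio.to_thread(inbox.get, True, 0.05)
        except queue.Empty:
            continue
        task = asyncio.create_task(handle_message(msg, results))
        inflight.add(task)
        task.add_done_callback(inflight.discard)
    # Graceful shutdown: wait for every task that is still running.
    await asyncio.gather(*inflight, return_exceptions=True)


async def main() -> dict:
    inbox, shutdown, results = queue.Queue(), threading.Event(), {}
    for msg in ({"id": "A"}, {"id": "B", "fail": True}, {"id": "C"}):
        inbox.put(msg)
    listener = asyncio.create_task(listen(inbox, shutdown, results))
    while not inbox.empty():       # wait until every message was picked up
        await asyncio.sleep(0.01)
    shutdown.set()
    await listener                 # returns only after in-flight tasks finish
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Task B fails, yet tasks A and C still complete, which mirrors the isolation guarantee: one task's exception is recorded as its own status and nothing else is disturbed.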

Task Isolation

Tasks are fully isolated from each other:

  • Each _handle_message() attaches to the DDict independently and detaches in a finally block.

  • Exceptions are caught at the task boundary and never re-raised — a failing task writes status=ERROR to the DDict and signals the dispatcher’s completion event, but the agent process and all other concurrent tasks continue running.

  • Each task gets its own DDictAccessor scoped by dispatch_id, so concurrent tasks never collide on DDict keys.

LLM Request Pool

Each agent’s DragonQueueLLMProxy maintains a ResponseQueuePool — a pool of pre-allocated response Queues (default size: 32). When the agent sends a prompt, it borrows a response queue; when the response arrives, the queue is returned. Configure via AgentConfig.max_concurrent_requests:

config = AgentConfig(
    agent_id="runner", name="Runner", role="...",
    inference_queue=inference_queue,
    max_concurrent_requests=16,   # default is 32
)

If the pool is exhausted, the next LLM request blocks until a queue is returned.
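The borrow-and-return behavior can be sketched with asyncio queues. ResponsePool and the backend coroutine below are hypothetical illustrations of the idea, not the ResponseQueuePool class; the real pool holds Dragon Queues rather than asyncio.Queue objects.

```python
import asyncio


class ResponsePool:
    """Fixed pool of reusable response queues; borrowing waits when none is free."""

    def __init__(self, size: int) -> None:
        self._free = asyncio.Queue()
        for _ in range(size):
            self._free.put_nowait(asyncio.Queue(maxsize=1))

    async def request(self, backend, prompt: str) -> str:
        response_q = await self._free.get()    # waits here if the pool is exhausted
        try:
            await backend(prompt, response_q)  # backend replies on the borrowed queue
            return await response_q.get()
        finally:
            self._free.put_nowait(response_q)  # always return the queue to the pool


async def main(pool_size: int = 2, n_requests: int = 5):
    pool = ResponsePool(pool_size)             # built inside the running event loop
    in_flight = peak = 0

    async def backend(prompt: str, response_q: asyncio.Queue) -> None:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)            # track the highest concurrency seen
        await asyncio.sleep(0.01)              # stands in for inference latency
        in_flight -= 1
        await response_q.put(prompt.upper())

    answers = await asyncio.gather(
        *(pool.request(backend, f"p{i}") for i in range(n_requests))
    )
    return answers, peak


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With a pool of two and five requests, at most two requests are ever in flight, so the pool size acts as a per-agent concurrency limit on LLM calls.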

Graceful Shutdown

When shutdown_event.set() is called, the listen loop stops accepting new messages and awaits all in-flight tasks via asyncio.gather(*inflight, return_exceptions=True). Every task completes its DDict writes and fires its completion event before the process exits.

Human-in-the-Loop (HITL)

The HITL system provides a human approval gate for tool calls that require oversight.

Configuring HITL

Set an approval_filter on the agent config — a callable that receives the tool name and arguments and returns True if approval is needed:

def needs_approval(tool_name: str, tool_args: dict) -> bool:
    """Require approval for any tool that launches experiments."""
    return tool_name in {"propose_experiment", "launch_experiment"}

config = AgentConfig(
    agent_id="runner", name="Runner", role="...",
    inference_queue=inference_queue,
    approval_filter=needs_approval,
)

When any agent in the pipeline has an approval_filter, the orchestrator automatically creates a HITL Queue and TCP bridge on construction.

Running the HITL Client

Start the terminal client from a separate terminal (no Dragon runtime required):

python -m dragon.ai.agent.hitl --tcp HOST:PORT

The client presents each pending request with colored formatting and prompts the operator to:

  • Approve — the tool call proceeds.

  • Reject — the tool call is skipped and the LLM receives a rejection message, allowing it to try an alternative approach.

  • Provide feedback — the LLM receives the operator’s text as a tool result and retries with the feedback incorporated.
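The three operator outcomes can be sketched as a small gate around tool execution. gated_call, the decision strings ("approve", "reject", "feedback"), and the scripted operator are assumptions made for this example; the framework exchanges HumanApprovalRequest and HumanApprovalResponse objects whose exact fields are not shown in this guide.

```python
def needs_approval(tool_name: str, tool_args: dict) -> bool:
    """Require approval for any tool that launches experiments."""
    return tool_name in {"propose_experiment", "launch_experiment"}


def gated_call(name, args, tools, approval_filter, ask_operator) -> str:
    """Return the text the LLM would receive as the tool result."""
    if approval_filter is not None and approval_filter(name, args):
        decision, text = ask_operator(name, args)
        if decision == "reject":
            return f"Tool call rejected by operator: {text}"
        if decision == "feedback":
            return f"Operator feedback: {text}"
        # any other decision is treated as approval in this sketch
    return str(tools[name](**args))


tools = {
    "launch_experiment": lambda lr: f"launched with lr={lr}",
    "read_log": lambda path: f"contents of {path}",
}


def scripted_operator(decisions):
    """Build an ask_operator callable that replays canned decisions in order."""
    remaining = iter(decisions)
    return lambda name, args: next(remaining)


ask = scripted_operator([
    ("approve", ""),
    ("reject", "too expensive"),
    ("feedback", "use lr=0.01"),
])
outcomes = [
    gated_call("launch_experiment", {"lr": 0.1}, tools, needs_approval, ask)
    for _ in range(3)
]
ungated = gated_call("read_log", {"path": "run.log"}, tools, needs_approval, ask)
print(outcomes, ungated)
```

Only the approved call actually runs the tool; rejection and feedback both skip execution and return text to the LLM, and tools that do not match the filter never reach the operator.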

How HITL Works (Architecture)

Dragon primitives (Queue, Channel, DDict) cannot be deserialized outside the Dragon runtime. The TCP bridge keeps Dragon primitives intra-runtime and sends only JSON to the external client:

  1. Agent creates a per-request response Queue and puts (request, response_queue) on the shared HITL Queue.

  2. HitlTcpBridge (daemon thread in orchestrator) reads from the HITL Queue, sends JSON over TCP.

  3. TCP client reads JSON, gets operator decision, sends JSON response.

  4. Bridge puts HumanApprovalResponse on the per-request response Queue.

  5. Agent’s coroutine unblocks and proceeds.

If the client disconnects, the bridge re-queues the request. If it cannot read a valid response, it sends a synthetic rejection to prevent indefinite blocking.

Tracing and Observability

When OrchestratorConfig.tracing=True, every LLM call, tool execution, HITL decision, and memory operation emits structured span events.

Live Trace Viewer

python -m dragon.ai.agent.observability --tcp HOST:PORT -i

The viewer renders a live Gantt-bar tree of spans in the terminal. Use -i for interactive curses mode with keyboard navigation.

Traces can also be exported as JSONL files for offline analysis or as readable .txt reports that are auto-saved when the viewer exits.

Tracing Architecture

  1. DictTracingProcessor: per-task processor that writes span start/end events to per-agent DDict keys (TRACE_KEY) and per-event-type keys (LLM_EVENT_KEY, TOOL_EVENT_KEY, etc.) with atomic counters.

  2. TraceTcpBridge: daemon thread that polls DDict for new span data and streams it as newline-delimited JSON over TCP.

  3. Terminal Viewer: pure Python (no Dragon), connects via TCP.

When tracing=False, every event write is skipped behind a guard, so disabled tracing adds no DDict writes.

Error Handling

The framework defines a structured exception hierarchy:

| Exception | When raised |
| --- | --- |
| AgentError | Base class for all framework errors. |
| ToolExecutionError | A tool call failed. Wraps the original exception plus the tool name and arguments. Fed back to the LLM so it can retry. |
| AgentLoopError | LLM produced unparseable JSON, or max_tool_call_iterations was exceeded. |
| HITLBridgeError | HITL TCP bridge encountered a queue or network failure. |
| CompletionSignalError | completion_event.set() failed; the dispatcher will time out. |
| AgentObservabilityWarning | Non-fatal DDict/trace write issue (emitted as UserWarning). Promote to error: warnings.filterwarnings("error", category=AgentObservabilityWarning). |

Key patterns:

  • Task isolation: one failing task never crashes other concurrent tasks. Errors are published to DDict and propagated via the dispatcher.

  • Tool error tolerance: ToolExecutionError is fed back to the LLM, which can reason about the failure and retry or try an alternative.

  • Graceful degradation: summarization failure falls back to sliding-window pruning; HITL bridge disconnect re-queues requests.

DDict Key Reference

All task state is stored in the Scoreboard DDict using structured key templates. Knowing these keys is useful for debugging and building custom monitoring tools.

Per-invocation keys (scoped by {task_id}:{agent_id}:{dispatch_id}):

| Key template | Content |
| --- | --- |
| RESULT_KEY | Agent’s final answer (dict). |
| STATUS_KEY | TaskStatus value (ready, processing, waiting, done, error). |
| TRACE_KEY | Ordered list of span dicts for this invocation. |
| LLM_EVENT_KEY | Per-LLM-call event (indexed by {index}). |
| TOOL_EVENT_KEY | Per-tool-call event (indexed by {index}). |
| HITL_EVENT_KEY | Per-HITL-decision event (indexed by {index}). |
| MEMORY_EVENT_KEY | Per-memory-operation event (indexed by {index}). |
| HITL_REQUEST_KEY | HumanApprovalRequest (for audit logging). |
| HITL_RESPONSE_KEY | HumanApprovalResponse (for audit logging). |

Per-run keys (scoped by {task_id}):

| Key template | Content |
| --- | --- |
| DISPATCH_ID_KEY | Maps {task_id}:{agent_id} → dispatch_id. |
| USER_INPUT_KEY | The original user prompt. |
| GLOBAL_STATE_KEY | Ordered list of {agent_id, answer} dicts. |
| GLOBAL_STATE_ENTRY_KEY | Per-agent atomic entry (avoids read-modify-write races). |
| HITL_QUEUE_KEY | Serialized Dragon Queue handle for the HITL channel. |
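For custom monitoring, the usual access pattern is a two-step lookup: resolve the dispatch_id for a (task_id, agent_id) pair, then read the per-invocation keys. The sketch below uses a plain dict in place of the DDict, and the EXAMPLE_* template strings are invented for illustration; the real templates should be imported from dragon.ai.agent.config, and their exact string layout is not documented here.

```python
# Hypothetical templates; only the placeholder names follow the scoping described above.
EXAMPLE_DISPATCH_ID_KEY = "{task_id}:{agent_id}:dispatch_id"
EXAMPLE_STATUS_KEY = "{task_id}:{agent_id}:{dispatch_id}:status"
EXAMPLE_RESULT_KEY = "{task_id}:{agent_id}:{dispatch_id}:result"


def read_invocation(scoreboard: dict, task_id: str, agent_id: str) -> tuple:
    """Resolve the dispatch_id, then fetch the status and result for that invocation."""
    dispatch_id = scoreboard[
        EXAMPLE_DISPATCH_ID_KEY.format(task_id=task_id, agent_id=agent_id)
    ]
    scope = {"task_id": task_id, "agent_id": agent_id, "dispatch_id": dispatch_id}
    status = scoreboard[EXAMPLE_STATUS_KEY.format(**scope)]
    result = scoreboard[EXAMPLE_RESULT_KEY.format(**scope)]
    return status, result


# A plain dict standing in for the Scoreboard after one agent has finished.
scoreboard = {
    "run-1:reporter:dispatch_id": "d42",
    "run-1:reporter:d42:status": "done",
    "run-1:reporter:d42:result": {"response": "# Report"},
}

status, result = read_invocation(scoreboard, "run-1", "reporter")
print(status, result["response"])
```

The indirection through dispatch_id is what lets several invocations of the same agent within one run keep separate status, result, and trace entries.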

Complete Example

Here is a minimal end-to-end script:

Listing 91 Single-agent pipeline
import dragon
from dragon.native.process import Process
from dragon.native.queue import Queue
from dragon.native.event import Event
from dragon.workflows.batch import Batch

from dragon.ai.inference.inference_utils import Inference
from dragon.ai.inference.config import (
    InferenceConfig, ModelConfig, HardwareConfig, BatchingConfig,
    GuardrailsConfig, DynamicWorkerConfig,
)
from dragon.ai.agent.config.agent_config import AgentConfig, OrchestratorConfig
from dragon.ai.agent.config.pipeline import Pipeline, PipelineNode
from dragon.ai.agent.tools.registry import ToolRegistry
from dragon.ai.agent.core.sub_agent import create_sub_agent
from dragon.ai.agent.orchestrator.orchestrator import DAGOrchestrator

# 1. Inference backend
inference_queue = Queue()
inference = Inference(
    config=InferenceConfig(
        model=ModelConfig(model_name="meta-llama/Llama-3.1-8B-Instruct",
                          hf_token="hf_...", tp_size=1, max_tokens=2048),
        hardware=HardwareConfig(num_gpus=1, num_nodes=1),
        batching=BatchingConfig(enabled=True),
        guardrails=GuardrailsConfig(enabled=False),
        dynamic_worker=DynamicWorkerConfig(enabled=False),
        flask_secret_key="", run_type="agent", token="",
    ),
    input_queue=inference_queue,
)
inference.initialize()

# 2. Tools
registry = ToolRegistry()

@registry.tool
def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b

# 3. Agent config
config = AgentConfig(
    agent_id="assistant",
    name="Math Assistant",
    role="You are a math assistant. Use the add tool to compute sums.",
    inference_queue=inference_queue,
)

# 4. Start agent process
shutdown = Event()
reply_q = Queue()
p = Process(target=create_sub_agent,
            args=(config, registry, None, shutdown, reply_q))
p.start()
config.input_queue = reply_q.get(timeout=60)

# 5. Pipeline and orchestrator
pipeline = Pipeline(nodes=[
    PipelineNode(agent_id="assistant",
                 task_description="Compute the sum of 17 and 25."),
])
orch = DAGOrchestrator(
    config=OrchestratorConfig(agents=[config]),
    pipeline=pipeline,
)

# Create the Batch before the try block so the finally clause can always close it
batch = Batch()
try:
    result = orch.run("What is 17 + 25?", batch=batch)
    print(result)
finally:
    orch.destroy()
    batch.close()
    batch.join()
    shutdown.set()
    p.join(timeout=30)
    inference.destroy()