Dragon AI Agent Framework — User Guide
This guide covers everything you need to build, configure, and run multi-agent AI pipelines on HPC clusters using the Dragon AI Agent Framework. It is written for first-time users. By the end you will understand every available API, every configuration option, and enough architecture to debug performance issues.
For the internal architecture details (Dragon primitives, source-level code paths), see Agent Framework — Developer Guide. For the auto-generated API reference, see Agent Framework.
Overview — What the Framework Does
The Dragon AI Agent Framework runs LLM-powered multi-agent workflows on HPC clusters. You define:
Agents — persistent processes with a role (system prompt), tools, and an LLM backend.
A pipeline — a directed acyclic graph (DAG) of tasks, each assigned to an agent.
An orchestrator — the entry point that creates shared state, builds the DAG, dispatches tasks to agents, and collects results.
At runtime the framework:
1. Creates a shared DDict (Distributed Dictionary) called the Scoreboard that stores all task state, results, and observability data.
2. Builds a Dragon Batch DAG where each node is a lightweight dispatcher closure that puts a message on the target agent's input Queue and blocks on a completion Event.
3. Each agent runs an asyncio event loop, picks up messages from its Queue, runs an LLM tool-calling loop, writes results to the Scoreboard, and signals the dispatcher.

Agents are stateless: zero per-task state is held between tasks. All context lives in the Scoreboard DDict.
┌─ Orchestrator Process ──────────────────────────────────────────────┐
│ DDict (Scoreboard) ←→ Dragon Batch DAG ←→ HITL TCP Bridge │
│ │ │ │ Trace TCP Bridge │
└───────────────────────────┼────┼────┼───────────────────────────────┘
│ │ │
┌──────────────┘ │ └──────────────┐
▼ ▼ ▼
┌─ Agent Process ───┐ ┌─ Agent Process ───┐ ┌─ Agent Process ───┐
│ Input Queue │ │ Input Queue │ │ Input Queue │
│ LLM Proxy │ │ LLM Proxy │ │ LLM Proxy │
│ Tool Registry │ │ Tool Registry │ │ Tool Registry │
│ MCP Clients │ │ MCP Clients │ │ MCP Clients │
│ asyncio loop │ │ asyncio loop │ │ asyncio loop │
└───────────────────┘ └───────────────────┘ └───────────────────┘
Step 1 — Set Up the Inference Backend
Before creating agents you need an LLM inference service. The Inference
class manages the vLLM backend with multi-GPU tensor parallelism, request
batching, and dynamic worker management.
```python
from dragon.native.queue import Queue
from dragon.ai.inference.inference_utils import Inference
from dragon.ai.inference.config import (
    InferenceConfig, ModelConfig, HardwareConfig, BatchingConfig,
    GuardrailsConfig, DynamicWorkerConfig,
)

inference_queue = Queue()

inference = Inference(
    config=InferenceConfig(
        model=ModelConfig(
            model_name="meta-llama/Llama-3.1-70B-Instruct",
            hf_token="hf_...",
            tp_size=4,  # tensor parallelism across 4 GPUs
            max_tokens=4096,
            max_model_len=8192,
        ),
        hardware=HardwareConfig(
            num_gpus=4,
            num_nodes=1,
        ),
        batching=BatchingConfig(
            enabled=True,
            batch_wait_seconds=0.1,
            max_batch_size=32,
        ),
        guardrails=GuardrailsConfig(enabled=False),
        dynamic_worker=DynamicWorkerConfig(enabled=False),
        flask_secret_key="",
        run_type="agent",
        token="",
    ),
    input_queue=inference_queue,
)
inference.initialize()
```
The inference_queue is the shared Queue
that agents will use to send LLM requests. Multiple agents can share the same
queue — the inference backend handles batching and routing internally.
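How the backend groups requests is internal, but the BatchingConfig fields batch_wait_seconds and max_batch_size suggest a time-window policy: wait briefly after the first request and close the batch when it is full or the window expires. The following is a minimal plain-Python sketch under that assumption; collect_batch is a hypothetical helper, and queue.Queue stands in for a Dragon Queue.

```python
import queue
import time


def collect_batch(q, max_batch_size=32, batch_wait_seconds=0.1):
    """Hypothetical batching policy: block for the first request, then keep
    collecting until the batch is full or the wait window closes."""
    batch = [q.get()]  # wait for at least one request
    deadline = time.monotonic() + batch_wait_seconds
    while len(batch) < max_batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            batch.append(q.get(timeout=remaining))
        except queue.Empty:
            break  # window closed with no new request
    return batch


requests = queue.Queue()
for i in range(5):
    requests.put(f"prompt-{i}")

first = collect_batch(requests, max_batch_size=3, batch_wait_seconds=0.2)
second = collect_batch(requests, max_batch_size=3, batch_wait_seconds=0.2)
print(first)   # ['prompt-0', 'prompt-1', 'prompt-2']
print(second)  # ['prompt-3', 'prompt-4']
```

A longer window tends to produce larger batches at the cost of extra latency for the first request in each batch.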
InferenceConfig Reference
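| Sub-config | Key fields (as used in this guide's examples) |
|---|---|
| model (ModelConfig) | model_name, hf_token, tp_size, max_tokens, max_model_len |
| hardware (HardwareConfig) | num_gpus, num_nodes, node_offset |
| batching (BatchingConfig) | enabled, batch_wait_seconds, max_batch_size |
| guardrails (GuardrailsConfig) | enabled |
| dynamic_worker (DynamicWorkerConfig) | enabled |

InferenceConfig also takes the top-level fields flask_secret_key, run_type, and token, which the examples set to "", "agent", and "".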
Separate Summarizer Model
For memory management with the SUMMARIZE strategy (see
MemoryConfig Reference), you can run a second, smaller model on a
separate GPU partition:
```python
summarizer_queue = Queue()
summarizer_inference = Inference(
    config=InferenceConfig(
        model=ModelConfig(
            model_name="meta-llama/Llama-3.1-8B-Instruct",
            hf_token="hf_...", tp_size=1, max_tokens=2048,
        ),
        hardware=HardwareConfig(num_gpus=1, num_nodes=1, node_offset=1),
        batching=BatchingConfig(enabled=True, batch_wait_seconds=0.05),
        guardrails=GuardrailsConfig(enabled=False),
        dynamic_worker=DynamicWorkerConfig(enabled=False),
        flask_secret_key="", run_type="agent", token="",
    ),
    input_queue=summarizer_queue,
)
summarizer_inference.initialize()
```
Step 2 — Define Tools
Tools are callable functions that agents invoke during their reasoning loop. The LLM decides which tool to call and with what arguments based on the tool’s name, description, and parameter schema.
The ToolRegistry holds all tools for a single agent.
Registration Methods
There are three ways to register tools:
1. @registry.tool decorator (recommended for new code):

```python
from dragon.ai.agent.tools.registry import ToolRegistry

registry = ToolRegistry()

@registry.tool
def search_database(query: str, max_results: int = 10) -> dict:
    """Search the experiment database for matching records.

    :param query: The search query string.
    :param max_results: Maximum number of results to return.
    """
    records = []  # ... perform the search and collect matching records ...
    return {"results": records[:max_results]}
```
The decorator wraps the function in a FunctionTool and registers it
automatically. The tool name is fn.__name__, the description is the first
line of the docstring, and parameters are derived from type hints and
:param: docstring entries.
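To make the derivation concrete, here is a minimal sketch of turning a function's name, docstring, and type hints into an OpenAI-style function schema. function_to_schema and its type mapping are hypothetical illustrations, not the framework's FunctionTool code; the real decorator may support more types and docstring styles.

```python
import inspect
import re
import typing

# Assumed mapping from Python annotations to JSON Schema types.
_JSON_TYPES = {str: "string", int: "integer", float: "number",
               bool: "boolean", dict: "object", list: "array"}


def function_to_schema(fn) -> dict:
    """Hypothetical: build an OpenAI-style function schema from a function."""
    doc = inspect.getdoc(fn) or ""
    description = doc.splitlines()[0] if doc else ""
    # Collect ":param name: text" entries from the docstring.
    param_docs = dict(re.findall(r":param (\w+):\s*(.+)", doc))
    hints = typing.get_type_hints(fn)
    properties, required = {}, []
    for name, param in inspect.signature(fn).parameters.items():
        prop = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        if name in param_docs:
            prop["description"] = param_docs[name].strip()
        properties[name] = prop
        if param.default is inspect.Parameter.empty:
            required.append(name)  # parameters without defaults are required
    return {
        "type": "function",
        "function": {
            "name": fn.__name__,
            "description": description,
            "parameters": {"type": "object", "properties": properties,
                           "required": required},
        },
    }


def search_database(query: str, max_results: int = 10) -> dict:
    """Search the experiment database for matching records.

    :param query: The search query string.
    :param max_results: Maximum number of results to return.
    """
    return {"results": []}


schema = function_to_schema(search_database)
print(schema["function"]["parameters"]["required"])  # ['query']
```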
2. registry.register(callable) (for existing functions):

```python
from tools.analyzer import analyze_convergence  # an existing function in your own code
registry.register(analyze_convergence)
```
3. Subclass BaseTool (for full control over schema):

```python
from dragon.ai.agent.tools.base import BaseTool

class RunSimulation(BaseTool):
    name = "run_simulation"
    description = "Run a Monte Carlo simulation with the given parameters."

    def run(self, input: dict) -> dict:
        n_samples = input["n_samples"]
        result = None  # ... run the simulation with n_samples and store its output ...
        return {"status": "complete", "result": result}

    def to_schema(self) -> dict:
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": {
                    "type": "object",
                    "properties": {
                        "n_samples": {"type": "integer", "description": "Number of samples"},
                    },
                    "required": ["n_samples"],
                },
            },
        }

registry.register(RunSimulation())
```
Async Tools
Async callables are detected automatically. The dispatcher awaits them
instead of calling synchronously:
```python
import aiohttp

@registry.tool
async def fetch_remote_data(url: str, timeout: float = 30.0) -> dict:
    """Fetch data from a remote HTTP endpoint."""
    client_timeout = aiohttp.ClientTimeout(total=timeout)
    async with aiohttp.ClientSession(timeout=client_timeout) as session:
        async with session.get(url) as resp:
            return await resp.json()
```
Async tools yield the event loop while waiting for I/O, allowing other concurrent tasks on the same agent to make progress.
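A minimal sketch of the dispatch rule described above, assuming the dispatcher simply checks inspect.iscoroutinefunction. call_tool is a hypothetical helper, and asyncio.sleep stands in for network I/O so the example runs offline.

```python
import asyncio
import inspect
import time


async def call_tool(fn, **kwargs):
    """Hypothetical dispatcher: await async tools, call plain ones directly."""
    if inspect.iscoroutinefunction(fn):
        return await fn(**kwargs)
    return fn(**kwargs)


def add(a: int, b: int) -> int:
    return a + b


async def slow_fetch(delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for waiting on network I/O
    return f"waited {delay}s"


async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        call_tool(slow_fetch, delay=0.2),
        call_tool(slow_fetch, delay=0.2),
        call_tool(add, a=2, b=3),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results)  # ['waited 0.2s', 'waited 0.2s', 5]
print(f"{elapsed:.2f}s")  # expected to be close to 0.2s, not 0.4s
```

Because both slow_fetch calls yield while sleeping, they overlap. A synchronous tool that blocks would instead hold the event loop for its entire duration.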
ToolRegistry API
| Method | Description |
|---|---|
| register(…) | Register a … (the examples above pass a plain callable or a BaseTool instance). |
| … | Remove a tool by name. No-op if not present. |
| … | Return the tool registered under name. Raises … |
| … | Return … |
| … | Return a list of OpenAI-compatible tool/function schema dicts. |
| … | Return a sorted list of registered tool names. |
| tool(fn) | Decorator that wraps fn as a FunctionTool and registers it. |
Step 3 — Configure Agents
Each agent needs an AgentConfig that defines its identity, role, and
connections.
```python
from dragon.ai.agent.config.agent_config import AgentConfig
from dragon.ai.agent.config.memory_config import MemoryConfig, MemoryStrategy

planner_config = AgentConfig(
    agent_id="planner",
    name="Experiment Planner",
    role="You are an experiment planner. Given a research question, "
         "design a set of experiments to answer it.",
    inference_queue=inference_queue,
    summarizer_inference_queue=summarizer_queue,  # optional
    memory=MemoryConfig(
        strategy=MemoryStrategy.SUMMARIZE,
        max_kept_turns=8,
        summarize_after_turns=6,
    ),
    max_tool_call_iterations=15,
    max_concurrent_requests=16,
    approval_filter=lambda name, args: name == "propose_experiment",
)
```
AgentConfig Reference
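| Field | Default | Description |
|---|---|---|
| agent_id | (required) | Unique identifier within the pipeline. |
| name | (required) | Human-readable display name. |
| role | (required) | System prompt: the LLM's persona for this agent. |
| inference_queue | … | Dragon Queue feeding the inference backend. When set, a … |
| input_queue | … | The agent's input Queue. It comes from the reply queue handshake during agent startup (see Step 5); do not create one yourself. |
| max_concurrent_requests | 32 | Maximum concurrent in-flight LLM requests per agent. Controls the size of the agent's ResponseQueuePool (see LLM Request Pool). |
| summarizer_inference_queue | … | Separate inference queue for a dedicated summarizer model. Used with the SUMMARIZE memory strategy. |
| … | … | Placement hint (hostname or node index). Informational only; actual placement is controlled by the Policy passed to the agent's Process (see Step 5). |
| approval_filter | … | HITL gating predicate: a callable (tool_name, tool_args) that returns True when the tool call needs human approval. |
| max_tool_call_iterations | … | Maximum LLM iterations in the tool-calling loop. Each iteration is one LLM call that may produce a tool request or a final answer. Increase for agents with many tool calls or HITL feedback rounds. |
| memory | None | Memory management config. None keeps the full history. |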
MemoryConfig Reference
Controls how the agent manages its conversation history during the tool-calling loop. Without memory management, the message list grows without bound and can exceed the model’s context window.
```python
from dragon.ai.agent.config.memory_config import MemoryConfig, MemoryStrategy

# No pruning (the default): keeps every message. Same as AgentConfig(..., memory=None).
keep_everything = None

# Sliding window: drop old turns
sliding_window = MemoryConfig(
    strategy=MemoryStrategy.SLIDING_WINDOW,
    max_kept_turns=8,
)

# Summarize old turns via an LLM call
summarize = MemoryConfig(
    strategy=MemoryStrategy.SUMMARIZE,
    max_kept_turns=8,
    summarize_after_turns=6,
)

# Pass one of these as AgentConfig(..., memory=...)
```
MemoryStrategy enum:
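| Value | Behavior |
|---|---|
| … | Keep every message. Same as memory=None. |
| SLIDING_WINDOW | Drop older turns beyond max_kept_turns. |
| SUMMARIZE | When …, older turns are summarized via an LLM call. Falls back to sliding-window pruning if summarization fails. |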
MemoryConfig fields:
| Field | Default | Description |
|---|---|---|
| strategy | … | Memory management strategy. |
| max_kept_turns | … | Number of recent turn-pairs to keep in full. A turn-pair is one assistant tool-call request plus its tool result(s). |
| … | … | Maximum characters per tool result content field. |
| … | … | … |
| … | … | Maximum characters per tool result when building the summarizer input. |
| … | … | Maximum characters per non-tool message when building the summarizer input. |
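The sketch below illustrates SLIDING_WINDOW pruning plus per-tool-result truncation on OpenAI-style message dicts. It assumes a turn-pair starts at each assistant message and includes the tool results that follow it, as described for max_kept_turns; prune_sliding_window and its max_tool_result_chars parameter are hypothetical names, not the framework's API.

```python
def prune_sliding_window(messages, max_kept_turns=8, max_tool_result_chars=None):
    """Hypothetical SLIDING_WINDOW pruning.

    Messages before the first assistant message (system prompt, task) are
    always kept. Each assistant message opens a turn-pair that also holds
    the messages after it (its tool results). Only the newest
    max_kept_turns turn-pairs survive; tool results are truncated to
    max_tool_result_chars when that limit is set."""
    head, turns = [], []
    for msg in messages:
        if msg["role"] == "assistant":
            turns.append([msg])        # start a new turn-pair
        elif turns:
            turns[-1].append(msg)      # attach to the current turn-pair
        else:
            head.append(msg)
    kept = turns[-max_kept_turns:] if max_kept_turns > 0 else []
    pruned = list(head)
    for turn in kept:
        for msg in turn:
            if (max_tool_result_chars is not None and msg["role"] == "tool"
                    and len(msg["content"]) > max_tool_result_chars):
                # Copy rather than mutate the caller's message.
                msg = {**msg, "content": msg["content"][:max_tool_result_chars]}
            pruned.append(msg)
    return pruned


history = [
    {"role": "system", "content": "You are an experiment planner."},
    {"role": "user", "content": "Design experiments."},
]
for i in range(5):
    history.append({"role": "assistant", "content": f"call {i}"})
    history.append({"role": "tool", "content": "x" * 100})

pruned = prune_sliding_window(history, max_kept_turns=2, max_tool_result_chars=10)
print(len(pruned))           # 6: 2 head messages + 2 turn-pairs of 2 messages
print(pruned[2]["content"])  # call 3
```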
MCPServerConfig Reference
Connect agents to remote MCP (Model Context Protocol) servers for additional tools:
```python
from dragon.ai.agent.config.agent_config import MCPServerConfig

mcp_servers = [
    MCPServerConfig(
        url="http://mcp-server:8080/mcp",
        alias="jupyter",
        token="my-api-token",
    ),
]
```
Tools from MCP servers are automatically discovered at agent startup and exposed
with scoped names: {alias}__{tool_name} (e.g., jupyter__create_notebook).
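A sketch of how scoped names can be built and split back into (alias, tool_name). scope_tool_name and split_tool_name are hypothetical helpers; splitting on the first "__" assumes the alias itself never contains "__", while the tool name may.

```python
SEPARATOR = "__"  # the {alias}__{tool_name} convention


def scope_tool_name(alias: str, tool_name: str) -> str:
    if SEPARATOR in alias:
        raise ValueError(f"alias must not contain {SEPARATOR!r}: {alias!r}")
    return f"{alias}{SEPARATOR}{tool_name}"


def split_tool_name(scoped: str) -> tuple[str, str]:
    """Split on the first separator, so tool names may contain '__'."""
    alias, sep, tool_name = scoped.partition(SEPARATOR)
    if not sep or not alias or not tool_name:
        raise ValueError(f"not a scoped MCP tool name: {scoped!r}")
    return alias, tool_name


print(scope_tool_name("jupyter", "create_notebook"))  # jupyter__create_notebook
print(split_tool_name("db__run__query"))              # ('db', 'run__query')
```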
Field |
Default |
Description |
|---|---|---|
|
(required) |
Full HTTP/HTTPS URL of the MCP server endpoint. |
|
(required) |
Short unique label used as the tool name prefix. |
|
|
Bearer token for authentication. |
|
|
Connection attempts before raising |
|
|
Seconds between retry attempts. |
|
|
Per-attempt connection timeout in seconds. |
Step 4 — Build a Pipeline
A Pipeline defines the DAG topology. Each PipelineNode specifies which
agent handles it and which nodes it depends on.
```python
from dragon.ai.agent.config.pipeline import Pipeline, PipelineNode

pipeline = Pipeline(nodes=[
    PipelineNode(
        agent_id="planner",
        task_description="Design experiments to test the hypothesis.",
    ),
    PipelineNode(
        agent_id="runner",
        task_description="Execute the planned experiments.",
        depends_on=["planner"],
    ),
    PipelineNode(
        agent_id="analyzer",
        task_description="Analyze the experiment results.",
        depends_on=["runner"],
    ),
    PipelineNode(
        agent_id="reporter",
        task_description="Write a summary report.",
        depends_on=["analyzer"],
    ),
])
```
Nodes are topologically sorted automatically. Nodes with no dependencies run first; after that, any nodes whose dependencies have all completed can run in parallel.
PipelineNode Reference
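| Field | Default | Description |
|---|---|---|
| agent_id | (required) | Unique node identifier (also the agent id for agent-backed nodes). |
| task_description | … | Human-readable task sent to the agent. Ignored for function nodes. |
| fn | … | Optional callable for function nodes (no LLM). Must accept … (the function-node example below takes the upstream TaskResult objects as positional arguments and returns a TaskResult). |
| depends_on | … | List of agent_id values of the upstream nodes this node depends on. |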
Validation: Pipeline.__post_init__ rejects duplicate agent_id
values, undefined depends_on references, and cycles.
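The validation rules and the parallel scheduling described above can be illustrated with Kahn's algorithm. This sketch groups nodes into levels in which every node depends only on earlier levels; plan_levels is a hypothetical helper working on (node_id, depends_on) pairs. Dragon Batch presumably starts each node as soon as its own inputs are ready, so strict levels are a simplification of the real execution order.

```python
from collections import defaultdict


def plan_levels(nodes):
    """nodes: list of (node_id, depends_on) pairs.

    Returns a list of levels; each node depends only on nodes in earlier
    levels. Raises ValueError on duplicate ids, undefined dependencies,
    or cycles."""
    ids = [node_id for node_id, _ in nodes]
    if len(ids) != len(set(ids)):
        raise ValueError("duplicate node id")
    deps_of = dict(nodes)
    dependents = defaultdict(list)
    for node_id, deps in nodes:
        for dep in deps:
            if dep not in deps_of:
                raise ValueError(f"{node_id!r} depends on undefined node {dep!r}")
            dependents[dep].append(node_id)
    remaining = {node_id: len(deps) for node_id, deps in nodes}
    level = sorted(n for n, count in remaining.items() if count == 0)
    levels, placed = [], 0
    while level:
        levels.append(level)
        placed += len(level)
        ready = []
        for node_id in level:
            for child in dependents[node_id]:
                remaining[child] -= 1
                if remaining[child] == 0:  # all dependencies satisfied
                    ready.append(child)
        level = sorted(ready)
    if placed != len(nodes):  # some nodes never became ready
        raise ValueError("pipeline contains a cycle")
    return levels


levels = plan_levels([
    ("planner", []),
    ("runner_a", ["planner"]),
    ("runner_b", ["planner"]),
    ("analyzer", ["runner_a", "runner_b"]),
])
print(levels)  # [['planner'], ['runner_a', 'runner_b'], ['analyzer']]
```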
Function Nodes
Not every node needs an LLM. A PipelineNode with fn=<callable> is wired
directly into the Batch graph:
```python
from dragon.data.ddict import DDict
from dragon.ai.agent.config.pipeline import Pipeline, PipelineNode, TaskResult, TaskStatus
from dragon.ai.agent.ddict import DDictAccessor
from dragon.ai.agent.config import DISPATCH_ID_KEY, RESULT_KEY

def save_report(*upstreams: TaskResult) -> TaskResult:
    """Write the report to disk (no LLM needed)."""
    upstream = upstreams[0]
    ddict = DDict.attach(upstream.serialized_ddict)
    try:
        accessor = DDictAccessor(ddict, agent_id="reporter",
                                 task_id=upstream.task_id)
        # Find which dispatch of the reporter produced the final answer...
        dispatch_id = accessor.get(DISPATCH_ID_KEY.format(
            task_id=upstream.task_id, agent_id="reporter"))
        # ...then read that invocation's result.
        result = accessor.get(RESULT_KEY.format(
            task_id=upstream.task_id, agent_id="reporter",
            dispatch_id=dispatch_id))
        with open("report.md", "w") as f:
            f.write(result.get("response", ""))
    finally:
        ddict.detach()  # detach even if a read or write fails
    return TaskResult(
        task_id=upstream.task_id, agent_id="save_report",
        status=TaskStatus.DONE,
        serialized_ddict=upstream.serialized_ddict)

pipeline = Pipeline(nodes=[
    PipelineNode(agent_id="reporter",
                 task_description="Write the report."),
    PipelineNode(agent_id="save_report", fn=save_report,
                 depends_on=["reporter"]),
])
```
TaskResult and TaskStatus
TaskResult is the lightweight token passed between Batch nodes:
| Field | Description |
|---|---|
| task_id | Pipeline run identifier. |
| agent_id | Identifier of the agent that produced this result. |
| status | Task status, a TaskStatus value (see below). |
| serialized_ddict | Serialized DDict handle; function nodes use this to attach. |
| … | Arbitrary extra data (dict). |
TaskStatus enum values: READY, PROCESSING, WAITING (HITL),
DONE, ERROR.
Step 5 — Start Agent Processes
Agents must be started as persistent Dragon Processes before running the orchestrator.
```python
from dragon.native.process import Process
from dragon.native.queue import Queue
from dragon.native.event import Event
from dragon.infrastructure.policy import Policy
from dragon.infrastructure.facts import HOST_NAME
from dragon.native.machine import System, Node

from dragon.ai.agent.core.sub_agent import create_sub_agent

# Discover cluster topology (this layout assumes at least two nodes)
all_nodes = System().nodes
head_node = Node(all_nodes[0]).hostname
compute_node = Node(all_nodes[1]).hostname

shutdown_event = Event()

# Launch each agent. The *_config objects are AgentConfigs (Step 3) and the
# *_tools objects are ToolRegistry instances (Step 2).
agents = {}  # agent_id -> (Process, reply_queue)
for config, tools, mcps in [
    (planner_config, planner_tools, None),
    (runner_config, runner_tools, None),
    (analyzer_config, analyzer_tools, None),
    (reporter_config, reporter_tools, mcp_servers),
]:
    reply_q = Queue()
    p = Process(
        target=create_sub_agent,
        args=(config, tools, mcps, shutdown_event, reply_q),
        policy=Policy(placement=HOST_NAME, host_name=compute_node),
    )
    p.start()
    agents[config.agent_id] = (p, reply_q)

# Collect input queues via the reply queue handshake and store each one on
# its config so the orchestrator can find it
configs = {cfg.agent_id: cfg for cfg in
           [planner_config, runner_config, analyzer_config, reporter_config]}
for agent_id, (proc, reply_q) in agents.items():
    configs[agent_id].input_queue = reply_q.get(timeout=60)
```
The reply queue handshake is how the parent discovers each agent’s input
queue: the agent creates a Queue inside its
own process, serializes it, and puts it on the reply queue.
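The handshake pattern can be sketched with threads and queue.Queue standing in for Dragon processes and Queues. Threads share objects directly, so nothing is serialized here, whereas in the framework the agent's Queue is serialized onto the reply queue. fake_agent is a hypothetical stand-in for create_sub_agent.

```python
import queue
import threading


def fake_agent(reply_q, shutdown):
    """Hypothetical stand-in for create_sub_agent."""
    input_q = queue.Queue()   # created inside the agent
    reply_q.put(input_q)      # handshake: hand the input queue to the parent
    while not shutdown.is_set():
        try:
            msg, done_q = input_q.get(timeout=0.1)
        except queue.Empty:
            continue          # re-check the shutdown flag
        done_q.put(f"handled {msg}")


shutdown = threading.Event()
reply_q = queue.Queue()
worker = threading.Thread(target=fake_agent, args=(reply_q, shutdown))
worker.start()

agent_input = reply_q.get(timeout=5)  # the parent learns the agent's queue
done_q = queue.Queue()
agent_input.put(("task-1", done_q))
answer = done_q.get(timeout=5)

shutdown.set()
worker.join(timeout=5)
print(answer)  # handled task-1
```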
create_sub_agent Parameters
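| Parameter (by position) | Description |
|---|---|
| 1 | The agent's AgentConfig. |
| 2 | The agent's ToolRegistry. |
| 3 | List of MCPServerConfig objects, or None for no MCP servers. |
| 4 | Shared shutdown Event; setting it stops the agent's listen loop. |
| 5 | Reply Queue on which the agent puts its input Queue during the startup handshake. |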
Step 6 — Configure and Run the Orchestrator
OrchestratorConfig Reference
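| Field | Default | Description |
|---|---|---|
| agents | … | List of AgentConfig objects for the agents in the pipeline. |
| … | … | Seconds between queue polls (used for the HITL bridge). |
| poll_timeout | … | Maximum seconds to wait per agent task. The dispatcher blocks on completion_event.wait(timeout=poll_timeout). |
| tracing | … | Enable span-based observability. |
| ddict_kwargs | … | Passed to the DDict constructor when the Scoreboard is created. |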
Running a Pipeline
```python
from dragon.ai.agent.config.agent_config import OrchestratorConfig
from dragon.ai.agent.orchestrator.orchestrator import DAGOrchestrator
from dragon.workflows.batch import Batch

orch_config = OrchestratorConfig(
    agents=[planner_config, runner_config, analyzer_config, reporter_config],
    poll_timeout=300.0,
    tracing=True,
    ddict_kwargs={"managers_per_node": 2, "total_mem": 1 << 30},
)

orchestrator = DAGOrchestrator(config=orch_config, pipeline=pipeline)

# Connection instructions are available immediately after construction
if orchestrator.hitl_address:
    host, port = orchestrator.hitl_address
    print(f"HITL client: python -m dragon.ai.agent.hitl --tcp {host}:{port}")
if orchestrator.trace_address:
    host, port = orchestrator.trace_address
    print(f"Trace viewer: python -m dragon.ai.agent.observability --tcp {host}:{port} -i")

# Create the Batch before the try block so the finally clause can always close it
batch = Batch()
try:
    result = orchestrator.run(
        user_input="Test whether increasing learning rate improves convergence.",
        batch=batch,
    )
    print(result)
finally:
    orchestrator.destroy()
    batch.close()
    batch.join()
    # Shut down agents
    shutdown_event.set()
    for proc, _ in agents.values():
        proc.join(timeout=30)
    inference.destroy()
```
Launch the script with the Dragon runtime:
dragon my_pipeline.py
DAGOrchestrator API
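| Method / Property | Description |
|---|---|
| DAGOrchestrator(config, pipeline) | Creates the shared DDict and starts the HITL and trace TCP bridges. All properties below are available immediately after construction. |
| run(user_input, batch) | Execute the pipeline. Returns the terminal node's result (single terminal) or a … (multiple terminals). |
| destroy() | Stop the TCP bridges and destroy the HITL Queue and DDict. Safe to call multiple times. |
| hitl_address | (host, port) of the HITL TCP bridge. Check it before use, as in the example above; the bridge exists only when some agent has an approval_filter. |
| trace_address | (host, port) of the trace TCP bridge. |
| … | The shared DDict (Scoreboard). |
| … | Serialized DDict descriptor (str). External clients can attach with DDict.attach(). |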
What Happens Inside run()
Understanding the execution flow helps with debugging and performance analysis:
1. Writes the user prompt to the DDict and seeds global state.
2. Builds a Dragon Batch DAG. Each agent-backed node becomes a dispatcher closure that:
   - Attaches to the DDict and creates a completion Event.
   - Puts a Message(header=DispatchHeader(...)) on the agent's input Queue.
   - Blocks on completion_event.wait(timeout=poll_timeout), with zero polling.
   - Reads the task status (DONE or ERROR) from the DDict.
   - Returns a TaskResult that Dragon Batch passes to downstream nodes.
3. Calls task_handle.get() on terminal nodes, blocking until they are done.
4. Assembles results from per-agent DDict keys.
If an agent task fails, the dispatcher reads status=ERROR from DDict and
raises RuntimeError, which Dragon Batch propagates to downstream nodes.
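A plain-Python sketch of the dispatcher closure, with threading.Event as the completion Event, queue.Queue as the agent's input Queue, and a dict as the Scoreboard. make_dispatcher and the message fields are hypothetical, and raising TimeoutError when poll_timeout expires is an assumption; the text above does not say what happens on timeout.

```python
import queue
import threading

scoreboard = {}  # stands in for the Scoreboard DDict


def make_dispatcher(agent_id, input_q, poll_timeout):
    """Hypothetical dispatcher closure for one agent-backed node."""
    def dispatch(task_id):
        done = threading.Event()
        input_q.put({"task_id": task_id, "agent_id": agent_id, "done": done})
        # Block on the event instead of polling the scoreboard.
        if not done.wait(timeout=poll_timeout):
            raise TimeoutError(f"{agent_id} did not finish within {poll_timeout}s")
        status = scoreboard[(task_id, agent_id, "status")]
        if status == "ERROR":
            raise RuntimeError(f"{agent_id} failed on {task_id}")
        return {"task_id": task_id, "agent_id": agent_id, "status": status}
    return dispatch


def agent_loop(input_q, fail_task):
    """Toy agent: write a status, then signal the dispatcher."""
    while (msg := input_q.get()) is not None:
        status = "ERROR" if msg["task_id"] == fail_task else "DONE"
        scoreboard[(msg["task_id"], msg["agent_id"], "status")] = status
        msg["done"].set()


inbox = queue.Queue()
agent = threading.Thread(target=agent_loop, args=(inbox, "run-2"))
agent.start()

dispatch = make_dispatcher("planner", inbox, poll_timeout=5.0)
ok = dispatch("run-1")
try:
    dispatch("run-2")
    failed = False
except RuntimeError:
    failed = True

inbox.put(None)  # stop the toy agent
agent.join()
print(ok["status"], failed)  # DONE True
```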
Agent Concurrency
A single agent process can handle multiple tasks concurrently. Understanding the concurrency model is important for tuning performance and debugging.
How It Works
Each agent runs an asyncio event loop:
1. SubAgent.listen() calls await comm.receive(timeout=1.0). The blocking Queue.get() is offloaded to a thread via asyncio.to_thread(), so the event loop stays free.
2. When a message arrives, the agent dispatches _handle_message(msg) as an independent asyncio.Task via asyncio.create_task() and immediately loops back to polling.
3. Multiple tasks run concurrently, because the event loop is free during all LLM, MCP, and tool I/O waits.
┌─ Agent Process (asyncio event loop) ──────────────────────────────┐
│ │
│ listen loop: receive → create_task → receive → create_task → … │
│ │ │ │
│ Task A: attach DDict → LLM call ⏳ → tool call → LLM → done │
│ Task B: attach DDict → LLM call ⏳ → tool → done │
│ Task C: attach DDict → LLM call ⏳ … │
│ │
│ ⏳ = awaiting asyncio.to_thread(queue.get) — event loop is free │
└───────────────────────────────────────────────────────────────────┘
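A minimal asyncio sketch of the listen loop, including the graceful-shutdown gather described under Graceful Shutdown. queue.Queue and threading.Event stand in for the Dragon primitives, asyncio.sleep stands in for an LLM call, and listen and handle_message are simplified hypothetical versions of SubAgent.listen() and _handle_message().

```python
import asyncio
import queue
import threading


async def handle_message(msg, results):
    await asyncio.sleep(0.1)  # stands in for an LLM call
    results.append(msg)


async def listen(q, shutdown, results):
    inflight = set()
    while not shutdown.is_set():
        try:
            # The blocking get runs in a worker thread; the loop stays free.
            msg = await asyncio.to_thread(q.get, timeout=0.05)
        except queue.Empty:
            continue
        task = asyncio.create_task(handle_message(msg, results))
        inflight.add(task)  # keep a reference until the task finishes
        task.add_done_callback(inflight.discard)
    # Graceful shutdown: accept nothing new, finish what is in flight.
    await asyncio.gather(*inflight, return_exceptions=True)


async def main():
    q, shutdown, results = queue.Queue(), threading.Event(), []
    for i in range(3):
        q.put(f"task-{i}")
    listener = asyncio.create_task(listen(q, shutdown, results))
    while not q.empty():      # wait until every message has been received
        await asyncio.sleep(0.01)
    shutdown.set()
    await listener            # returns once in-flight tasks are done
    return results


results = asyncio.run(main())
print(sorted(results))  # ['task-0', 'task-1', 'task-2']
```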
Task Isolation
Tasks are fully isolated from each other:
- Each _handle_message() call attaches to the DDict independently and detaches in a finally block.
- Exceptions are caught at the task boundary and never re-raised. A failing task writes status=ERROR to the DDict and signals the dispatcher's completion event, but the agent process and all other concurrent tasks keep running.
- Each task gets its own DDictAccessor scoped by dispatch_id, so concurrent tasks never collide on DDict keys.
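The isolation rule can be sketched as a task body that converts every exception into an ERROR status and always signals completion in a finally block. Here a dict stands in for the DDict, asyncio.Event for the Dragon completion Event, and keys are scoped by (task_id, dispatch_id); all names are hypothetical.

```python
import asyncio

scoreboard = {}  # stands in for the DDict


async def handle_message(task_id, dispatch_id, work, done):
    """Hypothetical task body: never lets an exception escape."""
    key = (task_id, dispatch_id)  # per-invocation scope, so tasks never collide
    try:
        scoreboard[key + ("result",)] = await work()
        scoreboard[key + ("status",)] = "DONE"
    except Exception as exc:      # caught at the task boundary, not re-raised
        scoreboard[key + ("status",)] = "ERROR"
        scoreboard[key + ("error",)] = repr(exc)
    finally:
        done.set()                # the dispatcher is always unblocked


async def good():
    await asyncio.sleep(0.01)
    return "ok"


async def bad():
    await asyncio.sleep(0.01)
    raise ValueError("tool blew up")


async def main():
    events = [asyncio.Event(), asyncio.Event()]
    # gather would raise if either task let an exception escape.
    await asyncio.gather(
        handle_message("run-1", "d1", good, events[0]),
        handle_message("run-1", "d2", bad, events[1]),
    )
    return all(event.is_set() for event in events)


all_signalled = asyncio.run(main())
print(scoreboard[("run-1", "d1", "status")], scoreboard[("run-1", "d2", "status")])
# DONE ERROR
```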
LLM Request Pool
Each agent’s DragonQueueLLMProxy maintains a ResponseQueuePool — a pool
of pre-allocated response Queues (default size: 32). When the agent sends a
prompt, it borrows a response queue; when the response arrives, the queue is
returned. Configure via AgentConfig.max_concurrent_requests:
```python
config = AgentConfig(
    agent_id="runner", name="Runner", role="...",
    inference_queue=inference_queue,
    max_concurrent_requests=16,  # default is 32
)
```
If the pool is exhausted, the next LLM request blocks until a queue is returned.
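A sketch of a fixed-size pool of reusable response queues in which borrowing blocks once the pool is exhausted. ResponseQueuePoolSketch is a hypothetical stand-in for the framework's ResponseQueuePool, using queue.Queue instead of Dragon Queues. A plausible motivation for pre-allocation is avoiding per-request queue creation, but that is an assumption.

```python
import queue
from contextlib import contextmanager


class ResponseQueuePoolSketch:
    """Hypothetical fixed pool of reusable response queues."""

    def __init__(self, size=32):
        self._free = queue.Queue()
        for _ in range(size):
            self._free.put(queue.Queue())  # pre-allocate every response queue

    def acquire(self, timeout=None):
        return self._free.get(timeout=timeout)  # blocks while the pool is empty

    def release(self, response_q):
        self._free.put(response_q)

    def available(self):
        return self._free.qsize()

    @contextmanager
    def borrowed(self, timeout=None):
        response_q = self.acquire(timeout)
        try:
            yield response_q
        finally:
            self.release(response_q)  # returned even if the request fails


pool = ResponseQueuePoolSketch(size=2)
a, b = pool.acquire(), pool.acquire()
try:
    pool.acquire(timeout=0.05)
    exhausted = False
except queue.Empty:
    exhausted = True  # a third concurrent request would have to wait
pool.release(a)
pool.release(b)

with pool.borrowed() as response_q:
    in_use = pool.available()
print(exhausted, in_use, pool.available())  # True 1 2
```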
Graceful Shutdown
When shutdown_event.set() is called, the listen loop stops accepting new
messages and awaits all in-flight tasks via
asyncio.gather(*inflight, return_exceptions=True). Every task completes its
DDict writes and fires its completion event before the process exits.
Human-in-the-Loop (HITL)
The HITL system provides a human approval gate for tool calls that require oversight.
Configuring HITL
Set an approval_filter on the agent config — a callable that receives the
tool name and arguments and returns True if approval is needed:
```python
def needs_approval(tool_name: str, tool_args: dict) -> bool:
    """Require approval for any tool that launches experiments."""
    return tool_name in {"propose_experiment", "launch_experiment"}

config = AgentConfig(
    agent_id="runner", name="Runner", role="...",
    inference_queue=inference_queue,
    approval_filter=needs_approval,
)
```
When any agent in the pipeline has an approval_filter, the orchestrator
automatically creates a HITL Queue and TCP bridge on construction.
Running the HITL Client
Start the terminal client from a separate terminal (no Dragon runtime required):
python -m dragon.ai.agent.hitl --tcp HOST:PORT
The client presents each pending request with colored formatting and prompts the operator to:
Approve — the tool call proceeds.
Reject — the tool call is skipped and the LLM receives a rejection message, allowing it to try an alternative approach.
Provide feedback — the LLM receives the operator’s text as a tool result and retries with the feedback incorporated.
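The three outcomes map naturally onto what the LLM sees as the tool result. In this sketch, run_gated_tool, Decision, and the rejection and feedback wording are hypothetical; ask_human stands in for the round trip through the HITL Queue and the TCP client.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Decision:
    """Hypothetical operator decision."""
    action: str                     # "approve", "reject", or "feedback"
    feedback: Optional[str] = None


def run_gated_tool(tool_name, tool_args, tool_fn, approval_filter,
                   ask_human: Callable[[str, dict], Decision]) -> str:
    """Return the text the LLM receives as the tool result."""
    if approval_filter is None or not approval_filter(tool_name, tool_args):
        return str(tool_fn(**tool_args))            # no approval needed
    decision = ask_human(tool_name, tool_args)      # blocks until the operator answers
    if decision.action == "approve":
        return str(tool_fn(**tool_args))
    if decision.action == "feedback":
        return f"Operator feedback: {decision.feedback}"  # LLM retries with this
    return f"Tool call {tool_name} was rejected by the operator."


def needs_approval(tool_name, tool_args):
    return tool_name in {"propose_experiment", "launch_experiment"}


def launch_experiment(lr):
    return f"launched with lr={lr}"


args = {"lr": 0.1}
approved = run_gated_tool("launch_experiment", args, launch_experiment,
                          needs_approval, lambda n, a: Decision("approve"))
rejected = run_gated_tool("launch_experiment", args, launch_experiment,
                          needs_approval, lambda n, a: Decision("reject"))
steered = run_gated_tool("launch_experiment", args, launch_experiment,
                         needs_approval,
                         lambda n, a: Decision("feedback", "keep lr <= 0.01"))
print(approved)  # launched with lr=0.1
print(rejected)  # Tool call launch_experiment was rejected by the operator.
print(steered)   # Operator feedback: keep lr <= 0.01
```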
How HITL Works (Architecture)
Dragon primitives (Queue, Channel, DDict) cannot be deserialized outside the Dragon runtime. The TCP bridge keeps Dragon primitives intra-runtime and sends only JSON to the external client:
1. The agent creates a per-request response Queue and puts (request, response_queue) on the shared HITL Queue.
2. HitlTcpBridge (a daemon thread in the orchestrator) reads from the HITL Queue and sends JSON over TCP.
3. The TCP client reads the JSON, gets the operator's decision, and sends a JSON response.
4. The bridge puts a HumanApprovalResponse on the per-request response Queue.
5. The agent's coroutine unblocks and proceeds.

If the client disconnects, the bridge re-queues the request. If it cannot read a valid response, it sends a synthetic rejection to prevent indefinite blocking.
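Because only JSON crosses the TCP boundary, the bridge needs an encoding for requests and a defensive decoder for replies. This sketch frames each message as one JSON line and falls back to a synthetic rejection; the framing and the field names (id, tool, args, action) are assumptions, not the framework's wire format.

```python
import json

VALID_ACTIONS = {"approve", "reject", "feedback"}


def encode_request(request_id, tool_name, tool_args):
    """One JSON object per line, so the client can read with readline()."""
    payload = {"id": request_id, "tool": tool_name, "args": tool_args}
    return (json.dumps(payload) + "\n").encode("utf-8")


def decode_response(line, request_id):
    """Parse the operator's reply; fall back to a synthetic rejection."""
    try:
        reply = json.loads(line)
        if reply.get("id") == request_id and reply.get("action") in VALID_ACTIONS:
            return reply
    except (ValueError, AttributeError, TypeError):
        pass  # malformed JSON, non-object JSON, or unhashable action
    return {"id": request_id, "action": "reject",
            "reason": "no valid response from the HITL client"}


wire = encode_request("req-7", "launch_experiment", {"lr": 0.1})
good = decode_response(b'{"id": "req-7", "action": "approve"}\n', "req-7")
bad = decode_response(b"not json", "req-7")
print(good["action"], bad["action"])  # approve reject
```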
Tracing and Observability
When OrchestratorConfig.tracing=True, every LLM call, tool execution, HITL
decision, and memory operation emits structured span events.
Live Trace Viewer
python -m dragon.ai.agent.observability --tcp HOST:PORT -i
The viewer renders a live Gantt-bar tree of spans in the terminal. Use -i
for interactive curses mode with keyboard navigation.
Traces can also be exported as JSONL files for offline analysis or as readable
.txt reports that are auto-saved when the viewer exits.
Tracing Architecture
- DictTracingProcessor: a per-task processor that writes span start/end events to per-agent DDict keys (TRACE_KEY) and per-event-type keys (LLM_EVENT_KEY, TOOL_EVENT_KEY, etc.) with atomic counters.
- TraceTcpBridge: a daemon thread that polls the DDict for new span data and streams it as newline-delimited JSON over TCP.
- Terminal viewer: pure Python (no Dragon); connects via TCP.
When tracing=False, all event writes are guarded and produce zero overhead.
Error Handling
The framework defines a structured exception hierarchy:
| Exception | When raised |
|---|---|
| … | Base class for all framework errors. |
| ToolExecutionError | A tool call failed. Wraps the original exception plus the tool name and arguments. Fed back to the LLM so it can retry. |
| … | LLM produced unparseable JSON, or … |
| … | HITL TCP bridge encountered a queue or network failure. |
| … | … |
| … | Non-fatal DDict/trace write issue (emitted as …). |
Key patterns:
- Task isolation: one failing task never crashes other concurrent tasks. Errors are published to the DDict and propagated via the dispatcher.
- Tool error tolerance: ToolExecutionError is fed back to the LLM, which can reason about the failure and retry or try an alternative.
- Graceful degradation: summarization failure falls back to sliding-window pruning; a HITL bridge disconnect re-queues requests.
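A sketch of the tool-error-tolerance pattern: wrap the failure with the tool name and arguments, then turn it into a tool message the LLM can reason about instead of letting it escape the loop. ToolExecutionErrorSketch is a stand-in, not the framework's ToolExecutionError, and the message text is hypothetical; the tool_call_id field follows the OpenAI chat format.

```python
class ToolExecutionErrorSketch(Exception):
    """Stand-in for ToolExecutionError: keeps the tool name, args, and cause."""

    def __init__(self, tool_name, tool_args, original):
        super().__init__(f"{tool_name} failed: {original!r}")
        self.tool_name = tool_name
        self.tool_args = tool_args
        self.original = original


def run_tool(tools, tool_name, tool_args):
    try:
        return tools[tool_name](**tool_args)
    except Exception as exc:  # includes KeyError for an unknown tool name
        raise ToolExecutionErrorSketch(tool_name, tool_args, exc) from exc


def tool_message(tools, call_id, tool_name, tool_args):
    """Build the tool-result message; failures become text for the LLM."""
    try:
        content = str(run_tool(tools, tool_name, tool_args))
    except ToolExecutionErrorSketch as err:
        content = f"ERROR: {err}"
    return {"role": "tool", "tool_call_id": call_id, "content": content}


tools = {"divide": lambda a, b: a / b}
ok = tool_message(tools, "call-1", "divide", {"a": 6, "b": 3})
failed = tool_message(tools, "call-2", "divide", {"a": 1, "b": 0})
print(ok["content"])      # 2.0
print(failed["content"])  # starts with "ERROR: divide failed: ZeroDivisionError"
```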
DDict Key Reference
All task state is stored in the Scoreboard DDict using structured key templates. Knowing these keys is useful for debugging and building custom monitoring tools.
Per-invocation keys (scoped by {task_id}:{agent_id}:{dispatch_id}):
| Key template | Content |
|---|---|
| RESULT_KEY | Agent's final answer (dict). |
| … | … |
| TRACE_KEY | Ordered list of span dicts for this invocation. |
| LLM_EVENT_KEY | Per-LLM-call event (indexed by …). |
| TOOL_EVENT_KEY | Per-tool-call event (indexed by …). |
| … | Per-HITL-decision event (indexed by …). |
| … | Per-memory-operation event (indexed by …). |
| … | … |
| … | … |
Per-run keys (scoped by {task_id}):
| Key template | Content |
|---|---|
| … | Maps … |
| … | The original user prompt. |
| … | Ordered list of … |
| … | Per-agent atomic entry (avoids read-modify-write races). |
| … | Serialized Dragon Queue handle for the HITL channel. |
Complete Example
Here is a minimal end-to-end script:
```python
import dragon
from dragon.native.process import Process
from dragon.native.queue import Queue
from dragon.native.event import Event
from dragon.workflows.batch import Batch

from dragon.ai.inference.inference_utils import Inference
from dragon.ai.inference.config import (
    InferenceConfig, ModelConfig, HardwareConfig, BatchingConfig,
    GuardrailsConfig, DynamicWorkerConfig,
)
from dragon.ai.agent.config.agent_config import AgentConfig, OrchestratorConfig
from dragon.ai.agent.config.pipeline import Pipeline, PipelineNode
from dragon.ai.agent.tools.registry import ToolRegistry
from dragon.ai.agent.core.sub_agent import create_sub_agent
from dragon.ai.agent.orchestrator.orchestrator import DAGOrchestrator

# 1. Inference backend
inference_queue = Queue()
inference = Inference(
    config=InferenceConfig(
        model=ModelConfig(model_name="meta-llama/Llama-3.1-8B-Instruct",
                          hf_token="hf_...", tp_size=1, max_tokens=2048),
        hardware=HardwareConfig(num_gpus=1, num_nodes=1),
        batching=BatchingConfig(enabled=True),
        guardrails=GuardrailsConfig(enabled=False),
        dynamic_worker=DynamicWorkerConfig(enabled=False),
        flask_secret_key="", run_type="agent", token="",
    ),
    input_queue=inference_queue,
)
inference.initialize()

# 2. Tools
registry = ToolRegistry()

@registry.tool
def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b

# 3. Agent config
config = AgentConfig(
    agent_id="assistant",
    name="Math Assistant",
    role="You are a math assistant. Use the add tool to compute sums.",
    inference_queue=inference_queue,
)

# 4. Start the agent process and receive its input queue (reply queue handshake)
shutdown = Event()
reply_q = Queue()
p = Process(target=create_sub_agent,
            args=(config, registry, None, shutdown, reply_q))
p.start()
config.input_queue = reply_q.get(timeout=60)

# 5. Pipeline and orchestrator
pipeline = Pipeline(nodes=[
    PipelineNode(agent_id="assistant",
                 task_description="Compute the sum of 17 and 25."),
])
orch = DAGOrchestrator(
    config=OrchestratorConfig(agents=[config]),
    pipeline=pipeline,
)

# Create the Batch before the try block so the finally clause can always close it
batch = Batch()
try:
    result = orch.run("What is 17 + 25?", batch=batch)
    print(result)
finally:
    orch.destroy()
    batch.close()
    batch.join()
    shutdown.set()
    p.join(timeout=30)
    inference.destroy()
```