Single Agent with Tool Registration
This is the first and simplest example in the Agent framework series. It demonstrates how to stand up a single Dragon agent with an LLM backend, register a sync tool, and execute a task. This example is the foundation for all subsequent examples — read it first to understand the basic lifecycle and configuration.
What you’ll learn:
How to configure an
AgentConfigwith minimal required parametersHow to register a sync tool using
registry.register(fn)How to set up an inference pipeline and queue
How to create a
Pipelinewith a single agent nodeHow to launch and run a task through the
DAGOrchestrator
Architecture:
The agent process receives a task via Dragon Queue, calls the LLM, uses tools, and returns results via the Scoreboard DDict.
Main Code
Below is the complete example:
1"""01 — Single Agent with Tool Registration.
2
3**Prerequisites:** None — this is the starting point.
4
5**What you'll learn:**
6
7* How to stand up a single Dragon agent with an LLM backend
8* How to register a sync tool using ``registry.register(fn)``
9* Minimal configuration: ``AgentConfig``, ``ToolRegistry``, ``InferenceConfig``
10* The agent lifecycle: create → listen → process → shutdown
11
12Architecture::
13
14 Main Process
15 └── Inference.initialize() ← vLLM engine on 1 GPU
16 └── CPUWorker
17 └── InferenceWorker
18
19 planner_agent (Dragon Process)
20 └── receives task via Dragon Queue
21 └── calls propose_experiment tool
22 └── returns result
23
24This is the simplest possible Dragon agent setup. One agent, one tool,
25one inference pipeline.
26
27Usage::
28
29 dragon 01_single_agent.py
30"""
31
32import asyncio
33
34import dragon
35import multiprocessing as mp
36
37from dragon.ai.agent.core import create_sub_agent
38from dragon.ai.agent.config import (
39 AgentConfig,
40 OrchestratorConfig,
41 Pipeline,
42 PipelineNode,
43)
44from dragon.ai.agent.tools import ToolRegistry
45from dragon.ai.agent.orchestrator import DAGOrchestrator
46from dragon.native.event import Event
47from dragon.native.process import Process
48from dragon.native.queue import Queue
49from dragon.workflows.batch import Batch
50
51from dragon.ai.inference.config import (
52 BatchingConfig,
53 HardwareConfig,
54 InferenceConfig,
55 ModelConfig,
56)
57from dragon.ai.inference.inference_utils import Inference
58
59# --- Tool implementation ---------------------------------------------------
60from tools import propose_experiment
61
62
63# ===========================================================================
64# User-configurable constants
65# ===========================================================================
66
67MODEL_NAME = "/path/to/your/model" # any vLLM-compatible checkpoint
68HF_TOKEN = "" # set if the model is gated # set if the model is gated
69
70
71# ===========================================================================
72# Inference Pipeline Configuration
73#
74# Minimal setup: 1 node, 1 GPU, 1 inference worker.
75# ===========================================================================
76
77INFERENCE_CONFIG = InferenceConfig(
78 model=ModelConfig(
79 model_name=MODEL_NAME,
80 hf_token=HF_TOKEN,
81 tp_size=1, # single GPU
82 max_tokens=8192,
83 max_model_len=32768,
84 ),
85 hardware=HardwareConfig(
86 num_nodes=1,
87 num_gpus=1,
88 num_inf_workers_per_cpu=1,
89 ),
90 batching=BatchingConfig(
91 batch_wait_seconds=0.1,
92 max_batch_size=32,
93 ),
94)
95
96
97# ===========================================================================
98# Tool registry
99#
100# register() accepts any callable — sync or async. It auto-wraps it in
101# a FunctionTool, deriving name, description, and parameter schemas from
102# the function's __name__, __doc__, and type annotations.
103# ===========================================================================
104
105registry = ToolRegistry()
106registry.register(propose_experiment)
107
108
109# ===========================================================================
110# Pipeline — single node, single agent
111# ===========================================================================
112
113pipeline = Pipeline(nodes=[
114 PipelineNode(
115 agent_id="planner_agent",
116 task_description=(
117 "You are a scientific experiment planner. The user wants to "
118 "study Monte Carlo convergence for estimating π.\n\n"
119 "Propose an experiment plan by calling propose_experiment with:\n"
120 " - description, sample_sizes, convergence_target, methodology\n\n"
121 "Report the approved plan verbatim as your final answer."
122 ),
123 depends_on=[],
124 ),
125])
126
127
128# ===========================================================================
129# Main
130# ===========================================================================
131
132async def main():
133 input_queue = Queue()
134
135 print("[startup] Initializing inference pipeline...", flush=True)
136
137 inference_pipeline = None
138 try:
139 inference_pipeline = Inference(INFERENCE_CONFIG, input_queue)
140 inference_pipeline.initialize()
141 except Exception as exc:
142 import traceback
143 print(f"\n[FATAL] Inference pipeline failed to initialize: {exc}", flush=True)
144 traceback.print_exc()
145 if inference_pipeline is not None:
146 inference_pipeline.destroy()
147 return
148 print("[startup] Inference pipeline ready.\n", flush=True)
149
150 procs, agent_specs = [], []
151 try:
152 # Create the single agent
153 agent_spec = {
154 "config": AgentConfig(
155 agent_id="planner_agent",
156 name="Experiment Planner",
157 role=(
158 "You are an experiment planner for Monte Carlo "
159 "convergence studies. Propose plans via "
160 "propose_experiment."
161 ),
162 inference_queue=input_queue,
163 ),
164 "tool_registry": registry,
165 "shutdown_event": Event(),
166 "reply_queue": Queue(),
167 }
168 agent_specs = [agent_spec]
169
170 # Launch agent as a Dragon Process
171 p = Process(target=create_sub_agent, kwargs=agent_spec)
172 p.start()
173 procs.append(p)
174
175 # Wait for agent to publish its input queue
176 agent_input_queue = agent_spec["reply_queue"].get()
177 agent_spec["config"].input_queue = agent_input_queue
178 print("[startup] Agent 'planner_agent' ready.", flush=True)
179
180 # Create orchestrator (even for a single agent, it manages
181 # the DDict, Batch dispatch, and result collection)
182 orchestrator = DAGOrchestrator(
183 config=OrchestratorConfig(
184 agents=[agent_spec["config"]],
185 poll_interval=0.5,
186 poll_timeout=120.0,
187 ),
188 pipeline=pipeline,
189 )
190
191 user_input = (
192 "Propose a Monte Carlo experiment to estimate π using sample "
193 "sizes of 1000, 10000, and 100000."
194 )
195
196 batch = Batch()
197 try:
198 print("=" * 60, flush=True)
199 print("Dragon AI — 01 Single Agent", flush=True)
200 print("=" * 60, flush=True)
201 print(f"Request: {user_input}\n", flush=True)
202
203 result = orchestrator.run(
204 user_input=user_input,
205 batch=batch,
206 )
207
208 print("\n" + "=" * 60, flush=True)
209 print("FINAL RESULT", flush=True)
210 print("=" * 60, flush=True)
211 print(result, flush=True)
212
213 except Exception as exc:
214 import traceback
215 print(f"\n[error] Pipeline failed: {exc}", flush=True)
216 traceback.print_exc()
217 finally:
218 orchestrator.destroy()
219 batch.join()
220 batch.destroy()
221
222 except Exception as exc:
223 import traceback
224 print(f"\n[error] Fatal: {exc}", flush=True)
225 traceback.print_exc()
226 finally:
227 for spec in agent_specs:
228 try:
229 spec["shutdown_event"].set()
230 except Exception:
231 pass
232 for p in procs:
233 try:
234 p.join()
235 except Exception:
236 pass
237 print("\n[teardown] Agent stopped.", flush=True)
238 try:
239 inference_pipeline.destroy()
240 except Exception:
241 pass
242 print("[teardown] Inference pipeline stopped.", flush=True)
243
244
245if __name__ == "__main__":
246 mp.set_start_method("dragon")
247 asyncio.run(main())
Key Concepts
Agent Lifecycle:
AgentConfigdefines the agent’s identity, tools, and inference backendPipelineNoderegisters the agent in the workflow DAGDAGOrchestratorspawns the agent as a Dragon processAgent listens on its input queue for
DispatchHeadermessagesFor each task, agent processes via the LLM + tool loop
Results are written to the Scoreboard DDict
Agent signals completion via an Event
Tool Registration:
Use registry.register(fn) to register a simple callable. The framework
extracts parameter annotations and generates a JSON schema for the LLM.
Inference Setup:
Pass an inference queue to the agent so it can send requests to the vLLM backend. Multiple agents in the same Dragon runtime share this queue, allowing them to submit inference requests to the same GPU worker(s). The backend can run on the same machine or a different node.
Installation
After installing Dragon, ensure you have:
pip install torch torchvision torchaudio
pip install vllm
System Description
For a minimal run: 1 node, 1 GPU (for vLLM), any CPU available
How to Run
Step 1: Edit the model path
Open 01_single_agent.py and set MODEL_NAME to your vLLM-compatible
checkpoint (e.g., meta-llama/Llama-2-7b-hf).
Step 2: Set HuggingFace token (if using gated models)
export HF_TOKEN="hf_your_token_here"
Step 3: Run
dragon 01_single_agent.py
Example output:
$ dragon 01_single_agent.py
Agent 'planner' started
Task 'Estimate the convergence rate' received
Calling LLM with tool: estimate_convergence_rate
Result: {'convergence_rate': 0.95, 'confidence': 0.87}
Agent 'planner' completed
Next Steps
Once this example works, proceed to:
02 — Multi-Agent DAG (multi-agent orchestration, function nodes, registration styles)
03 — Human-in-the-Loop (approval gates before tool execution)
04 — Memory Management (history strategies, dedicated summarizer LLM)
05 — MCP Tools (integrate remote MCP servers)
- 06 — Full Pipeline (all features combined with tracing)
# … setup code … return inference_queue
- def main():
set_start_method(“dragon”)
# 1. Define tools registry = ToolRegistry()
@registry.tool def lookup_value(key: str) -> dict:
“””Look up a value in the configuration store.
- Args:
key: The configuration key to look up.
- Returns:
A dict with the key and its corresponding value.
“”” store = {“learning_rate”: “0.001”, “batch_size”: “64”, “epochs”: “100”} value = store.get(key, “not found”) return {“key”: key, “value”: value}
# 2. Configure the agent inference_queue = setup_inference_queue()
- agent_config = AgentConfig(
agent_id=”assistant”, name=”Config Assistant”, role=”You are a helpful assistant that looks up configuration values. “
“Use the lookup_value tool to find requested settings.”,
inference_queue=inference_queue,
)
# 3. Build a single-node pipeline pipeline = Pipeline(nodes=[
- PipelineNode(
agent_id=”assistant”, task_description=”Look up the requested configuration values.”,
),
])
# 4. Run the orchestrator orch_config = OrchestratorConfig(agents=[agent_config]) orchestrator = DAGOrchestrator(config=orch_config, pipeline=pipeline)
- try:
batch = Batch() result = orchestrator.run(
user_input=”What are the learning_rate and batch_size settings?”, batch=batch,
) print(“Agent result:”, result)
- finally:
orchestrator.destroy()
- if __name__ == “__main__”:
main()
How to run
dragon basic_agent_pipeline.py