MCP Server Integration
Building on Examples 01–04, this example demonstrates connecting a Dragon agent to remote Model Context Protocol (MCP) servers. The agent can discover and invoke tools exposed by any MCP-compatible server (e.g., data stores, compute clusters, external APIs) alongside locally registered tools. Tool calls are automatically routed based on a namespace prefix.
Prerequisites: Read 04_memory.py first.
What you’ll learn:
How to connect to one or more MCP servers via HTTP/SSE
How to configure authentication tokens and timeouts
How tool names are scoped (local tools vs. MCP tools via alias__tool_name)
How the ToolDispatcher routes calls to the correct server
How to mix local and remote tools in the same agent
Architecture:
The agent’s ToolDispatcher maintains connections to multiple MCP servers
and routes tool calls transparently: calls without a prefix go to local tools;
calls like datastore__query route to the MCP server aliased datastore.
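The routing rule can be sketched in a few lines of plain Python. This is an illustrative stand-in, not Dragon's ToolDispatcher: the function name route_tool_call, the SEPARATOR constant, and the choice to treat an unknown prefix as a local tool are all assumptions made for the example.

```python
from typing import Optional, Set, Tuple

SEPARATOR = "__"  # double underscore between server alias and tool name


def route_tool_call(name: str, aliases: Set[str]) -> Tuple[Optional[str], str]:
    """Split a scoped tool name into (alias, tool_name).

    Returns (None, name) for local tools, i.e. names that carry no prefix
    or whose prefix is not a known server alias (assumed behaviour).
    """
    prefix, sep, rest = name.partition(SEPARATOR)
    if sep and rest and prefix in aliases:
        return prefix, rest
    return None, name


known_aliases = {"datastore", "jupyter"}
print(route_tool_call("datastore__query", known_aliases))      # ('datastore', 'query')
print(route_tool_call("format_results_table", known_aliases))  # (None, 'format_results_table')
```

Splitting on the first separator only means a remote tool whose own name contains a double underscore still resolves to the right server.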
Main Code
Below is the complete example:
"""05 — MCP Server Integration.

**Prerequisites:** Read ``02_multi_agent_dag.py`` first.

**What's new:**

* **MCPServerConfig** — connect an agent to a remote MCP tool server
* **Scoped tool names** — MCP tools appear as ``alias__tool_name``
  (e.g. ``jupyter__create_notebook``) to avoid collisions with local tools
* **Mixed local + MCP tools** — the reporter agent has both local tools
  (``format_results_table``) and remote MCP tools (Jupyter operations)
* **Auto-discovery** — MCP tool schemas are fetched at connect time and
  merged into the LLM's tool list
* **Token from file** — bearer token read from ``token.txt``, never on CLI
* **Graceful fallback** — if no MCP URL is provided, reporter uses
  local tools only

Architecture::

    planner_agent ──► runner_agent ──► analyzer_agent ──┐
         │                │                             ├──► reporter_agent
         └────────────────┴─────────────────────────────┘         │
                                                           MCP: jupyter__*

Usage::

    # Without MCP (local tools only):
    dragon 05_mcp_tools.py

    # With MCP (Jupyter notebook creation):
    echo '<your-token>' > token.txt
    dragon 05_mcp_tools.py <mcp_server_url>
"""

import asyncio
import os
import sys
from typing import Any

import dragon
import multiprocessing as mp

from dragon.ai.agent.core import create_sub_agent
from dragon.ai.agent.config import (
    AgentConfig,
    MCPServerConfig,  # ← NEW
    OrchestratorConfig,
    Pipeline,
    PipelineNode,
)
from dragon.ai.agent.tools import ToolRegistry
from dragon.ai.agent.orchestrator import DAGOrchestrator
from dragon.infrastructure.policy import Policy
from dragon.native.event import Event
from dragon.native.machine import Node, System
from dragon.native.process import Process
from dragon.native.queue import Queue
from dragon.workflows.batch import Batch

from dragon.ai.inference.config import (
    BatchingConfig,
    HardwareConfig,
    InferenceConfig,
    ModelConfig,
)
from dragon.ai.inference.inference_utils import Inference

from tools import (
    propose_experiment,
    launch_experiment,
    check_progress,
    collect_results,
    analyze_convergence,
    format_results_table,
)
from tools.runner import cleanup_experiment_state


# ===========================================================================
# User-configurable constants
# ===========================================================================

MODEL_NAME = "/path/to/your/model"
HF_TOKEN = ""


# ===========================================================================
# Inference Pipeline Configuration
# ===========================================================================

INFERENCE_CONFIG = InferenceConfig(
    model=ModelConfig(
        model_name=MODEL_NAME,
        hf_token=HF_TOKEN,
        tp_size=2,
        max_tokens=8192,
        max_model_len=32768,
    ),
    hardware=HardwareConfig(
        num_nodes=1,
        num_gpus=2,
        num_inf_workers_per_cpu=1,
    ),
    batching=BatchingConfig(
        batch_wait_seconds=0.1,
        max_batch_size=32,
    ),
)


# ===========================================================================
# Tool registries
# ===========================================================================

planner_registry = ToolRegistry()
planner_registry.register(propose_experiment)

runner_registry = ToolRegistry()
runner_registry.register(launch_experiment)
runner_registry.register(check_progress)
runner_registry.register(collect_results)

analyzer_registry = ToolRegistry()
analyzer_registry.register(analyze_convergence)

reporter_registry = ToolRegistry()
reporter_registry.register(format_results_table)


# ===========================================================================
# DAG pipeline
# ===========================================================================

pipeline = Pipeline(nodes=[
    PipelineNode(
        agent_id="planner_agent",
        task_description=(
            "You are a scientific experiment planner. The user wants to study "
            "Monte Carlo convergence for estimating π.\n\n"
            "Propose an experiment plan by calling propose_experiment with:\n"
            " - description, sample_sizes, convergence_target, methodology\n\n"
            "Report the approved plan verbatim as your final answer."
        ),
        depends_on=[],
    ),
    PipelineNode(
        agent_id="runner_agent",
        task_description=(
            "You manage parallel Monte Carlo simulations on an HPC cluster.\n\n"
            "Tools:\n"
            " 1. launch_experiment(sample_sizes, seeds) — launches ALL "
            "simulations in parallel.\n"
            " 2. check_progress() — shows done/running/pending status.\n"
            " 3. collect_results() — call ONLY when all_done=true.\n\n"
            "STRICT workflow:\n"
            " 1. Call launch_experiment.\n"
            " 2. Call check_progress until all_done=true.\n"
            " 3. Call collect_results — REQUIRED before final answer.\n"
            " 4. Report collect_results output verbatim."
        ),
        depends_on=["planner_agent"],
    ),
    PipelineNode(
        agent_id="analyzer_agent",
        task_description=(
            "Call analyze_convergence with a list of dicts (keys: "
            "'n_samples', 'absolute_error'). Report all metrics verbatim."
        ),
        depends_on=["runner_agent"],
    ),
    PipelineNode(
        agent_id="reporter_agent",
        task_description=(
            "Write a structured report with the experiment results.\n\n"
            "STRICT workflow — follow every step IN ORDER:\n\n"
            " STEP 1. Call format_results_table to get a Markdown table.\n\n"
            " STEP 2. Check your available tools. If you have tools that\n"
            " can manage Jupyter kernels and notebook files, do\n"
            " BOTH sub-steps:\n"
            " a. Create a Jupyter kernel.\n"
            " b. Create a notebook file named\n"
            " 'monte_carlo_convergence_report' with the\n"
            " COMPLETE report as code_content.\n"
            " If no such tools are available, skip Step 2.\n\n"
            " STEP 3. Give your final answer with the SAME complete\n"
            " report text. COPY-PASTE the actual table.\n\n"
            "Never invent data — use only data from upstream agents."
        ),
        depends_on=["planner_agent", "runner_agent", "analyzer_agent"],
    ),
])


# ===========================================================================
# Helpers
# ===========================================================================

def _make_agent_kwargs(agent_id, name, role, registry, inference_queue,
                       mcp_servers=None,
                       max_tool_call_iterations=20):
    """Build kwargs for create_sub_agent().

    NEW in this example: ``mcp_servers`` — list of MCPServerConfig objects.
    The agent connects to each MCP server in its own process at startup.
    """
    return {
        "config": AgentConfig(
            agent_id=agent_id, name=name, role=role,
            inference_queue=inference_queue,
            max_tool_call_iterations=max_tool_call_iterations,
        ),
        "tool_registry": registry,
        "mcp_servers": mcp_servers,  # ← NEW: passed to create_sub_agent
        "shutdown_event": Event(),
        "reply_queue": Queue(),
    }


def _start_agents(specs, policies):
    procs = []
    for spec, policy in zip(specs, policies):
        p = Process(target=create_sub_agent, kwargs=spec, policy=policy)
        p.start()
        procs.append(p)
    queues = {}
    for spec in specs:
        aid = spec["config"].agent_id
        queues[aid] = spec["reply_queue"].get()
        print(f"[startup] Agent '{aid}' ready.", flush=True)
    return procs, queues


# ===========================================================================
# Main
# ===========================================================================

async def main():
    input_queue = Queue()

    print("[startup] Initializing inference pipeline...", flush=True)

    inference_pipeline = None
    try:
        inference_pipeline = Inference(INFERENCE_CONFIG, input_queue)
        inference_pipeline.initialize()
    except Exception as exc:
        import traceback
        print(f"\n[FATAL] Inference pipeline failed to initialize: {exc}", flush=True)
        traceback.print_exc()
        if inference_pipeline is not None:
            inference_pipeline.destroy()
        return
    print("[startup] Inference pipeline ready.\n", flush=True)

    my_alloc = System()
    node_list = my_alloc.nodes
    compute_host = (
        Node(node_list[1]).hostname if len(node_list) > 1
        else Node(node_list[0]).hostname
    )
    compute_policy = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=compute_host,
    )

    # -------------------------------------------------------------------
    # ← NEW: MCP server setup for the reporter agent
    #
    # Pass the MCP URL as a command-line argument:
    #     dragon 05_mcp_tools.py <mcp_url>
    #
    # The bearer token is read from token.txt in CWD.
    # -------------------------------------------------------------------
    mcp_url = sys.argv[1] if len(sys.argv) >= 2 else None

    TOKEN_FILE = os.path.join(os.getcwd(), "token.txt")
    mcp_token = None
    if mcp_url:
        try:
            with open(TOKEN_FILE, "rb") as f:
                raw = f.read()
            mcp_token = raw.decode("utf-8").strip().strip("\ufeff")
            print(f"[startup] MCP token loaded from {TOKEN_FILE}", flush=True)
        except FileNotFoundError:
            print(
                f"[startup] WARNING: {TOKEN_FILE} not found — "
                "MCP connection will have no auth token.",
                flush=True,
            )

    reporter_mcp_servers = None
    if mcp_url:
        reporter_mcp_servers = [
            MCPServerConfig(url=mcp_url, alias="jupyter", token=mcp_token),
        ]
        print(f"[startup] Reporter will connect to MCP: {mcp_url}", flush=True)
    else:
        print(
            "[startup] No MCP URL provided — reporter uses local tools only.\n"
            " To enable MCP, run:\n"
            " echo '<token>' > token.txt\n"
            " dragon 05_mcp_tools.py <mcp_url>",
            flush=True,
        )

    procs, agent_specs = [], []
    try:
        agent_specs = [
            _make_agent_kwargs(
                "planner_agent", "Experiment Planner",
                "You are an experiment planner for Monte Carlo convergence "
                "studies on an HPC cluster.",
                planner_registry, input_queue,
            ),
            _make_agent_kwargs(
                "runner_agent", "Parallel Simulation Runner",
                "You manage parallel Monte Carlo simulations.\n"
                "You MUST call all three tools in order: "
                "launch_experiment → check_progress → collect_results.\n"
                "NEVER give a final answer without calling collect_results first.",
                runner_registry, input_queue,
                max_tool_call_iterations=60,
            ),
            _make_agent_kwargs(
                "analyzer_agent", "Convergence Analyzer",
                "Analyse convergence. Call analyze_convergence with "
                "n_samples + absolute_error dicts. Report verbatim.",
                analyzer_registry, input_queue,
            ),
            _make_agent_kwargs(
                "reporter_agent", "Report Writer",
                "Write a structured report with Markdown tables. "
                "If you have Jupyter tools, also create a notebook. "
                "Always include tool output verbatim.",
                reporter_registry, input_queue,
                # ← NEW: attach MCP servers to this agent
                mcp_servers=reporter_mcp_servers,
            ),
        ]
        policies = [None, compute_policy, None, None]

        procs, queues = _start_agents(agent_specs, policies)
        for spec in agent_specs:
            spec["config"].input_queue = queues[spec["config"].agent_id]

        orchestrator = DAGOrchestrator(
            config=OrchestratorConfig(
                agents=[s["config"] for s in agent_specs],
                poll_interval=0.5,
                poll_timeout=14400.0,
            ),
            pipeline=pipeline,
        )

        user_input = (
            "Design and run a Monte Carlo convergence experiment for π. "
            "Monitor progress and report which nodes ran what."
            + (
                " Save the final report to a Jupyter notebook on the cluster."
                if mcp_url else ""
            )
        )

        batch = Batch()
        try:
            print("=" * 60, flush=True)
            print("Dragon AI — 05 MCP Tools Integration", flush=True)
            print("=" * 60, flush=True)
            print(f"Request: {user_input}\n", flush=True)

            result = orchestrator.run(
                user_input=user_input,
                batch=batch,
            )

            print("\n" + "=" * 60, flush=True)
            print("FINAL RESULT", flush=True)
            print("=" * 60, flush=True)
            print(result, flush=True)

        except Exception as exc:
            import traceback
            print(f"\n[error] Pipeline failed: {exc}", flush=True)
            traceback.print_exc()
        finally:
            orchestrator.destroy()
            batch.join()
            batch.destroy()

    except Exception as exc:
        import traceback
        print(f"\n[error] Fatal: {exc}", flush=True)
        traceback.print_exc()
    finally:
        cleanup_experiment_state()
        for spec in agent_specs:
            try:
                spec["shutdown_event"].set()
            except Exception:
                pass
        for p in procs:
            try:
                p.join()
            except Exception:
                pass
        print("\n[teardown] All agents stopped.", flush=True)
        try:
            inference_pipeline.destroy()
        except Exception:
            pass
        print("[teardown] Inference pipeline stopped.", flush=True)


if __name__ == "__main__":
    mp.set_start_method("dragon")
    asyncio.run(main())
Key Concepts
MCP Server Configuration:
Define one or more MCP servers in MCPServerConfig:
mcp_servers = [
    MCPServerConfig(
        url="http://datastore-service:8080/sse",
        alias="datastore",
        token="my-secret-token",
        timeout=10.0,
        max_retries=3,
    ),
    MCPServerConfig(
        url="http://compute-cluster:9090/sse",
        alias="compute",
    ),
]
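The literal token in the snippet above is for illustration; the main example reads the bearer token from a file so that it never appears on the command line. A minimal standalone sketch of that pattern follows. The helper name load_bearer_token is hypothetical, and returning None for a missing or empty file is an assumption of this sketch rather than documented Dragon behaviour.

```python
from pathlib import Path
from typing import Optional


def load_bearer_token(path: str = "token.txt") -> Optional[str]:
    """Read a bearer token from a file, or return None if unavailable."""
    try:
        raw = Path(path).read_bytes()
    except FileNotFoundError:
        return None
    # "utf-8-sig" drops a leading byte-order mark that some editors add;
    # strip() removes the trailing newline left by `echo`.
    token = raw.decode("utf-8-sig").strip()
    return token or None
```

The returned value could then be passed as the token argument of MCPServerConfig, with None meaning an unauthenticated connection.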
Tool Naming and Routing:
Local tools: registered via registry.register() or @registry.tool; accessible as tool_name
MCP tools: auto-discovered from servers; accessible as alias__tool_name (e.g., datastore__query, compute__submit_job)
The ToolDispatcher parses the prefix and routes the call to the correct
server (or local registry).
Tool Discovery:
When the agent connects to an MCP server, it auto-discovers all tools exposed by that server and merges their schemas into the tool list presented to the LLM. No manual schema registration is needed.
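To make the merge step concrete, here is a small sketch of how discovered tools might be combined with local ones under scoped names. It does not use Dragon's API: the function merge_tool_lists, the dictionary shape of a tool schema (a "name" key plus a "description"), and raising ValueError on a name collision are all assumptions chosen for illustration.

```python
from typing import Any, Dict, List

ToolSchema = Dict[str, Any]


def merge_tool_lists(
    local_tools: List[ToolSchema],
    discovered: Dict[str, List[ToolSchema]],
) -> List[ToolSchema]:
    """Return local tools plus remote tools renamed to alias__tool_name."""
    merged = list(local_tools)
    seen = {tool["name"] for tool in merged}
    for alias, tools in discovered.items():
        for tool in tools:
            # Copy the schema so the server's original entry is not mutated.
            scoped = {**tool, "name": f"{alias}__{tool['name']}"}
            if scoped["name"] in seen:
                raise ValueError(f"duplicate tool name: {scoped['name']}")
            seen.add(scoped["name"])
            merged.append(scoped)
    return merged


local = [{"name": "format_results_table", "description": "Render a table"}]
remote = {"jupyter": [{"name": "create_notebook", "description": "New notebook"}]}
print([tool["name"] for tool in merge_tool_lists(local, remote)])
# ['format_results_table', 'jupyter__create_notebook']
```

Because every remote name carries its alias, a local tool and a remote tool can share the same base name without colliding.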
Installation
See Example 01 (same dependencies). To use MCP servers, install a compatible
server implementation (e.g., mcp-server-filesystem or mcp-server-postgres).
System Description
Tested on HPE Cray EX:
1 compute node with 1–2 GPUs
MCP servers can run on same or different nodes (TCP/HTTP connectivity required)
How to Run
Step 1: Start MCP servers (in separate terminals)
Example with filesystem MCP server:
python -m mcp_server_filesystem /data/experiments
Example with custom data service:
python my_datastore_mcp_server.py --port 8080
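Before launching the agent, it can help to confirm that the server's port is reachable from the node where the agent will run. The sketch below uses only the Python standard library; the function name mcp_endpoint_reachable is hypothetical, and the check covers TCP connectivity only, not the MCP handshake or authentication.

```python
import socket
from urllib.parse import urlparse


def mcp_endpoint_reachable(url: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the URL's host and port succeeds."""
    parsed = urlparse(url)
    host = parsed.hostname
    if host is None:
        return False
    # Fall back to the scheme's default port when the URL omits one.
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, mcp_endpoint_reachable("http://datastore-service:8080/sse") returning False would point to a network or startup problem rather than an agent misconfiguration.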
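Step 2: Provide the server URL and token
The script takes the MCP server URL as its first command-line argument and reads the bearer token from token.txt in the current working directory. To change the alias or connect to several servers, edit the MCPServerConfig list in 05_mcp_tools.py.
echo '<your-token>' > token.txt
Step 3: Run the agent
dragon 05_mcp_tools.py <mcp_server_url>
If no URL is given, the reporter agent falls back to local tools only.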
Example output (illustrative; agent names, server aliases, and tool calls depend on your configuration):
$ dragon 05_mcp_tools.py
Agent 'researcher' started
MCP server 'datastore' connected (6 tools available)
MCP server 'compute' connected (4 tools available)
Task: 'Find all experiments and analyze trends'
Calling: datastore__query(filter='last_week')
→ Result: 42 experiments found
Calling: compute__analyze(experiment_ids=[...])
→ Result: Trends calculated
Calling: summarize_results(trends=...)
→ Result: Final summary written to report.json
Agent completed
Next Steps
06 — Full Pipeline (all features combined: DAG + HITL + Memory + MCP + Tracing)