MCP Server Integration

Building on Examples 01–04, this example demonstrates connecting a Dragon agent to remote Model Context Protocol (MCP) servers. The agent can discover and invoke tools exposed by any MCP-compatible server (e.g., data stores, compute clusters, external APIs) alongside locally registered tools. Tool calls are automatically routed based on a namespace prefix.

Prerequisites: Read 04_memory.py first.

What you’ll learn:

  • How to connect to one or more MCP servers via HTTP/SSE

  • How to configure authentication tokens and timeouts

  • How tool names are scoped (local tools vs. MCP tools via alias__tool_name)

  • How the ToolDispatcher routes calls to the correct server

  • How to mix local and remote tools in the same agent

Architecture:

The agent’s ToolDispatcher maintains connections to multiple MCP servers and routes tool calls transparently: calls without a prefix go to local tools; calls like datastore__query route to the MCP server aliased datastore.
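The routing rule can be sketched in a few lines of plain Python. This is an illustration of the prefix convention only, not Dragon's ToolDispatcher: the names split_scoped_name and route_call are hypothetical, remote servers are stood in for by dictionaries of callables, and treating a name with an unknown prefix as a local tool is an assumption.

```python
from typing import Any, Callable, Dict, Optional, Set, Tuple

SEPARATOR = "__"  # double underscore between server alias and tool name

ToolFn = Callable[..., Any]


def split_scoped_name(name: str, aliases: Set[str]) -> Tuple[Optional[str], str]:
    """Return (alias, tool_name); alias is None when the tool is local."""
    alias, sep, tool = name.partition(SEPARATOR)
    # Only treat the prefix as a server alias if it is a known alias.
    # (Assumption: anything else is looked up as a local tool name.)
    if sep and tool and alias in aliases:
        return alias, tool
    return None, name


def route_call(
    name: str,
    kwargs: Dict[str, Any],
    local_tools: Dict[str, ToolFn],
    remote_tools: Dict[str, Dict[str, ToolFn]],
) -> Any:
    """Invoke a local tool or the tool behind the matching server alias."""
    alias, tool = split_scoped_name(name, set(remote_tools))
    if alias is None:
        return local_tools[tool](**kwargs)
    return remote_tools[alias][tool](**kwargs)
```

With this rule, a call named datastore__query reaches the datastore stand-in, while format_results_table stays local.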

Main Code

Below is the complete example:

Listing 31 05_mcp_tools.py: Agent with MCP server tools
"""05 — MCP Server Integration.

**Prerequisites:** Read ``02_multi_agent_dag.py`` first.

**What's new:**

* **MCPServerConfig** — connect an agent to a remote MCP tool server
* **Scoped tool names** — MCP tools appear as ``alias__tool_name``
  (e.g. ``jupyter__create_notebook``) to avoid collisions with local tools
* **Mixed local + MCP tools** — the reporter agent has both local tools
  (``format_results_table``) and remote MCP tools (Jupyter operations)
* **Auto-discovery** — MCP tool schemas are fetched at connect time and
  merged into the LLM's tool list
* **Token from file** — bearer token read from ``token.txt``, never on CLI
* **Graceful fallback** — if no MCP URL is provided, reporter uses
  local tools only

Architecture::

    planner_agent ──► runner_agent ──► analyzer_agent ──┐
         │                │                             ├──► reporter_agent
         └────────────────┴─────────────────────────────┘      │
                                                         MCP: jupyter__*

Usage::

    # Without MCP (local tools only):
    dragon 05_mcp_tools.py

    # With MCP (Jupyter notebook creation):
    echo '<your-token>' > token.txt
    dragon 05_mcp_tools.py <mcp_server_url>
"""

import asyncio
import os
import sys
from typing import Any

import dragon
import multiprocessing as mp

from dragon.ai.agent.core import create_sub_agent
from dragon.ai.agent.config import (
    AgentConfig,
    MCPServerConfig,       # ← NEW
    OrchestratorConfig,
    Pipeline,
    PipelineNode,
)
from dragon.ai.agent.tools import ToolRegistry
from dragon.ai.agent.orchestrator import DAGOrchestrator
from dragon.infrastructure.policy import Policy
from dragon.native.event import Event
from dragon.native.machine import Node, System
from dragon.native.process import Process
from dragon.native.queue import Queue
from dragon.workflows.batch import Batch

from dragon.ai.inference.config import (
    BatchingConfig,
    HardwareConfig,
    InferenceConfig,
    ModelConfig,
)
from dragon.ai.inference.inference_utils import Inference

from tools import (
    propose_experiment,
    launch_experiment,
    check_progress,
    collect_results,
    analyze_convergence,
    format_results_table,
)
from tools.runner import cleanup_experiment_state


# ===========================================================================
# User-configurable constants
# ===========================================================================

MODEL_NAME = "/path/to/your/model"
HF_TOKEN = ""


# ===========================================================================
# Inference Pipeline Configuration
# ===========================================================================

INFERENCE_CONFIG = InferenceConfig(
    model=ModelConfig(
        model_name=MODEL_NAME,
        hf_token=HF_TOKEN,
        tp_size=2,
        max_tokens=8192,
        max_model_len=32768,
    ),
    hardware=HardwareConfig(
        num_nodes=1,
        num_gpus=2,
        num_inf_workers_per_cpu=1,
    ),
    batching=BatchingConfig(
        batch_wait_seconds=0.1,
        max_batch_size=32,
    ),
)


# ===========================================================================
# Tool registries
# ===========================================================================

planner_registry = ToolRegistry()
planner_registry.register(propose_experiment)

runner_registry = ToolRegistry()
runner_registry.register(launch_experiment)
runner_registry.register(check_progress)
runner_registry.register(collect_results)

analyzer_registry = ToolRegistry()
analyzer_registry.register(analyze_convergence)

reporter_registry = ToolRegistry()
reporter_registry.register(format_results_table)


# ===========================================================================
# DAG pipeline
# ===========================================================================

pipeline = Pipeline(nodes=[
    PipelineNode(
        agent_id="planner_agent",
        task_description=(
            "You are a scientific experiment planner.  The user wants to study "
            "Monte Carlo convergence for estimating π.\n\n"
            "Propose an experiment plan by calling propose_experiment with:\n"
            "  - description, sample_sizes, convergence_target, methodology\n\n"
            "Report the approved plan verbatim as your final answer."
        ),
        depends_on=[],
    ),
    PipelineNode(
        agent_id="runner_agent",
        task_description=(
            "You manage parallel Monte Carlo simulations on an HPC cluster.\n\n"
            "Tools:\n"
            "  1. launch_experiment(sample_sizes, seeds) — launches ALL "
            "simulations in parallel.\n"
            "  2. check_progress() — shows done/running/pending status.\n"
            "  3. collect_results() — call ONLY when all_done=true.\n\n"
            "STRICT workflow:\n"
            "  1. Call launch_experiment.\n"
            "  2. Call check_progress until all_done=true.\n"
            "  3. Call collect_results — REQUIRED before final answer.\n"
            "  4. Report collect_results output verbatim."
        ),
        depends_on=["planner_agent"],
    ),
    PipelineNode(
        agent_id="analyzer_agent",
        task_description=(
            "Call analyze_convergence with a list of dicts (keys: "
            "'n_samples', 'absolute_error').  Report all metrics verbatim."
        ),
        depends_on=["runner_agent"],
    ),
    PipelineNode(
        agent_id="reporter_agent",
        task_description=(
            "Write a structured report with the experiment results.\n\n"
            "STRICT workflow — follow every step IN ORDER:\n\n"
            "  STEP 1.  Call format_results_table to get a Markdown table.\n\n"
            "  STEP 2.  Check your available tools.  If you have tools that\n"
            "           can manage Jupyter kernels and notebook files, do\n"
            "           BOTH sub-steps:\n"
            "             a. Create a Jupyter kernel.\n"
            "             b. Create a notebook file named\n"
            "                'monte_carlo_convergence_report' with the\n"
            "                COMPLETE report as code_content.\n"
            "           If no such tools are available, skip Step 2.\n\n"
            "  STEP 3.  Give your final answer with the SAME complete\n"
            "           report text.  COPY-PASTE the actual table.\n\n"
            "Never invent data — use only data from upstream agents."
        ),
        depends_on=["planner_agent", "runner_agent", "analyzer_agent"],
    ),
])


# ===========================================================================
# Helpers
# ===========================================================================

def _make_agent_kwargs(agent_id, name, role, registry, inference_queue,
                       mcp_servers=None,
                       max_tool_call_iterations=20):
    """Build kwargs for create_sub_agent().

    NEW in this example: ``mcp_servers`` — list of MCPServerConfig objects.
    The agent connects to each MCP server in its own process at startup.
    """
    return {
        "config": AgentConfig(
            agent_id=agent_id, name=name, role=role,
            inference_queue=inference_queue,
            max_tool_call_iterations=max_tool_call_iterations,
        ),
        "tool_registry": registry,
        "mcp_servers": mcp_servers,    # ← NEW: passed to create_sub_agent
        "shutdown_event": Event(),
        "reply_queue": Queue(),
    }


def _start_agents(specs, policies):
    procs = []
    for spec, policy in zip(specs, policies):
        p = Process(target=create_sub_agent, kwargs=spec, policy=policy)
        p.start()
        procs.append(p)
    queues = {}
    for spec in specs:
        aid = spec["config"].agent_id
        queues[aid] = spec["reply_queue"].get()
        print(f"[startup] Agent '{aid}' ready.", flush=True)
    return procs, queues


# ===========================================================================
# Main
# ===========================================================================

async def main():
    input_queue = Queue()

    print("[startup] Initializing inference pipeline...", flush=True)

    inference_pipeline = None
    try:
        inference_pipeline = Inference(INFERENCE_CONFIG, input_queue)
        inference_pipeline.initialize()
    except Exception as exc:
        import traceback
        print(f"\n[FATAL] Inference pipeline failed to initialize: {exc}", flush=True)
        traceback.print_exc()
        if inference_pipeline is not None:
            inference_pipeline.destroy()
        return
    print("[startup] Inference pipeline ready.\n", flush=True)

    my_alloc = System()
    node_list = my_alloc.nodes
    compute_host = (
        Node(node_list[1]).hostname if len(node_list) > 1
        else Node(node_list[0]).hostname
    )
    compute_policy = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=compute_host,
    )

    # -------------------------------------------------------------------
    # ← NEW: MCP server setup for the reporter agent
    #
    # Pass the MCP URL as a command-line argument:
    #   dragon 05_mcp_tools.py <mcp_url>
    #
    # The bearer token is read from token.txt in CWD.
    # -------------------------------------------------------------------
    mcp_url = sys.argv[1] if len(sys.argv) >= 2 else None

    TOKEN_FILE = os.path.join(os.getcwd(), "token.txt")
    mcp_token = None
    if mcp_url:
        try:
            with open(TOKEN_FILE, "rb") as f:
                raw = f.read()
            mcp_token = raw.decode("utf-8").strip().strip("\ufeff")
            print(f"[startup] MCP token loaded from {TOKEN_FILE}", flush=True)
        except FileNotFoundError:
            print(
                f"[startup] WARNING: {TOKEN_FILE} not found — "
                "MCP connection will have no auth token.",
                flush=True,
            )

    reporter_mcp_servers = None
    if mcp_url:
        reporter_mcp_servers = [
            MCPServerConfig(url=mcp_url, alias="jupyter", token=mcp_token),
        ]
        print(f"[startup] Reporter will connect to MCP: {mcp_url}", flush=True)
    else:
        print(
            "[startup] No MCP URL provided — reporter uses local tools only.\n"
            "  To enable MCP, run:\n"
            "    echo '<token>' > token.txt\n"
            "    dragon 05_mcp_tools.py <mcp_url>",
            flush=True,
        )

    procs, agent_specs = [], []
    try:
        agent_specs = [
            _make_agent_kwargs(
                "planner_agent", "Experiment Planner",
                "You are an experiment planner for Monte Carlo convergence "
                "studies on an HPC cluster.",
                planner_registry, input_queue,
            ),
            _make_agent_kwargs(
                "runner_agent", "Parallel Simulation Runner",
                "You manage parallel Monte Carlo simulations.\n"
                "You MUST call all three tools in order: "
                "launch_experiment → check_progress → collect_results.\n"
                "NEVER give a final answer without calling collect_results first.",
                runner_registry, input_queue,
                max_tool_call_iterations=60,
            ),
            _make_agent_kwargs(
                "analyzer_agent", "Convergence Analyzer",
                "Analyse convergence.  Call analyze_convergence with "
                "n_samples + absolute_error dicts.  Report verbatim.",
                analyzer_registry, input_queue,
            ),
            _make_agent_kwargs(
                "reporter_agent", "Report Writer",
                "Write a structured report with Markdown tables.  "
                "If you have Jupyter tools, also create a notebook.  "
                "Always include tool output verbatim.",
                reporter_registry, input_queue,
                # ← NEW: attach MCP servers to this agent
                mcp_servers=reporter_mcp_servers,
            ),
        ]
        policies = [None, compute_policy, None, None]

        procs, queues = _start_agents(agent_specs, policies)
        for spec in agent_specs:
            spec["config"].input_queue = queues[spec["config"].agent_id]

        orchestrator = DAGOrchestrator(
            config=OrchestratorConfig(
                agents=[s["config"] for s in agent_specs],
                poll_interval=0.5,
                poll_timeout=14400.0,
            ),
            pipeline=pipeline,
        )

        user_input = (
            "Design and run a Monte Carlo convergence experiment for π.  "
            "Monitor progress and report which nodes ran what."
            + (
                "  Save the final report to a Jupyter notebook on the cluster."
                if mcp_url else ""
            )
        )

        batch = Batch()
        try:
            print("=" * 60, flush=True)
            print("Dragon AI — 05 MCP Tools Integration", flush=True)
            print("=" * 60, flush=True)
            print(f"Request: {user_input}\n", flush=True)

            result = orchestrator.run(
                user_input=user_input,
                batch=batch,
            )

            print("\n" + "=" * 60, flush=True)
            print("FINAL RESULT", flush=True)
            print("=" * 60, flush=True)
            print(result, flush=True)

        except Exception as exc:
            import traceback
            print(f"\n[error] Pipeline failed: {exc}", flush=True)
            traceback.print_exc()
        finally:
            orchestrator.destroy()
            batch.join()
            batch.destroy()

    except Exception as exc:
        import traceback
        print(f"\n[error] Fatal: {exc}", flush=True)
        traceback.print_exc()
    finally:
        cleanup_experiment_state()
        for spec in agent_specs:
            try:
                spec["shutdown_event"].set()
            except Exception:
                pass
        for p in procs:
            try:
                p.join()
            except Exception:
                pass
        print("\n[teardown] All agents stopped.", flush=True)
        try:
            inference_pipeline.destroy()
        except Exception:
            pass
        print("[teardown] Inference pipeline stopped.", flush=True)


if __name__ == "__main__":
    mp.set_start_method("dragon")
    asyncio.run(main())

Key Concepts

MCP Server Configuration:

Describe each MCP server with its own MCPServerConfig and collect the configs in a list:

mcp_servers = [
    MCPServerConfig(
        url="http://datastore-service:8080/sse",
        alias="datastore",
        token="my-secret-token",
        timeout=10.0,
        max_retries=3,
    ),
    MCPServerConfig(
        url="http://compute-cluster:9090/sse",
        alias="compute",
    ),
]
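The main listing avoids a hard-coded token by reading it from token.txt. A small helper along the following lines could isolate that step. It is a sketch only: load_bearer_token is a hypothetical name, and returning None for an empty file is an assumption rather than documented Dragon behavior.

```python
from pathlib import Path
from typing import Optional


def load_bearer_token(path: str = "token.txt") -> Optional[str]:
    """Return the token stored at `path`, or None if it is missing or empty."""
    try:
        raw = Path(path).read_bytes()
    except FileNotFoundError:
        return None
    # "utf-8-sig" drops a leading byte-order mark, which some editors add.
    token = raw.decode("utf-8-sig").strip()
    # Assumption: an empty file is treated the same as a missing one.
    return token or None
```

The result can then be passed as the token argument of MCPServerConfig, as the listing does with mcp_token.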

Tool Naming and Routing:

  • Local tools: registered via registry.register() or @registry.tool; accessible as tool_name

  • MCP tools: auto-discovered from servers; accessible as alias__tool_name (e.g., datastore__query, compute__submit_job)

The ToolDispatcher parses the prefix and routes the call to the correct server (or local registry).

Tool Discovery:

When the agent connects to an MCP server, it fetches the schemas of all tools that server exposes and merges them into the LLM's tool list, so no manual schema registration is needed.
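The merge step can be pictured as follows. This is a simplified sketch, not Dragon's implementation: tool schemas are reduced to plain dictionaries with a "name" key, scope_tool_schemas and merge_tool_lists are hypothetical helpers, and raising on a duplicate name is an assumption about how collisions might be handled.

```python
from typing import Any, Dict, List

Schema = Dict[str, Any]


def scope_tool_schemas(alias: str, schemas: List[Schema]) -> List[Schema]:
    """Return copies of the schemas renamed to alias__tool_name."""
    return [{**s, "name": f"{alias}__{s['name']}"} for s in schemas]


def merge_tool_lists(
    local: List[Schema], discovered: Dict[str, List[Schema]]
) -> List[Schema]:
    """Combine local schemas with the scoped schemas of every server."""
    merged = list(local)
    seen = {s["name"] for s in local}
    for alias, schemas in discovered.items():
        for schema in scope_tool_schemas(alias, schemas):
            if schema["name"] in seen:
                # Assumption: a clash is an error rather than a silent override.
                raise ValueError(f"duplicate tool name: {schema['name']}")
            seen.add(schema["name"])
            merged.append(schema)
    return merged
```

Because every remote name carries its alias prefix, a local tool and a remote tool can share the same base name without clashing.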

Installation

See Example 01 (same dependencies). For MCP servers, install a compatible implementation such as mcp-server-filesystem or mcp-server-postgres.

System Description

Tested on HPE Cray EX:

  • 1 compute node with 1–2 GPUs

  • MCP servers can run on same or different nodes (TCP/HTTP connectivity required)

How to Run

Step 1: Start MCP servers (in separate terminals)

Example with filesystem MCP server:

python -m mcp_server_filesystem /data/experiments

Example with custom data service:

python my_datastore_mcp_server.py --port 8080
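Before launching the agent, it can help to confirm that each server's host and port accept TCP connections from the node where the agent will run. The check below uses only the Python standard library; mcp_endpoint_reachable is a hypothetical helper, and it tests TCP reachability only, not whether the endpoint actually speaks MCP.

```python
import socket
from urllib.parse import urlparse


def mcp_endpoint_reachable(url: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the URL's host and port succeeds."""
    parsed = urlparse(url)
    host = parsed.hostname
    if host is None:
        return False
    # Fall back to the scheme's default port when the URL does not name one.
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, mcp_endpoint_reachable("http://datastore-service:8080/sse") would report whether port 8080 on that host is open from the current node.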

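Step 2: Provide the MCP URL and token

The listing reads the server URL from its first command-line argument and the bearer token from token.txt in the current working directory, so a single server needs no source changes:

echo '<your-token>' > token.txt

To connect to additional servers, add MCPServerConfig entries to reporter_mcp_servers in 05_mcp_tools.py.

Step 3: Run the agent

Without MCP (the reporter uses local tools only):

dragon 05_mcp_tools.py

With MCP:

dragon 05_mcp_tools.py <mcp_server_url>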

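Example output (illustrative, for an agent connected to two MCP servers aliased datastore and compute; the agent names, aliases, and log lines printed by the listing above differ):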

$ dragon 05_mcp_tools.py
Agent 'researcher' started
MCP server 'datastore' connected (6 tools available)
MCP server 'compute' connected (4 tools available)
Task: 'Find all experiments and analyze trends'
Calling: datastore__query(filter='last_week')
→ Result: 42 experiments found
Calling: compute__analyze(experiment_ids=[...])
→ Result: Trends calculated
Calling: summarize_results(trends=...)
→ Result: Final summary written to report.json
Agent completed

Next Steps

  • 06 — Full Pipeline (all features combined: DAG + HITL + Memory + MCP + Tracing)