Full Production Pipeline
This capstone example combines the features of Examples 01–05 into a production-ready reference implementation and adds tracing. It demonstrates a multi-agent DAG, human-in-the-loop (HITL) approval gates, memory strategies, MCP tool servers, and real-time tracing and observability. Use it as a template for building complex agentic systems on Dragon.
Prerequisites: Read Examples 01–05 first.
What you’ll learn:
How to integrate all five techniques into one system
How to enable tracing to log every LLM call, tool invocation, approval, and memory operation to a structured JSONL file
How to use the trace viewer to inspect and replay execution
Best practices for production agent deployment on HPC clusters
Architecture:
Node 0, GPUs [0,1]: Main inference service (large reasoning LLM)
Node 1, GPU [0]: Dedicated summarizer LLM
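DAG: planner → runner → analyzer → reporter → save_report (function node); the reporter also receives the planner's and runner's outputs directly
The planner and runner have HITL approval gates and explicit memory strategies (sliding window and summarization, respectively); the reporter can use MCP tools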
Tracing enabled for full observability
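The DAG is a chain except for the fan-in at reporter_agent, which waits on three upstream nodes. To illustrate what that means for scheduling, the sketch below feeds the depends_on lists from the pipeline definition into Python's standard-library graphlib.TopologicalSorter. It groups the nodes into waves that could start once their upstreams finish. It models only the dependency structure and is not how DAGOrchestrator schedules work internally. execution_waves is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Dependencies copied from the depends_on fields of the PipelineNode list
# in 06_full_pipeline.py (node -> set of upstream nodes).
DEPENDS_ON = {
    "planner_agent": set(),
    "runner_agent": {"planner_agent"},
    "analyzer_agent": {"runner_agent"},
    "reporter_agent": {"planner_agent", "runner_agent", "analyzer_agent"},
    "save_report": {"reporter_agent"},
}


def execution_waves(depends_on: dict[str, set[str]]) -> list[list[str]]:
    """Hypothetical helper: group nodes into waves of mutually independent nodes.

    Every node in a wave has all of its upstream nodes in earlier waves.
    """
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # nodes whose upstreams are all done
        waves.append(ready)
        sorter.done(*ready)  # mark the wave finished to unlock its dependents
    return waves


if __name__ == "__main__":
    for i, wave in enumerate(execution_waves(DEPENDS_ON)):
        print(i, wave)
```

For this pipeline every wave holds a single node, so the fan-in adds no parallelism. It only means the reporter sees all three upstream results. In a wider pipeline, for example two analyzers that both depend on the runner, both analyzers would land in the same wave.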
Main Code
Below is the complete example:
"""06 — Full Pipeline: DAG + HITL + Memory + MCP + Tracing.

**Prerequisites:** Read ``01``–``05`` first.

**What's new:**

* **Tracing / observability** — ``tracing=True`` on ``OrchestratorConfig``
  writes a per-run JSONL trace with every LLM call, tool call,
  approval/rejection, memory operation, and DAG state transition
* **Trace viewer CLI** — inspect the trace interactively after a run
* **Everything combined** — this is the production-ready reference
  combining all features from the previous examples:

  - Multi-agent DAG + function node (from 02)
  - HITL approval gates (from 03)
  - Memory strategies + summarizer LLM (from 04)
  - MCP tool server integration (from 05)

Architecture::

    Node 0  GPUs [0,1]  →  Main InferenceWorker (tp_size=2)
    Node 1  GPU  [0]    →  Summarizer Worker    (tp_size=1)

    planner_agent ──► runner_agent ──► analyzer_agent ──┐
    [HITL]            [HITL]                            ├──► reporter_agent ──► save_report(fn)
    [memory]          [memory]                          │    [MCP: jupyter]
    SLIDING_WINDOW    SUMMARIZE                         │
        └─────────────────┴─────────────────────────────┘

    tracing → ./traces/<run-id>.jsonl

Usage::

    # Without MCP:
    dragon 06_full_pipeline.py

    # With MCP:
    echo '<token>' > token.txt
    dragon 06_full_pipeline.py <mcp_url>

    # HITL approval client (in another terminal):
    python -m dragon.ai.agent.hitl --tcp HOST:PORT

    # Trace viewer (in another terminal):
    python -m dragon.ai.agent.observability --tcp HOST:PORT -i
"""

import asyncio
import json
import os
import sys
import time
from typing import Any

import dragon
import multiprocessing as mp

from dragon.ai.agent.core import create_sub_agent
from dragon.ai.agent.config import (
    AgentConfig,
    MCPServerConfig,
    MemoryConfig,
    MemoryStrategy,
    OrchestratorConfig,
    Pipeline,
    PipelineNode,
    TaskResult,
    TaskStatus,
    DISPATCH_ID_KEY,
    RESULT_KEY,
    STATUS_KEY,
)
from dragon.ai.agent.tools import ToolRegistry
from dragon.ai.agent.orchestrator import DAGOrchestrator
from dragon.data.ddict import DDict
from dragon.infrastructure.policy import Policy
from dragon.native.event import Event
from dragon.native.machine import Node, System
from dragon.native.process import Process
from dragon.native.queue import Queue
from dragon.workflows.batch import Batch

from dragon.ai.inference.config import (
    BatchingConfig,
    HardwareConfig,
    InferenceConfig,
    ModelConfig,
)
from dragon.ai.inference.inference_utils import Inference

from tools import (
    propose_experiment,
    launch_experiment,
    check_progress,
    collect_results,
    analyze_convergence,
    format_results_table,
)
from tools.runner import cleanup_experiment_state


# ===========================================================================
# User-configurable constants
# ===========================================================================

MODEL_NAME = "/path/to/your/model"
HF_TOKEN = ""
SUMMARIZER_MODEL_NAME = "/path/to/your/summarizer/model"


# ===========================================================================
# Inference — Main model (reasoning + tool-calling)
# ===========================================================================

INFERENCE_CONFIG = InferenceConfig(
    model=ModelConfig(
        model_name=MODEL_NAME,
        hf_token=HF_TOKEN,
        tp_size=2,
        max_tokens=8192,
        max_model_len=32768,
    ),
    hardware=HardwareConfig(
        num_nodes=1,
        num_gpus=2,
        num_inf_workers_per_cpu=1,
    ),
    batching=BatchingConfig(
        batch_wait_seconds=0.1,
        max_batch_size=32,
    ),
)


# ===========================================================================
# Inference — Summarizer (separate GPU on node 1)
# ===========================================================================

SUMMARIZER_CONFIG = InferenceConfig(
    model=ModelConfig(
        model_name=SUMMARIZER_MODEL_NAME,
        hf_token=HF_TOKEN,
        tp_size=1,
        max_tokens=2048,
        max_model_len=8192,
    ),
    hardware=HardwareConfig(
        num_nodes=1,
        num_gpus=1,
        node_offset=1,
        num_inf_workers_per_cpu=1,
    ),
    batching=BatchingConfig(
        batch_wait_seconds=0.05,
        max_batch_size=8,
    ),
)


# ===========================================================================
# Tool registries
# ===========================================================================

planner_registry = ToolRegistry()
planner_registry.register(propose_experiment)

runner_registry = ToolRegistry()
runner_registry.register(launch_experiment)
runner_registry.register(check_progress)
runner_registry.register(collect_results)

analyzer_registry = ToolRegistry()
analyzer_registry.register(analyze_convergence)

reporter_registry = ToolRegistry()
reporter_registry.register(format_results_table)


# ===========================================================================
# Function node — persists the final report to disk + DDict
# ===========================================================================

REPORT_DIR = os.environ.get("DRAGON_REPORT_DIR", os.getcwd())


def save_report(*upstreams: TaskResult) -> TaskResult:
    """Post-processing function node — saves the reporter's output to disk.

    Runs as a plain Python function inside Dragon Batch (no LLM, no agent
    queue). Receives upstream TaskResult tokens, attaches to the shared
    DDict, reads the reporter's result, and writes it to disk.

    :param upstreams: One or more TaskResult tokens from upstream nodes.
        The first upstream carries ``task_id`` and ``serialized_ddict``.
    :return: TaskResult with status DONE for the orchestrator to collect.
    """
    upstream = upstreams[0]
    task_id = upstream.task_id
    serialized_ddict = upstream.serialized_ddict

    print(f"\n[save_report] Function node started (task_id={task_id[:8]}...)",
          flush=True)

    ddict = DDict.attach(serialized_ddict)
    try:
        # Read the reporter agent's result from DDict
        reporter_dispatch_key = DISPATCH_ID_KEY.format(
            task_id=task_id, agent_id="reporter_agent"
        )
        reporter_dispatch_id = ddict[reporter_dispatch_key]
        reporter_result_key = RESULT_KEY.format(
            task_id=task_id,
            agent_id="reporter_agent",
            dispatch_id=reporter_dispatch_id,
        )
        reporter_result = ddict[reporter_result_key]

        if isinstance(reporter_result, dict):
            report_text = reporter_result.get("response", str(reporter_result))
        else:
            report_text = str(reporter_result)

        # Write Markdown file
        md_path = os.path.join(REPORT_DIR, "monte_carlo_report.md")
        with open(md_path, "w") as f:
            f.write(report_text)
        print(f"[save_report] Written: {md_path}", flush=True)

        # Write JSON artifact with metadata
        json_path = os.path.join(REPORT_DIR, "monte_carlo_report.json")
        artifact = {
            "task_id": task_id,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
            "report": report_text,
            "source_agent": "reporter_agent",
            "output_files": [md_path],
        }
        with open(json_path, "w") as f:
            json.dump(artifact, f, indent=2)
        print(f"[save_report] Written: {json_path}", flush=True)

        # Write own result to DDict so the orchestrator can collect it
        own_dispatch_id = f"fn-save-report-{task_id[:8]}"
        dispatch_id_key = DISPATCH_ID_KEY.format(
            task_id=task_id, agent_id="save_report"
        )
        result_key = RESULT_KEY.format(
            task_id=task_id, agent_id="save_report", dispatch_id=own_dispatch_id
        )
        status_key = STATUS_KEY.format(
            task_id=task_id, agent_id="save_report", dispatch_id=own_dispatch_id
        )
        result_payload = {
            "response": (
                f"Report saved to:\n"
                f"  - {md_path}\n"
                f"  - {json_path}"
            )
        }
        ddict[dispatch_id_key] = own_dispatch_id
        ddict[result_key] = result_payload
        ddict[status_key] = TaskStatus.DONE
    finally:
        ddict.detach()

    print("[save_report] Function node complete.\n", flush=True)

    return TaskResult(
        task_id=task_id,
        agent_id="save_report",
        status=TaskStatus.DONE,
        serialized_ddict=serialized_ddict,
    )


# ===========================================================================
# DAG pipeline
# ===========================================================================

pipeline = Pipeline(nodes=[
    PipelineNode(
        agent_id="planner_agent",
        task_description=(
            "You are a scientific experiment planner. The user wants to study "
            "Monte Carlo convergence for estimating π.\n\n"
            "Propose an experiment plan by calling propose_experiment with:\n"
            "  - description, sample_sizes, convergence_target, methodology\n\n"
            "A human operator will review your plan. If they provide "
            "feedback, revise and resubmit. Once approved, report verbatim."
        ),
        depends_on=[],
    ),
    PipelineNode(
        agent_id="runner_agent",
        task_description=(
            "You manage parallel Monte Carlo simulations on an HPC cluster.\n\n"
            "Tools:\n"
            "  1. launch_experiment(sample_sizes, seeds) — launches ALL "
            "simulations in parallel. Operator must approve.\n"
            "  2. check_progress() — shows done/running/pending status.\n"
            "  3. collect_results() — call ONLY when all_done=true.\n\n"
            "STRICT workflow:\n"
            "  1. Call launch_experiment.\n"
            "  2. Call check_progress until all_done=true.\n"
            "  3. Call collect_results — REQUIRED before final answer.\n"
            "  4. Report collect_results output verbatim."
        ),
        depends_on=["planner_agent"],
    ),
    PipelineNode(
        agent_id="analyzer_agent",
        task_description=(
            "Call analyze_convergence with a list of dicts (keys: "
            "'n_samples', 'absolute_error'). Report all metrics verbatim."
        ),
        depends_on=["runner_agent"],
    ),
    PipelineNode(
        agent_id="reporter_agent",
        task_description=(
            "Write a structured report with the experiment results.\n\n"
            "STRICT workflow — follow every step IN ORDER:\n\n"
            "  STEP 1. Call format_results_table to get a Markdown table.\n\n"
            "  STEP 2. Check your available tools. If you have tools that\n"
            "          can manage Jupyter kernels and notebook files, do\n"
            "          BOTH sub-steps:\n"
            "            a. Create a Jupyter kernel.\n"
            "            b. Create a notebook file named\n"
            "               'monte_carlo_convergence_report' with the\n"
            "               COMPLETE report as code_content.\n"
            "          If no such tools are available, skip Step 2.\n\n"
            "  STEP 3. Give your final answer with the SAME complete\n"
            "          report text. COPY-PASTE the actual table.\n\n"
            "Never invent data — use only data from upstream agents."
        ),
        depends_on=["planner_agent", "runner_agent", "analyzer_agent"],
    ),
    # ← Function node: runs save_report() after reporter_agent finishes
    PipelineNode(
        agent_id="save_report",
        fn=save_report,
        task_description="Persist final report to DDict.",
        depends_on=["reporter_agent"],
    ),
])


# ===========================================================================
# Helpers
# ===========================================================================

def _make_agent_kwargs(agent_id, name, role, registry, inference_queue,
                       approval_filter=None,
                       max_tool_call_iterations=20,
                       memory=None,
                       summarizer_inference_queue=None,
                       mcp_servers=None):
    """Build kwargs for create_sub_agent() — all features combined."""
    return {
        "config": AgentConfig(
            agent_id=agent_id, name=name, role=role,
            inference_queue=inference_queue,
            summarizer_inference_queue=summarizer_inference_queue,
            approval_filter=approval_filter,
            max_tool_call_iterations=max_tool_call_iterations,
            memory=memory,
        ),
        "tool_registry": registry,
        "mcp_servers": mcp_servers,
        "shutdown_event": Event(),
        "reply_queue": Queue(),
    }


def _start_agents(specs, policies):
    procs = []
    for spec, policy in zip(specs, policies):
        p = Process(target=create_sub_agent, kwargs=spec, policy=policy)
        p.start()
        procs.append(p)
    queues = {}
    for spec in specs:
        aid = spec["config"].agent_id
        queues[aid] = spec["reply_queue"].get()
        print(f"[startup] Agent '{aid}' ready.", flush=True)
    return procs, queues


# ===========================================================================
# Main
# ===========================================================================

async def main():
    # -------------------------------------------------------------------
    # 1. Main inference pipeline
    # -------------------------------------------------------------------
    input_queue = Queue()

    print("[startup] Initializing main inference pipeline...", flush=True)

    inference_pipeline = None
    try:
        inference_pipeline = Inference(INFERENCE_CONFIG, input_queue)
        inference_pipeline.initialize()
    except Exception as exc:
        import traceback
        print(f"\n[FATAL] Main inference pipeline failed: {exc}", flush=True)
        traceback.print_exc()
        if inference_pipeline is not None:
            inference_pipeline.destroy()
        return
    print("[startup] Main inference pipeline ready.\n", flush=True)

    # -------------------------------------------------------------------
    # 2. Summarizer inference pipeline
    # -------------------------------------------------------------------
    summarizer_input_queue = Queue()

    print("[startup] Initializing summarizer inference pipeline...", flush=True)

    summarizer_pipeline = None
    try:
        summarizer_pipeline = Inference(SUMMARIZER_CONFIG, summarizer_input_queue)
        summarizer_pipeline.initialize()
    except Exception as exc:
        import traceback
        print(f"\n[FATAL] Summarizer pipeline failed: {exc}", flush=True)
        traceback.print_exc()
        inference_pipeline.destroy()
        if summarizer_pipeline is not None:
            summarizer_pipeline.destroy()
        return
    print("[startup] Summarizer inference pipeline ready.\n", flush=True)

    # -------------------------------------------------------------------
    # 3. MCP server (optional)
    # -------------------------------------------------------------------
    mcp_url = sys.argv[1] if len(sys.argv) >= 2 else None

    TOKEN_FILE = os.path.join(os.getcwd(), "token.txt")
    mcp_token = None
    if mcp_url:
        try:
            with open(TOKEN_FILE, "rb") as f:
                raw = f.read()
            mcp_token = raw.decode("utf-8").strip().strip("\ufeff")
            print(f"[startup] MCP token loaded from {TOKEN_FILE}", flush=True)
        except FileNotFoundError:
            print(f"[startup] WARNING: {TOKEN_FILE} not found.", flush=True)

    reporter_mcp_servers = None
    if mcp_url:
        reporter_mcp_servers = [
            MCPServerConfig(url=mcp_url, alias="jupyter", token=mcp_token),
        ]
        print(f"[startup] Reporter MCP: {mcp_url}", flush=True)
    else:
        print("[startup] No MCP URL — reporter uses local tools only.", flush=True)

    # -------------------------------------------------------------------
    # 4. Node placement
    # -------------------------------------------------------------------
    my_alloc = System()
    node_list = my_alloc.nodes
    compute_host = (
        Node(node_list[1]).hostname if len(node_list) > 1
        else Node(node_list[0]).hostname
    )
    compute_policy = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=compute_host,
    )

    procs, agent_specs = [], []
    try:
        agent_specs = [
            _make_agent_kwargs(
                "planner_agent", "Experiment Planner",
                "You are an experiment planner for Monte Carlo convergence "
                "studies on an HPC cluster.",
                planner_registry, input_queue,
                approval_filter=lambda n, a: n == "propose_experiment",
                max_tool_call_iterations=60,
                memory=MemoryConfig(
                    strategy=MemoryStrategy.SLIDING_WINDOW,
                    max_kept_turns=3,
                ),
            ),
            _make_agent_kwargs(
                "runner_agent", "Parallel Simulation Runner",
                "You manage parallel Monte Carlo simulations.\n"
                "You MUST call all three tools in order: "
                "launch_experiment → check_progress → collect_results.\n"
                "NEVER give a final answer without calling collect_results first.",
                runner_registry, input_queue,
                approval_filter=lambda n, a: n == "launch_experiment",
                max_tool_call_iterations=60,
                memory=MemoryConfig(
                    strategy=MemoryStrategy.SUMMARIZE,
                    max_kept_turns=1,
                    summarize_after_turns=2,
                ),
                summarizer_inference_queue=summarizer_input_queue,
            ),
            _make_agent_kwargs(
                "analyzer_agent", "Convergence Analyzer",
                "Analyse convergence. Call analyze_convergence with "
                "n_samples + absolute_error dicts. Report verbatim.",
                analyzer_registry, input_queue,
            ),
            _make_agent_kwargs(
                "reporter_agent", "Report Writer",
                "Write a structured report with Markdown tables. "
                "If you have Jupyter tools, also create a notebook. "
                "Always include tool output verbatim.",
                reporter_registry, input_queue,
                mcp_servers=reporter_mcp_servers,
            ),
        ]
        policies = [None, compute_policy, None, None]

        procs, queues = _start_agents(agent_specs, policies)
        for spec in agent_specs:
            spec["config"].input_queue = queues[spec["config"].agent_id]

        # ← NEW: tracing=True enables the JSONL trace log
        orchestrator = DAGOrchestrator(
            config=OrchestratorConfig(
                agents=[s["config"] for s in agent_specs],
                poll_interval=0.5,
                poll_timeout=14400.0,
                tracing=True,  # ← writes traces/<run-id>.jsonl
            ),
            pipeline=pipeline,
        )

        user_input = (
            "Design and run a Monte Carlo convergence experiment for π. "
            "I want to review the plan first, then approve the parallel "
            "launch. Monitor progress and report which nodes ran what."
            + (
                " Save the final report to a Jupyter notebook on the cluster."
                if mcp_url else ""
            )
        )

        batch = Batch()
        try:
            print("=" * 60, flush=True)
            print("Dragon AI — 06 Full Pipeline (all features)", flush=True)
            print("=" * 60, flush=True)
            print(f"Request: {user_input}\n", flush=True)

            # Feature summary
            print("Active features:", flush=True)
            print("  DAG .............. 4 agents + save_report fn node", flush=True)
            print("  HITL ............. planner (propose_experiment), "
                  "runner (launch_experiment)", flush=True)
            print("  Memory ........... planner=SLIDING_WINDOW, "
                  "runner=SUMMARIZE", flush=True)
            print(f"  MCP .............. {'reporter → ' + mcp_url if mcp_url else 'disabled'}", flush=True)
            print("  Tracing .......... enabled → ./traces/", flush=True)
            print(flush=True)

            # HITL instructions
            if orchestrator.hitl_address:
                h, p = orchestrator.hitl_address
                print(
                    f"HITL approval client — start in another terminal:\n"
                    f"  python -m dragon.ai.agent.hitl --tcp {h}:{p}\n"
                    f"  Options:\n"
                    f"    --jsonl PATH     Custom JSONL path (default: auto-generated)\n"
                    f"    --report PATH    Custom .txt report path\n"
                    f"    --no-color       Disable ANSI colours\n",
                    flush=True,
                )

            # Trace viewer instructions
            if orchestrator.trace_address:
                th, tp = orchestrator.trace_address
                print(
                    f"Trace viewer — start in another terminal:\n"
                    f"  python -m dragon.ai.agent.observability --tcp {th}:{tp} -i\n"
                    f"  Options:\n"
                    f"    -i, --interactive  Curses TUI (recommended)\n"
                    f"    --jsonl PATH       Custom JSONL path (default: auto-generated)\n"
                    f"    --report PATH      Custom .txt report path\n"
                    f"    --no-color         Disable ANSI colours\n",
                    flush=True,
                )

            result = orchestrator.run(
                user_input=user_input,
                batch=batch,
            )

            print("\n" + "=" * 60, flush=True)
            print("FINAL RESULT", flush=True)
            print("=" * 60, flush=True)
            print(result, flush=True)

        except Exception as exc:
            import traceback
            print(f"\n[error] Pipeline failed: {exc}", flush=True)
            traceback.print_exc()
        finally:
            orchestrator.destroy()
            batch.join()
            batch.destroy()

    except Exception as exc:
        import traceback
        print(f"\n[error] Fatal: {exc}", flush=True)
        traceback.print_exc()
    finally:
        cleanup_experiment_state()
        for spec in agent_specs:
            try:
                spec["shutdown_event"].set()
            except Exception:
                pass
        for p in procs:
            try:
                p.join()
            except Exception:
                pass
        print("\n[teardown] All agents stopped.", flush=True)
        try:
            inference_pipeline.destroy()
        except Exception:
            pass
        print("[teardown] Main inference pipeline stopped.", flush=True)
        try:
            summarizer_pipeline.destroy()
        except Exception:
            pass
        print("[teardown] Summarizer inference pipeline stopped.", flush=True)


if __name__ == "__main__":
    mp.set_start_method("dragon")
    asyncio.run(main())
Key Concepts
Feature Integration:
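Multi-agent DAG (from 02): Four agents and a function node form a pipeline; upstream results flow automatically to downstream nodes
HITL Approval (from 03): The planner and runner need human approval before calling propose_experiment and launch_experiment, respectively
Memory Management (from 04): The planner uses a sliding window; the runner uses summarization backed by a dedicated small summarizer LLM
MCP Tools (from 05): When an MCP URL is passed on the command line, the reporter can call tools from the remote Jupyter MCP server
Tracing (new in this example): Every LLM call, tool call, approval decision, memory operation, and DAG state transition is logged to ./traces/<run-id>.jsonl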
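The toy functions below model the two memory strategies on a plain list of turns. Their semantics are assumptions read off the parameter names in the MemoryConfig calls (max_kept_turns, summarize_after_turns). sliding_window and summarize are hypothetical helpers, not Dragon's implementation, and the summarizer argument stands in for a call to the summarizer LLM.

```python
from typing import Callable

Turn = str  # one conversation turn, simplified to plain text for this sketch


def sliding_window(history: list[Turn], max_kept_turns: int) -> list[Turn]:
    """SLIDING_WINDOW-style trimming: keep only the newest max_kept_turns."""
    if max_kept_turns <= 0:
        return []
    return history[-max_kept_turns:]


def summarize(history: list[Turn], max_kept_turns: int,
              summarize_after_turns: int,
              summarizer: Callable[[list[Turn]], str]) -> list[Turn]:
    """SUMMARIZE-style trimming (assumed semantics).

    Once history exceeds summarize_after_turns, replace everything except
    the newest max_kept_turns with a single summary turn.
    """
    if len(history) <= summarize_after_turns:
        return list(history)
    split = max(len(history) - max_kept_turns, 0)
    older, recent = history[:split], history[split:]
    if not older:
        return list(history)  # nothing old enough to fold away
    return [f"[summary] {summarizer(older)}"] + recent
```

With the runner's settings (max_kept_turns=1, summarize_after_turns=2), a four-turn history would collapse into one summary plus the latest turn. The trade-off is cost. A sliding window is free but forgets. Summarization keeps a compressed record at the price of extra calls to the summarizer model, which this example serves from a separate GPU on node 1.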
Trace Structure:
Each line in the trace file is one JSON object. Illustrative records (fields abridged):
{"timestamp": "...", "type": "llm_call", "agent": "planner_agent", "model": "llama-70b", ...}
{"timestamp": "...", "type": "tool_call", "agent": "planner_agent", "tool": "propose_experiment", ...}
{"timestamp": "...", "type": "hitl_approval", "agent": "planner_agent", "status": "approved", ...}
{"timestamp": "...", "type": "memory_op", "agent": "runner_agent", "op": "summarize", ...}
{"timestamp": "...", "type": "dag_transition", "from": "planner_agent", "to": "runner_agent", ...}
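Because the trace is plain JSONL, ordinary tooling can post-process it. The sketch below loads a trace and counts records per agent and record type. It assumes only the agent and type fields shown in the sample records above. load_trace and summarize_trace are hypothetical helpers, and the real trace schema may carry more or different fields.

```python
import json
from collections import Counter


def load_trace(path: str) -> list[dict]:
    """Read a JSONL trace file: one JSON object per non-empty line."""
    records = []
    with open(path, encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue  # tolerate blank lines, e.g. a trailing newline
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError as exc:
                # Report the file and line so a truncated record is easy to find.
                raise ValueError(f"{path}:{lineno}: invalid JSON ({exc})") from exc
    return records


def summarize_trace(records: list[dict]) -> Counter:
    """Count records per (agent, type); records without an 'agent' use '-'."""
    return Counter((r.get("agent", "-"), r.get("type", "?")) for r in records)
```

For example, summarize_trace(load_trace("./traces/<run-id>.jsonl")).most_common(5) lists the five busiest (agent, type) pairs. This is a quick way to spot an agent that keeps calling tools without finishing.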
Trace Viewer:
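The trace viewer (python -m dragon.ai.agent.observability --tcp HOST:PORT -i, using the address the script prints at startup) lets you explore agent reasoning, tool execution paths, approval decisions, and memory operations interactively, in real time.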
Installation
See Example 01 (same dependencies).
System Description
Tested on HPE Cray EX:
Node 0 (main inference): 2 NVIDIA A100 GPUs, AMD EPYC 64-core CPU
Node 1 (summarizer): 1 NVIDIA A100 GPU, AMD EPYC 64-core CPU
Total: 2 compute nodes required
How to Run
Step 1: Edit configuration
Open 06_full_pipeline.py and set:
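MODEL_NAME (main reasoning model)
SUMMARIZER_MODEL_NAME (small model for summarization)
HF_TOKEN (only if your models require a Hugging Face access token)
The MCP server URL is not set in the file; pass it as the first command-line argument (Step 4).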
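If you launch the example with different models, you might read these constants from the environment instead of editing the file. This follows the os.environ.get pattern the script already uses for DRAGON_REPORT_DIR. In this sketch the environment variable names are hypothetical, and check_model_paths is an illustrative helper that flags paths still set to the shipped placeholder.

```python
import os

# Hypothetical environment variable names; the defaults are the same
# placeholders the example script ships with.
MODEL_NAME = os.environ.get("MODEL_NAME", "/path/to/your/model")
HF_TOKEN = os.environ.get("HF_TOKEN", "")
SUMMARIZER_MODEL_NAME = os.environ.get(
    "SUMMARIZER_MODEL_NAME", "/path/to/your/summarizer/model"
)


def check_model_paths(*paths: str) -> list[str]:
    """Return the model paths that are still the shipped placeholders."""
    return [p for p in paths if p.startswith("/path/to/")]


if __name__ == "__main__":
    placeholders = check_model_paths(MODEL_NAME, SUMMARIZER_MODEL_NAME)
    if placeholders:
        print(f"Warning: model paths still unset: {placeholders}")
```

With this change, a run could look like MODEL_NAME=/models/main SUMMARIZER_MODEL_NAME=/models/small dragon 06_full_pipeline.py. Check that your launcher propagates the environment to the processes that build the configs.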
Step 2: Save the MCP auth token to token.txt in the working directory (if using MCP)
echo "your-mcp-server-token" > token.txt
Step 3: Allocate nodes
salloc --nodes=2 --exclusive
Step 4: Run
# Without MCP:
dragon 06_full_pipeline.py
# With MCP (e.g., Jupyter server):
dragon 06_full_pipeline.py http://jupyter-mcp-server:8888
Example output (abridged and illustrative; approval decisions are entered in the separate HITL client):
$ dragon 06_full_pipeline.py
Starting 2-node infrastructure...
Node 0: Main inference service (70B model) initialized
Node 1: Summarizer service (7B model) initialized
Agent 'planner' started
Agent 'runner' started
Agent 'analyzer' started
Agent 'reporter' started (with MCP: jupyter)
Task: 'Execute complex analysis workflow'
Planner proposing strategy... ✓
HITL: Approve planner's tool call? [A/R/F]: A
Runner executing strategy... ✓
HITL: Approve runner's tool call? [A/R/F]: A
Analyzer reviewing results... ✓
Reporter generating final report... ✓
Function node: save_report() executed
Workflow completed
Trace written to: ./traces/20260421-163045.jsonl
Step 5: Inspect trace (optional, in another terminal)
python -m dragon.ai.agent.observability --trace-file ./traces/20260421-163045.jsonl
HITL approval client (in another terminal, during run)
python -m dragon.ai.agent.hitl --tcp HOST:PORT
Replace HOST:PORT with the address the script prints at startup. Use the client to approve or reject tool calls in real time, or to send feedback to the agent.
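The example gates tools with one-line lambdas such as lambda n, a: n == "propose_experiment". Those lambdas suggest that an approval filter receives the tool name and its arguments and returns True when a human must approve the call. Under that assumption, you can build finer-grained rules. Both factory functions below are hypothetical. The assumption that arguments arrive as a dict with a sample_sizes key comes from the runner's task description (launch_experiment(sample_sizes, seeds)), not from Dragon's documentation.

```python
from typing import Any, Callable

# Assumed filter signature, inferred from the example's lambdas:
# (tool_name, tool_args) -> True if a human must approve the call.
ApprovalFilter = Callable[[str, Any], bool]


def require_approval_for(*tool_names: str) -> ApprovalFilter:
    """Hypothetical factory: gate every call to the named tools."""
    gated = frozenset(tool_names)
    return lambda name, args: name in gated


def require_approval_if_large(tool_name: str, max_samples: int) -> ApprovalFilter:
    """Hypothetical factory: gate tool_name only for large sample sizes.

    Assumes args is a dict with a 'sample_sizes' list of integers.
    """
    def _filter(name: str, args: Any) -> bool:
        if name != tool_name:
            return False  # other tools run without review
        if not isinstance(args, dict):
            return True  # fail closed: unknown argument shape needs review
        sizes = args.get("sample_sizes") or []
        try:
            return any(int(n) > max_samples for n in sizes)
        except (TypeError, ValueError):
            return True  # fail closed on malformed sample sizes
    return _filter
```

You would pass, for example, approval_filter=require_approval_if_large("launch_experiment", 1_000_000) to _make_agent_kwargs. The filter fails closed: if the arguments do not have the expected shape, it asks for approval rather than letting the call through.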
Next Steps
Deploy to production using this example as a blueprint
Customize memory strategies for your task’s characteristics
Add more MCP servers for access to domain-specific tools
Monitor trace files to detect failure patterns and refine agents
Replay traces for debugging and testing recovery scenarios
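To monitor runs for recurring problems, you can scan traces for rejected approvals and failed operations. The sketch below assumes that hitl_approval records carry a status field. The sample record shows "approved"; the value "rejected" for refusals is an assumption. It also assumes that failed operations carry an error field. Both are guesses about the schema, and failure_report is a hypothetical helper. It accepts any iterable of lines, so an open trace file works directly.

```python
import json
from collections import Counter, defaultdict
from typing import Iterable


def failure_report(lines: Iterable[str]) -> tuple[Counter, dict[str, list]]:
    """Tally rejected approvals per (agent, tool) and collect errors per agent.

    Schema assumptions (hypothetical): approval records have
    type == "hitl_approval" and status == "rejected" when refused; failed
    operations carry a truthy "error" field.
    """
    rejections: Counter = Counter()
    errors: dict[str, list] = defaultdict(list)
    for line in lines:
        line = line.strip()
        if not line:
            continue
        rec = json.loads(line)
        agent = rec.get("agent", "-")
        if rec.get("type") == "hitl_approval" and rec.get("status") == "rejected":
            rejections[(agent, rec.get("tool", "?"))] += 1
        if rec.get("error"):
            errors[agent].append(rec["error"])
    return rejections, dict(errors)
```

A tool that operators keep rejecting often points to a prompt that needs tightening, for example the task_description of the corresponding PipelineNode.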