Full Production Pipeline

This is the capstone example, combining all features from the previous five examples into a production-ready reference implementation. It demonstrates: multi-agent DAG, HITL approval gates, memory strategies, MCP tool servers, and real-time tracing / observability. This is your template for building complex agentic systems on Dragon.

Prerequisites: Read Examples 01–05 first.

What you’ll learn:

  • How to integrate all five techniques into one system

  • How to enable tracing to log every LLM call, tool invocation, approval, and memory operation to a structured JSONL file

  • How to use the trace viewer to inspect and replay execution

  • Best practices for production agent deployment on HPC clusters

Architecture:

  • Node 0, GPUs [0,1]: Main inference service (large reasoning LLM)

  • Node 1, GPU [0]: Dedicated summarizer LLM

  • DAG: planner → runner → analyzer → reporter → save_report (function node)

  • The planner and runner set explicit memory strategies and HITL approval gates; the reporter can attach MCP tools

  • Tracing enabled for full observability
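
The dependency structure of the DAG above can be checked offline before launching anything on the cluster. The following is a minimal sketch, assuming only the node ids and depends_on lists from this example; it uses the standard-library graphlib module (Python 3.9+) and is not how DAGOrchestrator schedules nodes internally. The names DEPENDS_ON and execution_order are illustrative.

```python
from graphlib import TopologicalSorter

# Node id -> upstream dependencies, copied from the pipeline in this example.
DEPENDS_ON = {
    "planner_agent": [],
    "runner_agent": ["planner_agent"],
    "analyzer_agent": ["runner_agent"],
    "reporter_agent": ["planner_agent", "runner_agent", "analyzer_agent"],
    "save_report": ["reporter_agent"],
}


def execution_order(depends_on):
    """Return one valid execution order.

    TopologicalSorter raises graphlib.CycleError if the graph has a cycle,
    which makes this a cheap sanity check for a hand-written pipeline.
    """
    return list(TopologicalSorter(depends_on).static_order())


if __name__ == "__main__":
    print(" -> ".join(execution_order(DEPENDS_ON)))
```

Because every node here depends on the one before it, only one order is possible; a pipeline with independent branches would have several valid orders.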

Main Code

Below is the complete example:

Listing 32 06_full_pipeline.py: All features combined with tracing
"""06 — Full Pipeline: DAG + HITL + Memory + MCP + Tracing.

**Prerequisites:** Read ``01``–``05`` first.

**What's new:**

* **Tracing / observability** — ``tracing=True`` on ``OrchestratorConfig``
  writes a per-run JSONL trace with every LLM call, tool call,
  approval/rejection, memory operation, and DAG state transition
* **Trace viewer CLI** — inspect the trace interactively after a run
* **Everything combined** — this is the production-ready reference
  combining all features from the previous examples:

  - Multi-agent DAG + function node       (from 02)
  - HITL approval gates                   (from 03)
  - Memory strategies + summarizer LLM    (from 04)
  - MCP tool server integration           (from 05)

Architecture::

    Node 0 GPUs [0,1] → Main InferenceWorker (tp_size=2)
    Node 1 GPU  [0]   → Summarizer Worker    (tp_size=1)

    planner_agent ──► runner_agent ──► analyzer_agent ──┐
       [HITL]           [HITL]           [memory]       ├──► reporter_agent ──► save_report(fn)
     SLIDING_WINDOW   SUMMARIZE                         │       [MCP: jupyter]
         └──────────────┴───────────────────────────────┘

    tracing → ./traces/<run-id>.jsonl

Usage::

    # Without MCP:
    dragon 06_full_pipeline.py

    # With MCP:
    echo '<token>' > token.txt
    dragon 06_full_pipeline.py <mcp_url>

    # HITL approval client (in another terminal):
    python -m dragon.ai.agent.hitl --tcp HOST:PORT

    # Trace viewer (in another terminal):
    python -m dragon.ai.agent.observability --tcp HOST:PORT -i
"""

import asyncio
import json
import os
import sys
import time
from typing import Any

import dragon
import multiprocessing as mp

from dragon.ai.agent.core import create_sub_agent
from dragon.ai.agent.config import (
    AgentConfig,
    MCPServerConfig,
    MemoryConfig,
    MemoryStrategy,
    OrchestratorConfig,
    Pipeline,
    PipelineNode,
    TaskResult,
    TaskStatus,
    DISPATCH_ID_KEY,
    RESULT_KEY,
    STATUS_KEY,
)
from dragon.ai.agent.tools import ToolRegistry
from dragon.ai.agent.orchestrator import DAGOrchestrator
from dragon.data.ddict import DDict
from dragon.infrastructure.policy import Policy
from dragon.native.event import Event
from dragon.native.machine import Node, System
from dragon.native.process import Process
from dragon.native.queue import Queue
from dragon.workflows.batch import Batch

from dragon.ai.inference.config import (
    BatchingConfig,
    HardwareConfig,
    InferenceConfig,
    ModelConfig,
)
from dragon.ai.inference.inference_utils import Inference

from tools import (
    propose_experiment,
    launch_experiment,
    check_progress,
    collect_results,
    analyze_convergence,
    format_results_table,
)
from tools.runner import cleanup_experiment_state


# ===========================================================================
# User-configurable constants
# ===========================================================================

MODEL_NAME = "/path/to/your/model"
HF_TOKEN = ""
SUMMARIZER_MODEL_NAME = "/path/to/your/summarizer/model"


# ===========================================================================
# Inference — Main model (reasoning + tool-calling)
# ===========================================================================

INFERENCE_CONFIG = InferenceConfig(
    model=ModelConfig(
        model_name=MODEL_NAME,
        hf_token=HF_TOKEN,
        tp_size=2,
        max_tokens=8192,
        max_model_len=32768,
    ),
    hardware=HardwareConfig(
        num_nodes=1,
        num_gpus=2,
        num_inf_workers_per_cpu=1,
    ),
    batching=BatchingConfig(
        batch_wait_seconds=0.1,
        max_batch_size=32,
    ),
)


# ===========================================================================
# Inference — Summarizer (separate GPU on node 1)
# ===========================================================================

SUMMARIZER_CONFIG = InferenceConfig(
    model=ModelConfig(
        model_name=SUMMARIZER_MODEL_NAME,
        hf_token=HF_TOKEN,
        tp_size=1,
        max_tokens=2048,
        max_model_len=8192,
    ),
    hardware=HardwareConfig(
        num_nodes=1,
        num_gpus=1,
        node_offset=1,
        num_inf_workers_per_cpu=1,
    ),
    batching=BatchingConfig(
        batch_wait_seconds=0.05,
        max_batch_size=8,
    ),
)


# ===========================================================================
# Tool registries
# ===========================================================================

planner_registry = ToolRegistry()
planner_registry.register(propose_experiment)

runner_registry = ToolRegistry()
runner_registry.register(launch_experiment)
runner_registry.register(check_progress)
runner_registry.register(collect_results)

analyzer_registry = ToolRegistry()
analyzer_registry.register(analyze_convergence)

reporter_registry = ToolRegistry()
reporter_registry.register(format_results_table)


# ===========================================================================
# Function node — persists the final report to disk + DDict
# ===========================================================================

REPORT_DIR = os.environ.get("DRAGON_REPORT_DIR", os.getcwd())


def save_report(*upstreams: TaskResult) -> TaskResult:
    """Post-processing function node — saves the reporter's output to disk.

    Runs as a plain Python function inside Dragon Batch (no LLM, no agent
    queue).  Receives upstream TaskResult tokens, attaches to the shared
    DDict, reads the reporter's result, and writes it to disk.

    :param upstreams: One or more TaskResult tokens from upstream nodes.
        The first upstream carries ``task_id`` and ``serialized_ddict``.
    :return: TaskResult with status DONE for the orchestrator to collect.
    """
    upstream = upstreams[0]
    task_id = upstream.task_id
    serialized_ddict = upstream.serialized_ddict

    print(f"\n[save_report] Function node started (task_id={task_id[:8]}...)",
          flush=True)

    ddict = DDict.attach(serialized_ddict)
    try:
        # Read the reporter agent's result from DDict
        reporter_dispatch_key = DISPATCH_ID_KEY.format(
            task_id=task_id, agent_id="reporter_agent"
        )
        reporter_dispatch_id = ddict[reporter_dispatch_key]
        reporter_result_key = RESULT_KEY.format(
            task_id=task_id,
            agent_id="reporter_agent",
            dispatch_id=reporter_dispatch_id,
        )
        reporter_result = ddict[reporter_result_key]

        if isinstance(reporter_result, dict):
            report_text = reporter_result.get("response", str(reporter_result))
        else:
            report_text = str(reporter_result)

        # Write Markdown file
        md_path = os.path.join(REPORT_DIR, "monte_carlo_report.md")
        with open(md_path, "w") as f:
            f.write(report_text)
        print(f"[save_report] Written: {md_path}", flush=True)

        # Write JSON artifact with metadata
        json_path = os.path.join(REPORT_DIR, "monte_carlo_report.json")
        artifact = {
            "task_id": task_id,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
            "report": report_text,
            "source_agent": "reporter_agent",
            "output_files": [md_path],
        }
        with open(json_path, "w") as f:
            json.dump(artifact, f, indent=2)
        print(f"[save_report] Written: {json_path}", flush=True)

        # Write own result to DDict so the orchestrator can collect it
        own_dispatch_id = f"fn-save-report-{task_id[:8]}"
        dispatch_id_key = DISPATCH_ID_KEY.format(
            task_id=task_id, agent_id="save_report"
        )
        result_key = RESULT_KEY.format(
            task_id=task_id, agent_id="save_report", dispatch_id=own_dispatch_id
        )
        status_key = STATUS_KEY.format(
            task_id=task_id, agent_id="save_report", dispatch_id=own_dispatch_id
        )
        result_payload = {
            "response": (
                f"Report saved to:\n"
                f"  - {md_path}\n"
                f"  - {json_path}"
            )
        }
        ddict[dispatch_id_key] = own_dispatch_id
        ddict[result_key] = result_payload
        ddict[status_key] = TaskStatus.DONE
    finally:
        ddict.detach()

    print("[save_report] Function node complete.\n", flush=True)

    return TaskResult(
        task_id=task_id,
        agent_id="save_report",
        status=TaskStatus.DONE,
        serialized_ddict=serialized_ddict,
    )


# ===========================================================================
# DAG pipeline
# ===========================================================================

pipeline = Pipeline(nodes=[
    PipelineNode(
        agent_id="planner_agent",
        task_description=(
            "You are a scientific experiment planner.  The user wants to study "
            "Monte Carlo convergence for estimating π.\n\n"
            "Propose an experiment plan by calling propose_experiment with:\n"
            "  - description, sample_sizes, convergence_target, methodology\n\n"
            "A human operator will review your plan.  If they provide "
            "feedback, revise and resubmit.  Once approved, report verbatim."
        ),
        depends_on=[],
    ),
    PipelineNode(
        agent_id="runner_agent",
        task_description=(
            "You manage parallel Monte Carlo simulations on an HPC cluster.\n\n"
            "Tools:\n"
            "  1. launch_experiment(sample_sizes, seeds) — launches ALL "
            "simulations in parallel.  Operator must approve.\n"
            "  2. check_progress() — shows done/running/pending status.\n"
            "  3. collect_results() — call ONLY when all_done=true.\n\n"
            "STRICT workflow:\n"
            "  1. Call launch_experiment.\n"
            "  2. Call check_progress until all_done=true.\n"
            "  3. Call collect_results — REQUIRED before final answer.\n"
            "  4. Report collect_results output verbatim."
        ),
        depends_on=["planner_agent"],
    ),
    PipelineNode(
        agent_id="analyzer_agent",
        task_description=(
            "Call analyze_convergence with a list of dicts (keys: "
            "'n_samples', 'absolute_error').  Report all metrics verbatim."
        ),
        depends_on=["runner_agent"],
    ),
    PipelineNode(
        agent_id="reporter_agent",
        task_description=(
            "Write a structured report with the experiment results.\n\n"
            "STRICT workflow — follow every step IN ORDER:\n\n"
            "  STEP 1.  Call format_results_table to get a Markdown table.\n\n"
            "  STEP 2.  Check your available tools.  If you have tools that\n"
            "           can manage Jupyter kernels and notebook files, do\n"
            "           BOTH sub-steps:\n"
            "             a. Create a Jupyter kernel.\n"
            "             b. Create a notebook file named\n"
            "                'monte_carlo_convergence_report' with the\n"
            "                COMPLETE report as code_content.\n"
            "           If no such tools are available, skip Step 2.\n\n"
            "  STEP 3.  Give your final answer with the SAME complete\n"
            "           report text.  COPY-PASTE the actual table.\n\n"
            "Never invent data — use only data from upstream agents."
        ),
        depends_on=["planner_agent", "runner_agent", "analyzer_agent"],
    ),
    # ← Function node: runs save_report() after reporter_agent finishes
    PipelineNode(
        agent_id="save_report",
        fn=save_report,
        task_description="Persist final report to DDict.",
        depends_on=["reporter_agent"],
    ),
])


# ===========================================================================
# Helpers
# ===========================================================================

def _make_agent_kwargs(agent_id, name, role, registry, inference_queue,
                       approval_filter=None,
                       max_tool_call_iterations=20,
                       memory=None,
                       summarizer_inference_queue=None,
                       mcp_servers=None):
    """Build kwargs for create_sub_agent() — all features combined."""
    return {
        "config": AgentConfig(
            agent_id=agent_id, name=name, role=role,
            inference_queue=inference_queue,
            summarizer_inference_queue=summarizer_inference_queue,
            approval_filter=approval_filter,
            max_tool_call_iterations=max_tool_call_iterations,
            memory=memory,
        ),
        "tool_registry": registry,
        "mcp_servers": mcp_servers,
        "shutdown_event": Event(),
        "reply_queue": Queue(),
    }


def _start_agents(specs, policies):
    procs = []
    for spec, policy in zip(specs, policies):
        p = Process(target=create_sub_agent, kwargs=spec, policy=policy)
        p.start()
        procs.append(p)
    queues = {}
    for spec in specs:
        aid = spec["config"].agent_id
        queues[aid] = spec["reply_queue"].get()
        print(f"[startup] Agent '{aid}' ready.", flush=True)
    return procs, queues


# ===========================================================================
# Main
# ===========================================================================

async def main():
    # -------------------------------------------------------------------
    # 1. Main inference pipeline
    # -------------------------------------------------------------------
    input_queue = Queue()

    print("[startup] Initializing main inference pipeline...", flush=True)

    inference_pipeline = None
    try:
        inference_pipeline = Inference(INFERENCE_CONFIG, input_queue)
        inference_pipeline.initialize()
    except Exception as exc:
        import traceback
        print(f"\n[FATAL] Main inference pipeline failed: {exc}", flush=True)
        traceback.print_exc()
        if inference_pipeline is not None:
            inference_pipeline.destroy()
        return
    print("[startup] Main inference pipeline ready.\n", flush=True)

    # -------------------------------------------------------------------
    # 2. Summarizer inference pipeline
    # -------------------------------------------------------------------
    summarizer_input_queue = Queue()

    print("[startup] Initializing summarizer inference pipeline...", flush=True)

    summarizer_pipeline = None
    try:
        summarizer_pipeline = Inference(SUMMARIZER_CONFIG, summarizer_input_queue)
        summarizer_pipeline.initialize()
    except Exception as exc:
        import traceback
        print(f"\n[FATAL] Summarizer pipeline failed: {exc}", flush=True)
        traceback.print_exc()
        inference_pipeline.destroy()
        if summarizer_pipeline is not None:
            summarizer_pipeline.destroy()
        return
    print("[startup] Summarizer inference pipeline ready.\n", flush=True)

    # -------------------------------------------------------------------
    # 3. MCP server (optional)
    # -------------------------------------------------------------------
    mcp_url = sys.argv[1] if len(sys.argv) >= 2 else None

    TOKEN_FILE = os.path.join(os.getcwd(), "token.txt")
    mcp_token = None
    if mcp_url:
        try:
            with open(TOKEN_FILE, "rb") as f:
                raw = f.read()
            mcp_token = raw.decode("utf-8").strip().strip("\ufeff")
            print(f"[startup] MCP token loaded from {TOKEN_FILE}", flush=True)
        except FileNotFoundError:
            print(f"[startup] WARNING: {TOKEN_FILE} not found.", flush=True)

    reporter_mcp_servers = None
    if mcp_url:
        reporter_mcp_servers = [
            MCPServerConfig(url=mcp_url, alias="jupyter", token=mcp_token),
        ]
        print(f"[startup] Reporter MCP: {mcp_url}", flush=True)
    else:
        print("[startup] No MCP URL — reporter uses local tools only.", flush=True)

    # -------------------------------------------------------------------
    # 4. Node placement
    # -------------------------------------------------------------------
    my_alloc = System()
    node_list = my_alloc.nodes
    compute_host = (
        Node(node_list[1]).hostname if len(node_list) > 1
        else Node(node_list[0]).hostname
    )
    compute_policy = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=compute_host,
    )

    procs, agent_specs = [], []
    try:
        agent_specs = [
            _make_agent_kwargs(
                "planner_agent", "Experiment Planner",
                "You are an experiment planner for Monte Carlo convergence "
                "studies on an HPC cluster.",
                planner_registry, input_queue,
                approval_filter=lambda n, a: n == "propose_experiment",
                max_tool_call_iterations=60,
                memory=MemoryConfig(
                    strategy=MemoryStrategy.SLIDING_WINDOW,
                    max_kept_turns=3,
                ),
            ),
            _make_agent_kwargs(
                "runner_agent", "Parallel Simulation Runner",
                "You manage parallel Monte Carlo simulations.\n"
                "You MUST call all three tools in order: "
                "launch_experiment → check_progress → collect_results.\n"
                "NEVER give a final answer without calling collect_results first.",
                runner_registry, input_queue,
                approval_filter=lambda n, a: n == "launch_experiment",
                max_tool_call_iterations=60,
                memory=MemoryConfig(
                    strategy=MemoryStrategy.SUMMARIZE,
                    max_kept_turns=1,
                    summarize_after_turns=2,
                ),
                summarizer_inference_queue=summarizer_input_queue,
            ),
            _make_agent_kwargs(
                "analyzer_agent", "Convergence Analyzer",
                "Analyse convergence.  Call analyze_convergence with "
                "n_samples + absolute_error dicts.  Report verbatim.",
                analyzer_registry, input_queue,
            ),
            _make_agent_kwargs(
                "reporter_agent", "Report Writer",
                "Write a structured report with Markdown tables.  "
                "If you have Jupyter tools, also create a notebook.  "
                "Always include tool output verbatim.",
                reporter_registry, input_queue,
                mcp_servers=reporter_mcp_servers,
            ),
        ]
        policies = [None, compute_policy, None, None]

        procs, queues = _start_agents(agent_specs, policies)
        for spec in agent_specs:
            spec["config"].input_queue = queues[spec["config"].agent_id]

        # ← NEW: tracing=True enables the JSONL trace log
        orchestrator = DAGOrchestrator(
            config=OrchestratorConfig(
                agents=[s["config"] for s in agent_specs],
                poll_interval=0.5,
                poll_timeout=14400.0,
                tracing=True,          # ← writes traces/<run-id>.jsonl
            ),
            pipeline=pipeline,
        )

        user_input = (
            "Design and run a Monte Carlo convergence experiment for π.  "
            "I want to review the plan first, then approve the parallel "
            "launch.  Monitor progress and report which nodes ran what."
            + (
                "  Save the final report to a Jupyter notebook on the cluster."
                if mcp_url else ""
            )
        )

        batch = Batch()
        try:
            print("=" * 60, flush=True)
            print("Dragon AI — 06 Full Pipeline (all features)", flush=True)
            print("=" * 60, flush=True)
            print(f"Request: {user_input}\n", flush=True)

            # Feature summary
            print("Active features:", flush=True)
            print("  DAG .............. 4 agents + save_report fn node", flush=True)
            print("  HITL ............. planner (propose_experiment), "
                  "runner (launch_experiment)", flush=True)
            print("  Memory ........... planner=SLIDING_WINDOW, "
                  "runner=SUMMARIZE", flush=True)
            print(f"  MCP .............. {'reporter → ' + mcp_url if mcp_url else 'disabled'}", flush=True)
            print("  Tracing .......... enabled → ./traces/", flush=True)
            print(flush=True)

            # HITL instructions
            if orchestrator.hitl_address:
                h, p = orchestrator.hitl_address
                print(
                    f"HITL approval client — start in another terminal:\n"
                    f"  python -m dragon.ai.agent.hitl --tcp {h}:{p}\n"
                    f"  Options:\n"
                    f"    --jsonl PATH        Custom JSONL path (default: auto-generated)\n"
                    f"    --report PATH       Custom .txt report path\n"
                    f"    --no-color          Disable ANSI colours\n",
                    flush=True,
                )

            # Trace viewer instructions
            if orchestrator.trace_address:
                th, tp = orchestrator.trace_address
                print(
                    f"Trace viewer — start in another terminal:\n"
                    f"  python -m dragon.ai.agent.observability --tcp {th}:{tp} -i\n"
                    f"  Options:\n"
                    f"    -i, --interactive   Curses TUI (recommended)\n"
                    f"    --jsonl PATH        Custom JSONL path (default: auto-generated)\n"
                    f"    --report PATH       Custom .txt report path\n"
                    f"    --no-color          Disable ANSI colours\n",
                    flush=True,
                )

            result = orchestrator.run(
                user_input=user_input,
                batch=batch,
            )

            print("\n" + "=" * 60, flush=True)
            print("FINAL RESULT", flush=True)
            print("=" * 60, flush=True)
            print(result, flush=True)

        except Exception as exc:
            import traceback
            print(f"\n[error] Pipeline failed: {exc}", flush=True)
            traceback.print_exc()
        finally:
            orchestrator.destroy()
            batch.join()
            batch.destroy()

    except Exception as exc:
        import traceback
        print(f"\n[error] Fatal: {exc}", flush=True)
        traceback.print_exc()
    finally:
        cleanup_experiment_state()
        for spec in agent_specs:
            try:
                spec["shutdown_event"].set()
            except Exception:
                pass
        for p in procs:
            try:
                p.join()
            except Exception:
                pass
        print("\n[teardown] All agents stopped.", flush=True)
        try:
            inference_pipeline.destroy()
        except Exception:
            pass
        print("[teardown] Main inference pipeline stopped.", flush=True)
        try:
            summarizer_pipeline.destroy()
        except Exception:
            pass
        print("[teardown] Summarizer inference pipeline stopped.", flush=True)


if __name__ == "__main__":
    mp.set_start_method("dragon")
    asyncio.run(main())
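
The file-writing part of save_report can be exercised without Dragon or a DDict, which is handy for unit tests. The following is a minimal sketch that mirrors the artifact layout from the listing; the helper name write_report_artifacts and its signature are hypothetical and not part of the example.

```python
import json
import os
import tempfile
import time


def write_report_artifacts(report_text, task_id, out_dir):
    """Write the Markdown report and a JSON metadata file; return both paths.

    Hypothetical helper: it reproduces only the disk-writing step of
    save_report and does not touch Dragon or the DDict.
    """
    md_path = os.path.join(out_dir, "monte_carlo_report.md")
    with open(md_path, "w", encoding="utf-8") as f:
        f.write(report_text)

    json_path = os.path.join(out_dir, "monte_carlo_report.json")
    artifact = {
        "task_id": task_id,
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
        "report": report_text,
        "source_agent": "reporter_agent",
        "output_files": [md_path],
    }
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(artifact, f, indent=2)
    return md_path, json_path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        md, js = write_report_artifacts("| n | error |", "demo-task", tmp)
        with open(js, encoding="utf-8") as f:
            print(json.load(f)["output_files"])
```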

Key Concepts

Feature Integration:

  1. Multi-agent DAG (from 02): Four agents and the save_report function node form a pipeline with automatic upstream result flow

  2. HITL Approval (from 03): The planner's propose_experiment call and the runner's launch_experiment call require human review

  3. Memory Management (from 04): The planner uses a sliding window (max_kept_turns=3); the runner uses summarization backed by a dedicated small LLM

  4. MCP Tools (from 05): The reporter can call tools from a remote MCP server when an MCP URL is passed on the command line

  5. Tracing (new feature): Every operation is logged to ./traces/<run-id>.jsonl
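
To make the two memory strategies concrete, the following is a rough sketch of what a sliding window and a summarization step do to a conversation history. It is not Dragon's implementation: the parameter names are borrowed from MemoryConfig in the listing, but their exact semantics here are assumptions, and the summarizer callable stands in for the dedicated summarizer LLM.

```python
from typing import Callable, List

Turn = str  # one conversation turn, simplified to plain text


def sliding_window(history: List[Turn], max_kept_turns: int) -> List[Turn]:
    """Keep only the most recent max_kept_turns turns."""
    if max_kept_turns <= 0:
        return []
    return history[-max_kept_turns:]


def summarize(
    history: List[Turn],
    max_kept_turns: int,
    summarize_after_turns: int,
    summarizer: Callable[[List[Turn]], Turn],
) -> List[Turn]:
    """Fold older turns into one summary once history passes the threshold.

    Assumed semantics: nothing happens until the history is longer than
    summarize_after_turns; after that, everything except the last
    max_kept_turns turns is replaced by a single summary turn.
    """
    if len(history) <= summarize_after_turns:
        return list(history)
    cut = max(len(history) - max(max_kept_turns, 0), 0)
    if cut == 0:
        return list(history)
    older, recent = history[:cut], history[cut:]
    return [summarizer(older)] + recent


if __name__ == "__main__":
    turns = ["t1", "t2", "t3", "t4"]
    print(sliding_window(turns, 3))
    print(summarize(turns, 1, 2, lambda old: f"summary of {len(old)} turns"))
```

A sliding window is cheap but forgets old turns entirely; summarization costs an extra LLM call but keeps a compressed record, which is why the runner (with its long polling loop) is given the summarizer.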

Trace Structure:

Each line in the trace file is a JSON object:

{"timestamp": "...", "type": "llm_call", "agent": "planner", "model": "llama-70b", ...}
{"timestamp": "...", "type": "tool_call", "agent": "planner", "tool": "propose_experiment", ...}
{"timestamp": "...", "type": "hitl_approval", "agent": "planner", "status": "approved", ...}
{"timestamp": "...", "type": "memory_op", "agent": "runner", "op": "summarize", ...}
{"timestamp": "...", "type": "dag_transition", "from": "planner", "to": "runner", ...}
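
Because each trace line is an independent JSON object, a trace can be summarized with a few lines of standard-library Python. The following is a minimal sketch that counts events per agent and type. The field names "type" and "agent" are taken from the illustrative records above and are assumptions about the real schema, so check them against an actual trace file before relying on this.

```python
import json
from collections import Counter

# Hypothetical sample records, shaped like the illustrative lines above.
SAMPLE_LINES = [
    '{"timestamp": "t0", "type": "llm_call", "agent": "planner"}',
    '{"timestamp": "t1", "type": "tool_call", "agent": "planner", "tool": "propose_experiment"}',
    '{"timestamp": "t2", "type": "llm_call", "agent": "planner"}',
    "",
    '{"timestamp": "t3", "type": "dag_transition", "from": "planner", "to": "runner"}',
]


def count_events(lines):
    """Count trace records by (agent, type); blank lines are skipped.

    Records without an "agent" field (for example DAG transitions in the
    sample) are grouped under "-".
    """
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        key = (record.get("agent", "-"), record.get("type", "unknown"))
        counts[key] += 1
    return counts


if __name__ == "__main__":
    for (agent, kind), n in sorted(count_events(SAMPLE_LINES).items()):
        print(f"{agent:10s} {kind:16s} {n}")
```

To run it on a real trace, pass an open file object instead of SAMPLE_LINES, since iterating a text file yields one line at a time.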

Trace Viewer:

Start the viewer with the command the script prints at startup, then interactively explore the trace to understand agent reasoning, tool execution paths, approval decisions, and memory operations in real time.

Installation

See Example 01 (same dependencies).

System Description

Tested on HPE Cray EX:

  • Node 0 (main inference): 2 NVIDIA A100 GPUs, AMD EPYC 64-core CPU

  • Node 1 (summarizer): 1 NVIDIA A100 GPU, AMD EPYC 64-core CPU

  • Total: 2 compute nodes required

How to Run

Step 1: Edit configuration

Open 06_full_pipeline.py and set:

  • MODEL_NAME (main reasoning model)

  • SUMMARIZER_MODEL_NAME (small model for summarization)

  • HF_TOKEN (only if your models need one)

The MCP server URL is not set in the file; pass it on the command line in Step 4.

Step 2: Save the MCP auth token (if using MCP)

echo "your-mcp-server-token" > token.txt

The script reads token.txt from the directory it is launched in.

Step 3: Allocate nodes

salloc --nodes=2 --exclusive

Step 4: Run

# Without MCP:
dragon 06_full_pipeline.py

# With MCP (e.g., Jupyter server):
dragon 06_full_pipeline.py http://jupyter-mcp-server:8888

Example output (abbreviated and illustrative; the exact messages depend on the script and your Dragon version):

$ dragon 06_full_pipeline.py
Starting 2-node infrastructure...
Node 0: Main inference service (70B model) initialized
Node 1: Summarizer service (7B model) initialized
Agent 'planner' started
Agent 'runner' started
Agent 'analyzer' started
Agent 'reporter' started (with MCP: jupyter)
Task: 'Execute complex analysis workflow'
Planner proposing strategy... ✓
HITL: Approve planner's tool call? [A/R/F]: A
Runner executing strategy... ✓
HITL: Approve runner's tool call? [A/R/F]: A
Analyzer reviewing results... ✓
Reporter generating final report... ✓
Function node: save_report() executed
Workflow completed
Trace written to: ./traces/20260421-163045.jsonl

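Step 5: Inspect trace (optional, in another terminal)

python -m dragon.ai.agent.observability --tcp HOST:PORT -i

Replace HOST:PORT with the trace viewer address the script prints at startup. The -i flag opens the interactive curses TUI; --jsonl PATH and --report PATH set custom output paths.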

HITL approval client (in another terminal, during the run)

python -m dragon.ai.agent.hitl --tcp HOST:PORT

Replace HOST:PORT with the HITL address the script prints at startup. Use the client to approve or reject tool calls in real time, or to send feedback to the agent.
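
Which tool calls reach the approval client is decided by each agent's approval_filter, a callable that receives the tool name and its arguments. The following is a minimal sketch of that gating logic, assuming that a True return value means "ask a human first". The needs_approval helper and the large_runs_only filter (including its threshold) are hypothetical illustrations and not part of Dragon.

```python
from typing import Any, Callable, Dict, Optional

# A filter takes (tool_name, tool_args); True is assumed to mean "needs review".
ApprovalFilter = Callable[[str, Dict[str, Any]], bool]


def needs_approval(
    approval_filter: Optional[ApprovalFilter],
    tool_name: str,
    tool_args: Dict[str, Any],
) -> bool:
    """Return True if this tool call should be held for human review."""
    if approval_filter is None:  # no filter configured: nothing is gated
        return False
    return bool(approval_filter(tool_name, tool_args))


def planner_filter(name: str, args: Dict[str, Any]) -> bool:
    """Same rule as the planner in this example: gate propose_experiment."""
    return name == "propose_experiment"


def large_runs_only(name: str, args: Dict[str, Any]) -> bool:
    """Hypothetical filter that also inspects arguments: gate big launches."""
    sizes = args.get("sample_sizes") or [0]
    return name == "launch_experiment" and max(sizes) > 1_000_000


if __name__ == "__main__":
    print(needs_approval(planner_filter, "propose_experiment", {}))
    print(needs_approval(large_runs_only, "launch_experiment",
                         {"sample_sizes": [1_000, 10_000_000]}))
```

Filtering on arguments as well as names keeps routine calls flowing while still stopping the expensive ones, which reduces how often an operator has to respond.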

Next Steps

  • Deploy to production using this example as a blueprint

  • Customize memory strategies for your task’s characteristics

  • Add more MCP servers for access to domain-specific tools

  • Monitor trace files to detect failure patterns and refine agents

  • Replay traces for debugging and testing recovery scenarios