Memory Management Strategies

Building on Examples 01–03, this example demonstrates memory management for long-running agentic loops. Every LLM turn adds to an agent's conversation history. Without pruning, the prompt keeps growing until it no longer fits the model's context window. Agents can manage history with one of three strategies: sliding window (drop old turns), summarization (compress old turns with a dedicated small LLM), or keep-all. The summarizer LLM runs on a separate GPU partition so that it does not contend with the main reasoning model.

Prerequisites: Read 03_hitl_approval.py first.

What you’ll learn:

  • How to configure memory strategies for different agents

  • SLIDING_WINDOW — drop old turns, keep last N; lowest cost

  • SUMMARIZE — keep last N turns + a summary of older ones (via separate LLM)

  • FULL — keep everything (suitable for short-lived agents only)

  • How to set up a dedicated summarizer LLM on a separate node

  • How to balance context preservation with token usage and latency

Architecture:

  • Node 0, GPUs [0,1]: Main inference service (large reasoning LLM, tp_size=2)

  • Node 1, GPU [0]: Summarizer service (small LLM, tp_size=1)

Main Code

Below is the complete example:

Listing 30 04_memory.py: Memory strategies and dedicated summarizer
"""04 — Memory Management Strategies + Dedicated Summarizer LLM.

**Prerequisites:** Read ``03_hitl_approval.py`` first.

**What's new:**

* **MemoryConfig** and **MemoryStrategy** — control how agents manage
  conversation history as the agentic loop grows
* **Three strategies side-by-side:**

  - ``planner_agent`` → ``SLIDING_WINDOW`` — drops old turn-pairs beyond
    ``max_kept_turns``, replacing them with a synthetic note
  - ``runner_agent``  → ``SUMMARIZE`` — calls a second LLM to compress
    old turns into a summary instead of dropping them
  - ``analyzer_agent`` / ``reporter_agent`` → ``FULL`` (default) — keep
    everything (fine for short-lived agents)

* **Dedicated summarizer LLM** — a separate, smaller model on its own
  GPU partition, used only for summarization.  The main model stays
  uncontended for reasoning and tool-calling.

Architecture::

    Node 0  GPUs [0,1]  → Main InferenceWorker  (large model, tp_size=2)
    Node 1  GPU  [0]    → Summarizer Worker     (small model, tp_size=1)

Usage::

    # Terminal 1 (needs 2 nodes with GPUs):
    dragon 04_memory.py

    # Terminal 2 (HITL approval client):
    python -m dragon.ai.agent.hitl --tcp HOST:PORT
"""

import asyncio
from typing import Any

import dragon
import multiprocessing as mp

from dragon.ai.agent.core import create_sub_agent
from dragon.ai.agent.config import (
    AgentConfig,
    MemoryConfig,          # ← NEW
    MemoryStrategy,        # ← NEW
    OrchestratorConfig,
    Pipeline,
    PipelineNode,
)
from dragon.ai.agent.tools import ToolRegistry
from dragon.ai.agent.orchestrator import DAGOrchestrator
from dragon.infrastructure.policy import Policy
from dragon.native.event import Event
from dragon.native.machine import Node, System
from dragon.native.process import Process
from dragon.native.queue import Queue
from dragon.workflows.batch import Batch

from dragon.ai.inference.config import (
    BatchingConfig,
    HardwareConfig,
    InferenceConfig,
    ModelConfig,
)
from dragon.ai.inference.inference_utils import Inference

from tools import (
    propose_experiment,
    launch_experiment,
    check_progress,
    collect_results,
    analyze_convergence,
    format_results_table,
)
from tools.runner import cleanup_experiment_state


# ===========================================================================
# User-configurable constants
# ===========================================================================

MODEL_NAME = "/path/to/your/model"
HF_TOKEN = ""

# ← NEW: Smaller instruct model used exclusively for summarization
SUMMARIZER_MODEL_NAME = "/path/to/your/summarizer/model"


# ===========================================================================
# Inference Pipeline Configuration — Main (reasoning + tool-calling)
# ===========================================================================

INFERENCE_CONFIG = InferenceConfig(
    model=ModelConfig(
        model_name=MODEL_NAME,
        hf_token=HF_TOKEN,
        tp_size=2,
        max_tokens=8192,
        max_model_len=32768,
    ),
    hardware=HardwareConfig(
        num_nodes=1,
        num_gpus=2,
        num_inf_workers_per_cpu=1,
    ),
    batching=BatchingConfig(
        batch_wait_seconds=0.1,
        max_batch_size=32,
    ),
)


# ===========================================================================
# ← NEW: Inference Pipeline Configuration — Summarizer
#
# Placed on the SECOND node (node_offset=1) so the main model's GPUs on
# node 0 are not contended.  A small instruct model fits on a single GPU.
# ===========================================================================

SUMMARIZER_CONFIG = InferenceConfig(
    model=ModelConfig(
        model_name=SUMMARIZER_MODEL_NAME,
        hf_token=HF_TOKEN,
        tp_size=1,
        max_tokens=2048,
        max_model_len=8192,
    ),
    hardware=HardwareConfig(
        num_nodes=1,
        num_gpus=1,
        node_offset=1,               # ← start from node 1 (second node)
        num_inf_workers_per_cpu=1,
    ),
    batching=BatchingConfig(
        batch_wait_seconds=0.05,
        max_batch_size=8,
    ),
)


# ===========================================================================
# Tool registries
# ===========================================================================

planner_registry = ToolRegistry()
planner_registry.register(propose_experiment)

runner_registry = ToolRegistry()
runner_registry.register(launch_experiment)
runner_registry.register(check_progress)
runner_registry.register(collect_results)

analyzer_registry = ToolRegistry()
analyzer_registry.register(analyze_convergence)

reporter_registry = ToolRegistry()
reporter_registry.register(format_results_table)


# ===========================================================================
# DAG pipeline
# ===========================================================================

pipeline = Pipeline(nodes=[
    PipelineNode(
        agent_id="planner_agent",
        task_description=(
            "You are a scientific experiment planner.  The user wants to study "
            "Monte Carlo convergence for estimating π.\n\n"
            "Propose an experiment plan by calling propose_experiment with:\n"
            "  - description, sample_sizes, convergence_target, methodology\n\n"
            "A human operator will review your plan.  If they provide "
            "feedback, revise and resubmit.  Once approved, report verbatim."
        ),
        depends_on=[],
    ),
    PipelineNode(
        agent_id="runner_agent",
        task_description=(
            "You manage parallel Monte Carlo simulations on an HPC cluster.\n\n"
            "Tools:\n"
            "  1. launch_experiment(sample_sizes, seeds) — launches ALL "
            "simulations in parallel.  Operator must approve.\n"
            "  2. check_progress() — shows done/running/pending status.\n"
            "  3. collect_results() — call ONLY when all_done=true.\n\n"
            "STRICT workflow:\n"
            "  1. Call launch_experiment.\n"
            "  2. Call check_progress until all_done=true.\n"
            "  3. Call collect_results — REQUIRED before final answer.\n"
            "  4. Report collect_results output verbatim."
        ),
        depends_on=["planner_agent"],
    ),
    PipelineNode(
        agent_id="analyzer_agent",
        task_description=(
            "Call analyze_convergence with a list of dicts (keys: "
            "'n_samples', 'absolute_error').  Report all metrics verbatim."
        ),
        depends_on=["runner_agent"],
    ),
    PipelineNode(
        agent_id="reporter_agent",
        task_description=(
            "Write a structured report.\n\n"
            "STRICT workflow:\n"
            "  1. Call format_results_table to get a Markdown table.\n"
            "  2. COPY-PASTE the actual table into your final answer.\n"
            "  3. Include sections: Plan, Results Table, Parallel Execution,\n"
            "     Convergence Analysis, Quality Assessment, Recommendations.\n\n"
            "Never invent data — use only data from upstream agents."
        ),
        depends_on=["planner_agent", "runner_agent", "analyzer_agent"],
    ),
])


# ===========================================================================
# Helpers
# ===========================================================================

def _make_agent_kwargs(agent_id, name, role, registry, inference_queue,
                       approval_filter=None,
                       max_tool_call_iterations=20,
                       memory=None,
                       summarizer_inference_queue=None):
    """Build kwargs for create_sub_agent().

    NEW in this example:
      ``memory`` — MemoryConfig controlling conversation history pruning
      ``summarizer_inference_queue`` — separate queue for the summarizer LLM
    """
    return {
        "config": AgentConfig(
            agent_id=agent_id, name=name, role=role,
            inference_queue=inference_queue,
            summarizer_inference_queue=summarizer_inference_queue,
            approval_filter=approval_filter,
            max_tool_call_iterations=max_tool_call_iterations,
            memory=memory,
        ),
        "tool_registry": registry,
        "shutdown_event": Event(),
        "reply_queue": Queue(),
    }


def _start_agents(specs, policies):
    procs = []
    for spec, policy in zip(specs, policies):
        p = Process(target=create_sub_agent, kwargs=spec, policy=policy)
        p.start()
        procs.append(p)
    queues = {}
    for spec in specs:
        aid = spec["config"].agent_id
        queues[aid] = spec["reply_queue"].get()
        print(f"[startup] Agent '{aid}' ready.", flush=True)
    return procs, queues


# ===========================================================================
# Main
# ===========================================================================

async def main():
    # -------------------------------------------------------------------
    # 1. Main inference pipeline
    # -------------------------------------------------------------------
    input_queue = Queue()

    print("[startup] Initializing main inference pipeline...", flush=True)

    inference_pipeline = None
    try:
        inference_pipeline = Inference(INFERENCE_CONFIG, input_queue)
        inference_pipeline.initialize()
    except Exception as exc:
        import traceback
        print(f"\n[FATAL] Main inference pipeline failed to initialize: {exc}", flush=True)
        traceback.print_exc()
        if inference_pipeline is not None:
            inference_pipeline.destroy()
        return
    print("[startup] Main inference pipeline ready.\n", flush=True)

    # -------------------------------------------------------------------
    # ← NEW: 2. Summarizer inference pipeline (separate model, separate GPU)
    # -------------------------------------------------------------------
    summarizer_input_queue = Queue()

    print("[startup] Initializing summarizer inference pipeline...", flush=True)
    print(f"[startup] Summarizer model: {SUMMARIZER_MODEL_NAME}", flush=True)

    summarizer_pipeline = None
    try:
        summarizer_pipeline = Inference(SUMMARIZER_CONFIG, summarizer_input_queue)
        summarizer_pipeline.initialize()
    except Exception as exc:
        import traceback
        print(f"\n[FATAL] Summarizer pipeline failed to initialize: {exc}", flush=True)
        traceback.print_exc()
        inference_pipeline.destroy()
        if summarizer_pipeline is not None:
            summarizer_pipeline.destroy()
        return
    print("[startup] Summarizer inference pipeline ready.\n", flush=True)

    my_alloc = System()
    node_list = my_alloc.nodes
    compute_host = (
        Node(node_list[1]).hostname if len(node_list) > 1
        else Node(node_list[0]).hostname
    )
    compute_policy = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=compute_host,
    )

    procs, agent_specs = [], []
    try:
        agent_specs = [
            _make_agent_kwargs(
                "planner_agent", "Experiment Planner",
                "You are an experiment planner for Monte Carlo convergence "
                "studies on an HPC cluster.  Propose plans via "
                "propose_experiment.  Revise on feedback.",
                planner_registry, input_queue,
                approval_filter=lambda n, a: n == "propose_experiment",
                max_tool_call_iterations=60,
                # ← NEW: SLIDING_WINDOW — drop old turns beyond max_kept_turns
                memory=MemoryConfig(
                    strategy=MemoryStrategy.SLIDING_WINDOW,
                    max_kept_turns=3,
                ),
            ),
            _make_agent_kwargs(
                "runner_agent", "Parallel Simulation Runner",
                "You manage parallel Monte Carlo simulations.\n"
                "You MUST call all three tools in order: "
                "launch_experiment → check_progress → collect_results.\n"
                "NEVER give a final answer without calling collect_results first.",
                runner_registry, input_queue,
                approval_filter=lambda n, a: n == "launch_experiment",
                max_tool_call_iterations=60,
                # ← NEW: SUMMARIZE — compress old turns via LLM
                memory=MemoryConfig(
                    strategy=MemoryStrategy.SUMMARIZE,
                    max_kept_turns=1,
                    summarize_after_turns=2,
                ),
                # ← NEW: use dedicated summarizer LLM
                summarizer_inference_queue=summarizer_input_queue,
            ),
            _make_agent_kwargs(
                "analyzer_agent", "Convergence Analyzer",
                "Analyse convergence.  Call analyze_convergence with "
                "n_samples + absolute_error dicts.  Report verbatim.",
                analyzer_registry, input_queue,
                # No memory config → FULL (default) — keep everything
            ),
            _make_agent_kwargs(
                "reporter_agent", "Report Writer",
                "Write a structured report with Markdown tables.  "
                "Include parallel execution details.  "
                "Always include tool output verbatim.",
                reporter_registry, input_queue,
                # No memory config → FULL (default)
            ),
        ]
        policies = [None, compute_policy, None, None]

        procs, queues = _start_agents(agent_specs, policies)
        for spec in agent_specs:
            spec["config"].input_queue = queues[spec["config"].agent_id]

        orchestrator = DAGOrchestrator(
            config=OrchestratorConfig(
                agents=[s["config"] for s in agent_specs],
                poll_interval=0.5,
                poll_timeout=14400.0,
            ),
            pipeline=pipeline,
        )

        user_input = (
            "Design and run a Monte Carlo convergence experiment for π.  "
            "I want to review the plan first, then approve the parallel "
            "launch.  Monitor progress and report which nodes ran what."
        )

        batch = Batch()
        try:
            print("=" * 60, flush=True)
            print("Dragon AI — 04 Memory Strategies + Summarizer LLM", flush=True)
            print("=" * 60, flush=True)
            print(f"Request: {user_input}\n", flush=True)

            # Print memory strategy summary
            print("Memory strategies:", flush=True)
            print("  planner_agent  → SLIDING_WINDOW (max_kept_turns=3)", flush=True)
            print("  runner_agent   → SUMMARIZE (dedicated summarizer LLM)", flush=True)
            print("  analyzer_agent → FULL (default)", flush=True)
            print("  reporter_agent → FULL (default)", flush=True)
            print(flush=True)

            if orchestrator.hitl_address:
                h, p = orchestrator.hitl_address
                print(
                    f"HITL approval client — start in another terminal:\n"
                    f"  python -m dragon.ai.agent.hitl --tcp {h}:{p}\n"
                    f"  Options:\n"
                    f"    --jsonl PATH        Custom JSONL path (default: auto-generated)\n"
                    f"    --report PATH       Custom .txt report path\n"
                    f"    --no-color          Disable ANSI colours\n",
                    flush=True,
                )

            result = orchestrator.run(
                user_input=user_input,
                batch=batch,
            )

            print("\n" + "=" * 60, flush=True)
            print("FINAL RESULT", flush=True)
            print("=" * 60, flush=True)
            print(result, flush=True)

        except Exception as exc:
            import traceback
            print(f"\n[error] Pipeline failed: {exc}", flush=True)
            traceback.print_exc()
        finally:
            orchestrator.destroy()
            batch.join()
            batch.destroy()

    except Exception as exc:
        import traceback
        print(f"\n[error] Fatal: {exc}", flush=True)
        traceback.print_exc()
    finally:
        cleanup_experiment_state()
        for spec in agent_specs:
            try:
                spec["shutdown_event"].set()
            except Exception:
                pass
        for p in procs:
            try:
                p.join()
            except Exception:
                pass
        print("\n[teardown] All agents stopped.", flush=True)
        try:
            inference_pipeline.destroy()
        except Exception:
            pass
        print("[teardown] Main inference pipeline stopped.", flush=True)
        try:
            summarizer_pipeline.destroy()
        except Exception:
            pass
        print("[teardown] Summarizer inference pipeline stopped.", flush=True)


if __name__ == "__main__":
    mp.set_start_method("dragon")
    asyncio.run(main())

Key Concepts

SLIDING_WINDOW:

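Keep the most recent max_kept_turns conversation turns and replace all older turns with a single synthetic “earlier context” note. This has the lowest memory and token cost. Anything contained in the dropped turns is lost, however, so this strategy suits tasks that depend mainly on recent context, such as the planner's propose-and-revise loop.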
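
The following is a minimal sketch of the sliding-window idea in plain Python. It does not reproduce Dragon's MemoryConfig internals. The message-dict format, the model of one turn as one model call plus its tool result, the hypothetical sliding_window helper, and the wording of the placeholder note are all illustrative assumptions.

```python
from typing import Dict, List

# Illustrative types (assumption): a message is a {"role", "content"} dict and
# a turn is the list of messages from one model call plus its tool result.
Message = Dict[str, str]
Turn = List[Message]


def sliding_window(turns: List[Turn], max_kept_turns: int) -> List[Message]:
    """Keep the newest max_kept_turns turns and replace the rest with a note."""
    if max_kept_turns < 0:
        raise ValueError("max_kept_turns must be >= 0")
    n_dropped = max(0, len(turns) - max_kept_turns)
    history: List[Message] = []
    if n_dropped:
        # One synthetic message stands in for everything that was dropped.
        history.append({"role": "system",
                        "content": f"[Earlier context: {n_dropped} turn(s) omitted]"})
    for turn in turns[n_dropped:]:
        history.extend(turn)
    return history


turns = [[{"role": "assistant", "content": f"call {i}"},
          {"role": "tool", "content": f"result {i}"}] for i in range(1, 6)]
for msg in sliding_window(turns, max_kept_turns=3)[:2]:
    print(msg["content"])
```

Running this prints the placeholder note followed by "call 3". With five turns and max_kept_turns=3, turns 1 and 2 are dropped.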

SUMMARIZE:

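Keep the last max_kept_turns turns in full detail and compress older turns into a summary. The summary is generated by the dedicated small LLM and inserted before the kept turns. In this example, runner_agent uses max_kept_turns=1 and summarize_after_turns=2. This strategy suits long-running workflows where earlier context still matters but the prompt, and with it per-turn latency, must stay bounded; the runner's repeated check_progress polling is one such case. The cost is an extra LLM call whenever summarization runs.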

FULL:

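Keep every conversation turn from the start. This is the default when an agent has no MemoryConfig. Because the prompt grows with each turn, a long-running agent will eventually exceed the model's context window (max_model_len=32768 tokens for the main model here). FULL is therefore suitable only for short-lived agents such as analyzer_agent and reporter_agent. Keeping the complete history also makes an agent's reasoning easy to follow in logs.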
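
To estimate roughly when a FULL history stops fitting, the back-of-the-envelope helper below counts how many turns fit in the prompt budget. It assumes that the prompt plus the max_tokens generation budget must fit within max_model_len, a common constraint in LLM serving engines. It also assumes that every turn adds the same number of tokens. The helper name is hypothetical, and the 2,000-token base prompt and 1,500 tokens per turn are made-up figures.

```python
def turns_until_overflow(max_model_len: int, max_tokens: int,
                         base_prompt_tokens: int, tokens_per_turn: int) -> int:
    """Return how many whole turns a FULL history can hold.

    Assumes prompt_tokens + max_tokens <= max_model_len must hold and that
    every turn adds the same number of tokens (both simplifications).
    """
    if tokens_per_turn <= 0:
        raise ValueError("tokens_per_turn must be positive")
    # Tokens left for conversation turns after reserving the generation
    # budget and the fixed system/task prompt.
    budget = max_model_len - max_tokens - base_prompt_tokens
    return max(0, budget) // tokens_per_turn


# Main-model settings from the listing; 2000 and 1500 are hypothetical.
print(turns_until_overflow(32768, 8192, base_prompt_tokens=2000, tokens_per_turn=1500))  # prints 15
```

Under these assumptions, the main model's 32,768-token window holds about 15 turns: 32768 − 8192 − 2000 = 22576 tokens, and 22576 // 1500 = 15. An agent that polls or retries more often than that needs SLIDING_WINDOW or SUMMARIZE.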

Dedicated Summarizer LLM:

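The summarizer runs as a separate, smaller model on its own GPU partition (node 1, selected with node_offset=1). As a result, summarization requests do not queue behind the main model's reasoning and tool-calling requests, and they do not share its GPUs. An agent opts in by passing summarizer_inference_queue. Throughput and latency on the critical reasoning path are therefore largely unaffected by summarization load.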
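
The effect can be illustrated with a deliberately simplified model. Here, one worker serves requests first-come, first-served, with no batching. Real inference workers do batch requests (see BatchingConfig in the listing), so this overstates the queuing. The request timings and helper names are invented; the goal is only to compare reasoning-request latency in two cases: when summarization jobs share the worker, and when they go to a separate one.

```python
from typing import List, Tuple

# (arrival time in s, service time in s, request kind); timings are made up.
Request = Tuple[float, float, str]


def fifo_latencies(requests: List[Request]) -> List[Tuple[str, float]]:
    """Simulate one worker serving requests in arrival order, one at a time."""
    clock = 0.0
    results = []
    for arrival, service, kind in sorted(requests, key=lambda r: r[0]):
        start = max(clock, arrival)   # wait until the worker is free
        clock = start + service       # worker is busy until this request ends
        results.append((kind, clock - arrival))
    return results


def mean_latency(results: List[Tuple[str, float]], kind: str) -> float:
    values = [lat for k, lat in results if k == kind]
    return sum(values) / len(values)


reasoning = [(0.0, 1.0, "reason"), (1.0, 1.0, "reason"), (2.0, 1.0, "reason")]
summaries = [(0.5, 2.0, "summarize"), (1.5, 2.0, "summarize")]

shared = fifo_latencies(reasoning + summaries)   # one model does both jobs
dedicated = fifo_latencies(reasoning)            # summaries go elsewhere
print(mean_latency(shared, "reason"), mean_latency(dedicated, "reason"))  # prints 3.0 1.0
```

In this toy run, mean reasoning latency is 3.0 s on the shared worker and 1.0 s with a dedicated summarizer. The difference arises because each reasoning request waits behind any summarization job that arrived before it.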

Installation

See Example 01 (same dependencies).

System Description

Tested on HPE Cray EX:

  • Node 0 (main inference): 2 NVIDIA A100 GPUs

  • Node 1 (summarizer): 1 NVIDIA A100 GPU

  • Total: 2 compute nodes required

How to Run

Step 1: Edit model paths

Open 04_memory.py and set:

  • MODEL_NAME (main reasoning model, e.g., 70B Llama)

  • SUMMARIZER_MODEL_NAME (small model, e.g., 7B Llama)
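
Before launching, it can help to confirm that both constants were actually changed. The hypothetical check_model_paths helper below is not part of Dragon. It assumes both values are local model directories, as the /path/to/your/... defaults suggest; if your ModelConfig points at something other than a local directory, the isdir check does not apply.

```python
import os
from typing import Dict

# Prefix shared by the placeholder values in 04_memory.py.
PLACEHOLDER_PREFIX = "/path/to/your/"


def check_model_paths(paths: Dict[str, str]) -> None:
    """Raise ValueError if a path is still a placeholder or is not a directory."""
    problems = []
    for name, path in paths.items():
        if path.startswith(PLACEHOLDER_PREFIX):
            problems.append(f"{name} is still the placeholder {path!r}")
        elif not os.path.isdir(path):
            problems.append(f"{name}: no directory at {path!r}")
    if problems:
        raise ValueError("; ".join(problems))
```

For example, you could call check_model_paths({"MODEL_NAME": MODEL_NAME, "SUMMARIZER_MODEL_NAME": SUMMARIZER_MODEL_NAME}) at the top of main(). The run would then fail fast instead of partway through inference-pipeline initialization.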

Step 2: Allocate nodes

salloc --nodes=2 --exclusive
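
Because the summarizer's HardwareConfig uses node_offset=1, the run needs a second node. The sketch below checks the Slurm allocation size from inside the salloc shell. It reads the SLURM_JOB_NUM_NODES variable, falling back to its alias SLURM_NNODES. The check_allocation helper is a convenience check of our own, not part of Dragon.

```python
import os
from typing import Mapping, Optional


def check_allocation(required_nodes: int = 2,
                     env: Optional[Mapping[str, str]] = None) -> int:
    """Return the allocated node count, or raise if it is too small."""
    env = os.environ if env is None else env
    raw = env.get("SLURM_JOB_NUM_NODES") or env.get("SLURM_NNODES")
    if raw is None:
        raise RuntimeError("No Slurm allocation found; run inside salloc/sbatch.")
    nodes = int(raw)
    if nodes < required_nodes:
        raise RuntimeError(f"Need {required_nodes} nodes, allocation has {nodes}.")
    return nodes
```

Inside the allocation from Step 2, check_allocation() should return 2. With a one-node allocation, the function raises an error. In that case, 04_memory.py would still place runner_agent on node 0, but the summarizer pipeline is configured for node 1.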

Step 3: Run

dragon 04_memory.py

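Example output (abridged). Only the script's own startup, banner, and teardown messages are shown. Dragon, inference-worker, and agent logs are omitted. The model path and HOST:PORT will show your own values.

$ dragon 04_memory.py
[startup] Initializing main inference pipeline...
[startup] Main inference pipeline ready.

[startup] Initializing summarizer inference pipeline...
[startup] Summarizer model: /path/to/your/summarizer/model
[startup] Summarizer inference pipeline ready.

[startup] Agent 'planner_agent' ready.
[startup] Agent 'runner_agent' ready.
[startup] Agent 'analyzer_agent' ready.
[startup] Agent 'reporter_agent' ready.
============================================================
Dragon AI — 04 Memory Strategies + Summarizer LLM
============================================================
Request: Design and run a Monte Carlo convergence experiment for π.  I want to review the plan first, then approve the parallel launch.  Monitor progress and report which nodes ran what.

Memory strategies:
  planner_agent  → SLIDING_WINDOW (max_kept_turns=3)
  runner_agent   → SUMMARIZE (dedicated summarizer LLM)
  analyzer_agent → FULL (default)
  reporter_agent → FULL (default)

HITL approval client — start in another terminal:
  python -m dragon.ai.agent.hitl --tcp HOST:PORT
...
============================================================
FINAL RESULT
============================================================
...

[teardown] All agents stopped.
[teardown] Main inference pipeline stopped.
[teardown] Summarizer inference pipeline stopped.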

For HITL approval, run the client in a second terminal, using the HOST:PORT that the script prints at startup:

python -m dragon.ai.agent.hitl --tcp HOST:PORT

Next Steps

  • 05 — MCP Tools (integrate remote Model Context Protocol servers)