Memory Management Strategies
Building on Examples 01–03, this example demonstrates memory management for long-running agentic loops. As conversations grow with many LLM turns, agents can manage history via different strategies: sliding window (drop old turns), summarization (compress via a dedicated small LLM), or keep-all. The example also shows a dedicated summarizer LLM on a separate GPU partition to avoid contention with the main reasoning model.
Prerequisites: Read 03_hitl_approval.py first.
What you’ll learn:
How to configure memory strategies for different agents
SLIDING_WINDOW: drop old turns, keep the last N; lowest cost
SUMMARIZE: keep the last N turns plus a summary of older ones (via a separate LLM)
FULL: keep everything (suitable for short-lived agents only)
How to set up a dedicated summarizer LLM on a separate node
How to balance context preservation with token usage and latency
Architecture:
Node 0, GPUs [0,1]: Main inference service (large reasoning LLM, tp_size=2)
Node 1, GPU [0]: Summarizer service (small LLM, tp_size=1)
Main Code
Below is the complete example:
"""04 — Memory Management Strategies + Dedicated Summarizer LLM.

**Prerequisites:** Read ``03_hitl_approval.py`` first.

**What's new:**

* **MemoryConfig** and **MemoryStrategy** — control how agents manage
  conversation history as the agentic loop grows
* **Three strategies side-by-side:**

  - ``planner_agent`` → ``SLIDING_WINDOW`` — drops old turn-pairs beyond
    ``max_kept_turns``, replacing them with a synthetic note
  - ``runner_agent`` → ``SUMMARIZE`` — calls a second LLM to compress
    old turns into a summary instead of dropping them
  - ``analyzer_agent`` / ``reporter_agent`` → ``FULL`` (default) — keep
    everything (fine for short-lived agents)

* **Dedicated summarizer LLM** — a separate, smaller model on its own
  GPU partition, used only for summarization. The main model stays
  uncontended for reasoning and tool-calling.

Architecture::

    Node 0 GPUs [0,1] → Main InferenceWorker (large model, tp_size=2)
    Node 1 GPU [0] → Summarizer Worker (small model, tp_size=1)

Usage::

    # Terminal 1 (needs 2 nodes with GPUs):
    dragon 04_memory.py

    # Terminal 2 (HITL approval client):
    python -m dragon.ai.agent.hitl --tcp HOST:PORT
"""

import asyncio
from typing import Any

import dragon
import multiprocessing as mp

from dragon.ai.agent.core import create_sub_agent
from dragon.ai.agent.config import (
    AgentConfig,
    MemoryConfig,  # ← NEW
    MemoryStrategy,  # ← NEW
    OrchestratorConfig,
    Pipeline,
    PipelineNode,
)
from dragon.ai.agent.tools import ToolRegistry
from dragon.ai.agent.orchestrator import DAGOrchestrator
from dragon.infrastructure.policy import Policy
from dragon.native.event import Event
from dragon.native.machine import Node, System
from dragon.native.process import Process
from dragon.native.queue import Queue
from dragon.workflows.batch import Batch

from dragon.ai.inference.config import (
    BatchingConfig,
    HardwareConfig,
    InferenceConfig,
    ModelConfig,
)
from dragon.ai.inference.inference_utils import Inference

from tools import (
    propose_experiment,
    launch_experiment,
    check_progress,
    collect_results,
    analyze_convergence,
    format_results_table,
)
from tools.runner import cleanup_experiment_state


# ===========================================================================
# User-configurable constants
# ===========================================================================

MODEL_NAME = "/path/to/your/model"
HF_TOKEN = ""

# ← NEW: Smaller instruct model used exclusively for summarization
SUMMARIZER_MODEL_NAME = "/path/to/your/summarizer/model"


# ===========================================================================
# Inference Pipeline Configuration — Main (reasoning + tool-calling)
# ===========================================================================

INFERENCE_CONFIG = InferenceConfig(
    model=ModelConfig(
        model_name=MODEL_NAME,
        hf_token=HF_TOKEN,
        tp_size=2,
        max_tokens=8192,
        max_model_len=32768,
    ),
    hardware=HardwareConfig(
        num_nodes=1,
        num_gpus=2,
        num_inf_workers_per_cpu=1,
    ),
    batching=BatchingConfig(
        batch_wait_seconds=0.1,
        max_batch_size=32,
    ),
)


# ===========================================================================
# ← NEW: Inference Pipeline Configuration — Summarizer
#
# Placed on the SECOND node (node_offset=1) so the main model's GPUs on
# node 0 are not contended. A small instruct model fits on a single GPU.
# ===========================================================================

SUMMARIZER_CONFIG = InferenceConfig(
    model=ModelConfig(
        model_name=SUMMARIZER_MODEL_NAME,
        hf_token=HF_TOKEN,
        tp_size=1,
        max_tokens=2048,
        max_model_len=8192,
    ),
    hardware=HardwareConfig(
        num_nodes=1,
        num_gpus=1,
        node_offset=1,  # ← start from node 1 (second node)
        num_inf_workers_per_cpu=1,
    ),
    batching=BatchingConfig(
        batch_wait_seconds=0.05,
        max_batch_size=8,
    ),
)


# ===========================================================================
# Tool registries
# ===========================================================================

planner_registry = ToolRegistry()
planner_registry.register(propose_experiment)

runner_registry = ToolRegistry()
runner_registry.register(launch_experiment)
runner_registry.register(check_progress)
runner_registry.register(collect_results)

analyzer_registry = ToolRegistry()
analyzer_registry.register(analyze_convergence)

reporter_registry = ToolRegistry()
reporter_registry.register(format_results_table)


# ===========================================================================
# DAG pipeline
# ===========================================================================

pipeline = Pipeline(nodes=[
    PipelineNode(
        agent_id="planner_agent",
        task_description=(
            "You are a scientific experiment planner. The user wants to study "
            "Monte Carlo convergence for estimating π.\n\n"
            "Propose an experiment plan by calling propose_experiment with:\n"
            " - description, sample_sizes, convergence_target, methodology\n\n"
            "A human operator will review your plan. If they provide "
            "feedback, revise and resubmit. Once approved, report verbatim."
        ),
        depends_on=[],
    ),
    PipelineNode(
        agent_id="runner_agent",
        task_description=(
            "You manage parallel Monte Carlo simulations on an HPC cluster.\n\n"
            "Tools:\n"
            " 1. launch_experiment(sample_sizes, seeds) — launches ALL "
            "simulations in parallel. Operator must approve.\n"
            " 2. check_progress() — shows done/running/pending status.\n"
            " 3. collect_results() — call ONLY when all_done=true.\n\n"
            "STRICT workflow:\n"
            " 1. Call launch_experiment.\n"
            " 2. Call check_progress until all_done=true.\n"
            " 3. Call collect_results — REQUIRED before final answer.\n"
            " 4. Report collect_results output verbatim."
        ),
        depends_on=["planner_agent"],
    ),
    PipelineNode(
        agent_id="analyzer_agent",
        task_description=(
            "Call analyze_convergence with a list of dicts (keys: "
            "'n_samples', 'absolute_error'). Report all metrics verbatim."
        ),
        depends_on=["runner_agent"],
    ),
    PipelineNode(
        agent_id="reporter_agent",
        task_description=(
            "Write a structured report.\n\n"
            "STRICT workflow:\n"
            " 1. Call format_results_table to get a Markdown table.\n"
            " 2. COPY-PASTE the actual table into your final answer.\n"
            " 3. Include sections: Plan, Results Table, Parallel Execution,\n"
            " Convergence Analysis, Quality Assessment, Recommendations.\n\n"
            "Never invent data — use only data from upstream agents."
        ),
        depends_on=["planner_agent", "runner_agent", "analyzer_agent"],
    ),
])


# ===========================================================================
# Helpers
# ===========================================================================

def _make_agent_kwargs(agent_id, name, role, registry, inference_queue,
                       approval_filter=None,
                       max_tool_call_iterations=20,
                       memory=None,
                       summarizer_inference_queue=None):
    """Build kwargs for create_sub_agent().

    NEW in this example:
        ``memory`` — MemoryConfig controlling conversation history pruning
        ``summarizer_inference_queue`` — separate queue for the summarizer LLM
    """
    return {
        "config": AgentConfig(
            agent_id=agent_id, name=name, role=role,
            inference_queue=inference_queue,
            summarizer_inference_queue=summarizer_inference_queue,
            approval_filter=approval_filter,
            max_tool_call_iterations=max_tool_call_iterations,
            memory=memory,
        ),
        "tool_registry": registry,
        "shutdown_event": Event(),
        "reply_queue": Queue(),
    }


def _start_agents(specs, policies):
    procs = []
    for spec, policy in zip(specs, policies):
        p = Process(target=create_sub_agent, kwargs=spec, policy=policy)
        p.start()
        procs.append(p)
    queues = {}
    for spec in specs:
        aid = spec["config"].agent_id
        queues[aid] = spec["reply_queue"].get()
        print(f"[startup] Agent '{aid}' ready.", flush=True)
    return procs, queues


# ===========================================================================
# Main
# ===========================================================================

async def main():
    # -------------------------------------------------------------------
    # 1. Main inference pipeline
    # -------------------------------------------------------------------
    input_queue = Queue()

    print("[startup] Initializing main inference pipeline...", flush=True)

    inference_pipeline = None
    try:
        inference_pipeline = Inference(INFERENCE_CONFIG, input_queue)
        inference_pipeline.initialize()
    except Exception as exc:
        import traceback
        print(f"\n[FATAL] Main inference pipeline failed to initialize: {exc}", flush=True)
        traceback.print_exc()
        if inference_pipeline is not None:
            inference_pipeline.destroy()
        return
    print("[startup] Main inference pipeline ready.\n", flush=True)

    # -------------------------------------------------------------------
    # ← NEW: 2. Summarizer inference pipeline (separate model, separate GPU)
    # -------------------------------------------------------------------
    summarizer_input_queue = Queue()

    print("[startup] Initializing summarizer inference pipeline...", flush=True)
    print(f"[startup] Summarizer model: {SUMMARIZER_MODEL_NAME}", flush=True)

    summarizer_pipeline = None
    try:
        summarizer_pipeline = Inference(SUMMARIZER_CONFIG, summarizer_input_queue)
        summarizer_pipeline.initialize()
    except Exception as exc:
        import traceback
        print(f"\n[FATAL] Summarizer pipeline failed to initialize: {exc}", flush=True)
        traceback.print_exc()
        inference_pipeline.destroy()
        if summarizer_pipeline is not None:
            summarizer_pipeline.destroy()
        return
    print("[startup] Summarizer inference pipeline ready.\n", flush=True)

    my_alloc = System()
    node_list = my_alloc.nodes
    compute_host = (
        Node(node_list[1]).hostname if len(node_list) > 1
        else Node(node_list[0]).hostname
    )
    compute_policy = Policy(
        placement=Policy.Placement.HOST_NAME,
        host_name=compute_host,
    )

    procs, agent_specs = [], []
    try:
        agent_specs = [
            _make_agent_kwargs(
                "planner_agent", "Experiment Planner",
                "You are an experiment planner for Monte Carlo convergence "
                "studies on an HPC cluster. Propose plans via "
                "propose_experiment. Revise on feedback.",
                planner_registry, input_queue,
                approval_filter=lambda n, a: n == "propose_experiment",
                max_tool_call_iterations=60,
                # ← NEW: SLIDING_WINDOW — drop old turns beyond max_kept_turns
                memory=MemoryConfig(
                    strategy=MemoryStrategy.SLIDING_WINDOW,
                    max_kept_turns=3,
                ),
            ),
            _make_agent_kwargs(
                "runner_agent", "Parallel Simulation Runner",
                "You manage parallel Monte Carlo simulations.\n"
                "You MUST call all three tools in order: "
                "launch_experiment → check_progress → collect_results.\n"
                "NEVER give a final answer without calling collect_results first.",
                runner_registry, input_queue,
                approval_filter=lambda n, a: n == "launch_experiment",
                max_tool_call_iterations=60,
                # ← NEW: SUMMARIZE — compress old turns via LLM
                memory=MemoryConfig(
                    strategy=MemoryStrategy.SUMMARIZE,
                    max_kept_turns=1,
                    summarize_after_turns=2,
                ),
                # ← NEW: use dedicated summarizer LLM
                summarizer_inference_queue=summarizer_input_queue,
            ),
            _make_agent_kwargs(
                "analyzer_agent", "Convergence Analyzer",
                "Analyse convergence. Call analyze_convergence with "
                "n_samples + absolute_error dicts. Report verbatim.",
                analyzer_registry, input_queue,
                # No memory config → FULL (default) — keep everything
            ),
            _make_agent_kwargs(
                "reporter_agent", "Report Writer",
                "Write a structured report with Markdown tables. "
                "Include parallel execution details. "
                "Always include tool output verbatim.",
                reporter_registry, input_queue,
                # No memory config → FULL (default)
            ),
        ]
        policies = [None, compute_policy, None, None]

        procs, queues = _start_agents(agent_specs, policies)
        for spec in agent_specs:
            spec["config"].input_queue = queues[spec["config"].agent_id]

        orchestrator = DAGOrchestrator(
            config=OrchestratorConfig(
                agents=[s["config"] for s in agent_specs],
                poll_interval=0.5,
                poll_timeout=14400.0,
            ),
            pipeline=pipeline,
        )

        user_input = (
            "Design and run a Monte Carlo convergence experiment for π. "
            "I want to review the plan first, then approve the parallel "
            "launch. Monitor progress and report which nodes ran what."
        )

        batch = Batch()
        try:
            print("=" * 60, flush=True)
            print("Dragon AI — 04 Memory Strategies + Summarizer LLM", flush=True)
            print("=" * 60, flush=True)
            print(f"Request: {user_input}\n", flush=True)

            # Print memory strategy summary
            print("Memory strategies:", flush=True)
            print(" planner_agent → SLIDING_WINDOW (max_kept_turns=3)", flush=True)
            print(" runner_agent → SUMMARIZE (dedicated summarizer LLM)", flush=True)
            print(" analyzer_agent → FULL (default)", flush=True)
            print(" reporter_agent → FULL (default)", flush=True)
            print(flush=True)

            if orchestrator.hitl_address:
                h, p = orchestrator.hitl_address
                print(
                    f"HITL approval client — start in another terminal:\n"
                    f" python -m dragon.ai.agent.hitl --tcp {h}:{p}\n"
                    f" Options:\n"
                    f" --jsonl PATH Custom JSONL path (default: auto-generated)\n"
                    f" --report PATH Custom .txt report path\n"
                    f" --no-color Disable ANSI colours\n",
                    flush=True,
                )

            result = orchestrator.run(
                user_input=user_input,
                batch=batch,
            )

            print("\n" + "=" * 60, flush=True)
            print("FINAL RESULT", flush=True)
            print("=" * 60, flush=True)
            print(result, flush=True)

        except Exception as exc:
            import traceback
            print(f"\n[error] Pipeline failed: {exc}", flush=True)
            traceback.print_exc()
        finally:
            orchestrator.destroy()
            batch.join()
            batch.destroy()

    except Exception as exc:
        import traceback
        print(f"\n[error] Fatal: {exc}", flush=True)
        traceback.print_exc()
    finally:
        cleanup_experiment_state()
        for spec in agent_specs:
            try:
                spec["shutdown_event"].set()
            except Exception:
                pass
        for p in procs:
            try:
                p.join()
            except Exception:
                pass
        print("\n[teardown] All agents stopped.", flush=True)
        try:
            inference_pipeline.destroy()
        except Exception:
            pass
        print("[teardown] Main inference pipeline stopped.", flush=True)
        try:
            summarizer_pipeline.destroy()
        except Exception:
            pass
        print("[teardown] Summarizer inference pipeline stopped.", flush=True)


if __name__ == "__main__":
    mp.set_start_method("dragon")
    asyncio.run(main())
Key Concepts
SLIDING_WINDOW:
Keep the most recent max_kept_turns conversation turns. Drop older turns
and replace with a synthetic “earlier context” note. Lowest memory and token
cost; suitable for short-term reasoning tasks.
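As a rough illustration of the pruning described above, the sketch below keeps the system prompt and the last max_kept_turns turn-pairs and swaps everything older for a placeholder note. It is not Dragon's implementation. The message layout (dicts with role and content), the assumption of two messages per turn, the helper name sliding_window, and the wording of the note are all hypothetical.

```python
from typing import Dict, List

Message = Dict[str, str]


def sliding_window(history: List[Message], max_kept_turns: int) -> List[Message]:
    """Return a pruned copy of `history` (hypothetical helper, not a Dragon API).

    Assumes history[0] is the system prompt and that each turn is a pair of
    messages (assistant tool call, then tool result).
    """
    system, rest = history[:1], history[1:]
    keep = max_kept_turns * 2        # two messages per turn-pair (assumption)
    cut = max(len(rest) - keep, 0)   # index of the first message to keep
    if cut == 0:
        return list(history)         # nothing is old enough to drop
    note = {
        "role": "user",
        "content": f"[earlier context: {cut} older message(s) dropped]",
    }
    return system + [note] + rest[cut:]


# Toy history: system prompt plus five turn-pairs.
history = [{"role": "system", "content": "You are a planner."}]
for i in range(5):
    history.append({"role": "assistant", "content": f"call {i}"})
    history.append({"role": "tool", "content": f"result {i}"})

pruned = sliding_window(history, max_kept_turns=3)
print(len(history), "->", len(pruned))  # 11 -> 8
```

The prompt size stays bounded no matter how long the loop runs, at the price of losing whatever the dropped turns contained.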
SUMMARIZE:
Keep the last max_kept_turns turns in full detail. Older turns are
compressed into a summary (generated by a dedicated small LLM) inserted
before the kept turns. Good for long-running workflows where earlier context
still matters but the prompt must stay within a bounded token budget. The
trade-off is an extra summarizer call whenever older turns are compressed.
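The same idea can be sketched for summarization: older messages are handed to a summarizer and replaced by one summary message placed ahead of the recent turns. This is a minimal sketch under stated assumptions, not the framework's code. The helper names, the message layout, and stub_summarizer (a stand-in for the call to the summarizer LLM) are hypothetical, and the summarize_after_turns trigger used in the listing is left out because its exact semantics are not described here.

```python
from typing import Callable, Dict, List

Message = Dict[str, str]


def stub_summarizer(messages: List[Message]) -> str:
    """Stand-in for the summarizer LLM: joins truncated message text."""
    return "; ".join(m["content"][:30] for m in messages)


def summarize_old_turns(
    history: List[Message],
    max_kept_turns: int,
    summarizer: Callable[[List[Message]], str],
) -> List[Message]:
    """Replace all but the last `max_kept_turns` turn-pairs with one summary.

    Hypothetical helper; assumes history[0] is the system prompt and two
    messages per turn.
    """
    system, rest = history[:1], history[1:]
    keep = max_kept_turns * 2
    cut = max(len(rest) - keep, 0)
    old, recent = rest[:cut], rest[cut:]
    if not old:
        return list(history)  # nothing to compress yet
    summary = {
        "role": "user",
        "content": "Summary of earlier turns: " + summarizer(old),
    }
    return system + [summary] + recent


# Toy history: system prompt plus three polling turns.
history = [{"role": "system", "content": "You run simulations."}]
for i in range(3):
    history.append({"role": "assistant", "content": f"check_progress #{i}"})
    history.append({"role": "tool", "content": f"done={i}/3"})

compact = summarize_old_turns(history, max_kept_turns=1, summarizer=stub_summarizer)
for message in compact:
    print(message["role"], "|", message["content"])
```

Passing the summarizer in as a callable mirrors the separation in the example, where summarization requests go to a different inference queue than reasoning requests.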
FULL:
Keep all conversation turns from the start. Suitable only for short-lived agents, because a long-running loop will eventually exceed the model's context window. Useful when you want the agent's complete reasoning visible in logs.
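A back-of-the-envelope estimate shows how quickly an unpruned history can fill the context window. The values 32768 and 8192 are max_model_len and max_tokens from the main ModelConfig in the listing; treating max_tokens as space reserved for generation is an assumption, and the base prompt size (1000 tokens) and per-turn growth (600 tokens) are made-up figures for illustration only.

```python
def turns_until_overflow(
    max_model_len: int,
    max_new_tokens: int,
    base_prompt_tokens: int,
    tokens_per_turn: int,
) -> int:
    """Estimate how many full turns fit before the context window is exhausted.

    Hypothetical helper: reserves `max_new_tokens` for the next generation and
    assumes every turn adds the same number of tokens.
    """
    if tokens_per_turn <= 0:
        raise ValueError("tokens_per_turn must be positive")
    budget = max_model_len - max_new_tokens - base_prompt_tokens
    return max(budget, 0) // tokens_per_turn


fit = turns_until_overflow(
    max_model_len=32768,      # from the main ModelConfig in the listing
    max_new_tokens=8192,      # max_tokens in the listing (assumed to be the generation reserve)
    base_prompt_tokens=1000,  # assumed
    tokens_per_turn=600,      # assumed
)
print(fit)  # 39
```

Under these assumed figures roughly 39 turns fit, which is below the max_tool_call_iterations=60 given to the planner and runner agents. That gap is a plausible reason for those two agents to use a pruning strategy while the short-lived analyzer and reporter keep the FULL default.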
Dedicated Summarizer LLM:
By running the summarizer on a separate GPU partition (different node, smaller model), the main reasoning agent stays uncontended. This improves throughput and latency of the critical reasoning path.
Installation
See Example 01 (same dependencies).
System Description
Tested on HPE Cray EX:
Node 0 (main inference): 2 NVIDIA A100 GPUs
Node 1 (summarizer): 1 NVIDIA A100 GPU
Total: 2 compute nodes required
How to Run
Step 1: Edit model paths
Open 04_memory.py and set:
MODEL_NAME (main reasoning model, e.g., 70B Llama)
SUMMARIZER_MODEL_NAME (small model, e.g., 7B Llama)
Step 2: Allocate nodes
salloc --nodes=2 --exclusive
Step 3: Run
dragon 04_memory.py
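Example output (illustrative; the turn limits shown do not match the max_kept_turns values set in the listing above):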
$ dragon 04_memory.py
Node 0: Starting main inference service (70B model, tp_size=2)
Node 1: Starting summarizer service (7B model, tp_size=1)
Planner (SLIDING_WINDOW): Turn 1, kept_turns=1/5
Planner (SLIDING_WINDOW): Turn 2, kept_turns=2/5
...
Planner (SLIDING_WINDOW): Turn 6, kept_turns=5/5 (dropped turn 1)
Runner (SUMMARIZE): Turn 1, kept_turns=1/10
...
Runner (SUMMARIZE): Turn 11, kept_turns=10/10 + summary of turns 1-5
All agents completed
For HITL approval (in a second terminal):
python -m dragon.ai.agent.hitl --tcp HOST:PORT
Next Steps
05 — MCP Tools (integrate remote Model Context Protocol servers)