DDict Function Smoothing Example
This example demonstrates how to use DragonHPC's DDict distributed dictionary with checkpointing to implement a simple distributed smoothing algorithm. It also demonstrates how to use DDict persistence to restart a computation from a persisted checkpoint.
The Smoothing Algorithm
The algorithm presented here is primarily intended to showcase a few aspects of the DDict including:
Data sharing across processes
Checkpointing as implemented by the DDict for process synchronization
Checkpoint persistence when a restart of a long computation might be necessary
The smoothing algorithm implements a simple 1D averaging filter where each
element in the array is replaced by the average of itself and its immediate
neighbors. For an array element at position i, the smoothed value becomes:
smoothed[i] = (array[i-1] + array[i] + array[i+1]) / 3
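The sketch below applies one pass of this filter to a whole, unchunked array in pure Python. It makes one assumption about the two ends of the array: the missing neighbor is replaced by the end element itself, which mirrors `smooth_segment` in the full program (Listing 14). The name `smooth_once` is hypothetical.

```python
def smooth_once(values: list[float]) -> list[float]:
    """One pass of the 3-point averaging filter over the whole array.

    At the global ends, the missing neighbor is padded with the end value
    itself, as smooth_segment in the full program does.
    """
    n = len(values)
    out: list[float] = []
    for i in range(n):
        left = values[i - 1] if i > 0 else values[i]
        right = values[i + 1] if i < n - 1 else values[i]
        out.append((left + values[i] + right) / 3.0)
    return out


print(smooth_once([0.0, 3.0, 6.0, 9.0]))  # prints [1.0, 3.0, 6.0, 8.0]
```

For a linear ramp, such as the `np.linspace` input the program uses, a pass leaves the interior values unchanged. Only values near the two ends move.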
The algorithm proceeds as follows:
It first divides a 1D array into N chunks.
It then stores each chunk in a distributed dictionary (DDict).
Once initialized, it launches one Dragon worker process per chunk.
Each worker repeatedly smooths its chunk, using neighbor values from adjacent chunks at the chunk boundaries.
After each iteration, the worker advances its DDict checkpoint by calling ddict.checkpoint().
It then writes its updated chunk to the DDict for use in the next iteration.
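The chunking step can be sketched with an ordinary Python `dict` standing in for the DDict. This is purely for illustration: unlike a DDict, a `dict` is neither shared across processes nor checkpointed. The key names `chunk_0`, `chunk_1`, ... and the divisibility check follow `populate_initial_chunks` in the full program. `split_into_chunks` is a hypothetical helper.

```python
def split_into_chunks(values: list[float], num_chunks: int) -> dict[str, list[float]]:
    """Split values into equal chunks stored under keys chunk_0 .. chunk_{N-1}."""
    size = len(values)
    chunk_size = size // num_chunks
    if chunk_size * num_chunks != size:
        # Same restriction as populate_initial_chunks in the full program.
        raise ValueError("size must be divisible by num_chunks")
    store: dict[str, list[float]] = {}
    for chunk_id in range(num_chunks):
        start = chunk_id * chunk_size
        store[f"chunk_{chunk_id}"] = values[start:start + chunk_size]
    return store


store = split_into_chunks([float(i) for i in range(8)], 4)
print(store["chunk_1"])  # prints [2.0, 3.0]
```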
The example supports both in-memory communication between processes and persisted checkpointing.
The entire program is provided at https://github.com/DragonHPC/dragon/tree/main/examples/dragon_data/ddict/smoothing.py and in Listing 14 for your reference.
The algorithm begins by starting N workers, one per chunk. Each worker replaces every value in its chunk with the average of the three values at i-1, i, and i+1. This works for the interior values of the chunk. The values at the chunk's left and right boundaries, however, depend on values held by the workers of the neighboring chunks. The code in Listing 10 shows how a worker retrieves the data it needs from the chunks to the left and right of its own.
Listing 10: Reading boundary values from neighboring chunks
1 if chunk_id > 0:
2     left_segment = np.array(ddict[f"chunk_{chunk_id - 1}"])
3     left_val = left_segment[-1]
4 if chunk_id < num_chunks - 1:
5     right_segment = np.array(ddict[f"chunk_{chunk_id + 1}"])
6     right_val = right_segment[0]
Since the array is distributed across multiple chunks, each worker process is responsible for smoothing one chunk. However, to compute the smoothed values at the chunk boundaries, workers need values from neighboring chunks:
The worker for chunk k needs the last element of chunk k-1 to smooth its first element.
The worker for chunk k needs the first element of chunk k+1 to smooth its last element.
Workers coordinate through the DDict to read neighbor values before computing each smoothing iteration. This creates a distributed dependency pattern where workers synchronize their reads and writes through the checkpoint mechanism.
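The boundary lookups in Listing 10 can be sketched against a plain `dict`, which again stands in for the DDict and does not model its API. `boundary_values` is a hypothetical helper. A value of `None` marks a missing neighbor at either end of the global array. That is how `worker_smooth` in the full program tells `smooth_segment` to pad with the chunk's own end value.

```python
from typing import Optional


def boundary_values(store: dict[str, list[float]], chunk_id: int,
                    num_chunks: int) -> tuple[Optional[float], Optional[float]]:
    """Return (left_val, right_val) needed to smooth chunk chunk_id."""
    left_val = None
    right_val = None
    if chunk_id > 0:
        # Last element of chunk k-1 is needed to smooth the first element of chunk k.
        left_val = store[f"chunk_{chunk_id - 1}"][-1]
    if chunk_id < num_chunks - 1:
        # First element of chunk k+1 is needed to smooth the last element of chunk k.
        right_val = store[f"chunk_{chunk_id + 1}"][0]
    return left_val, right_val


store = {"chunk_0": [0.0, 1.0], "chunk_1": [2.0, 3.0], "chunk_2": [4.0, 5.0]}
print(boundary_values(store, 1, 3))  # prints (1.0, 4.0)
print(boundary_values(store, 0, 3))  # prints (None, 2.0)
```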
Lines 2 and 5 of Listing 10 block if the requested chunk is not yet available at the worker's current checkpoint (the DDict is created with wait_for_keys=True in build_ddict). Initially, all chunks are written to the DDict at checkpoint 0. Since each worker also starts at checkpoint 0, no worker blocks on the first iteration of the algorithm. On every later iteration, a worker must wait until its neighbors have written their newly computed chunks. Only then can the reads in Listing 10 return the values it needs to smooth the first and last values of its chunk.
Listing 11: Computing the new chunk values and advancing the checkpoint
1 current = smooth_segment(current, left_val, right_val)
2
3 ddict.checkpoint()
4
5 ddict[f"chunk_{chunk_id}"] = current
Once the new average for a chunk has been computed, it becomes the input for the worker's next iteration. In Listing 11, the new value is computed by the call to smooth_segment. Then checkpoint() is called to increment the worker's checkpoint id, indicating that the worker has completed its computation for step i of the iteration. On line 5, the worker stores the new values for its segment at the new checkpoint, where its neighbors will read them on iteration i+1 of the algorithm. Any worker trying to load a neighbor's chunk on lines 2 or 5 of Listing 10 will block until that neighbor has executed line 5 of Listing 11. The DDict provides the synchronization necessary to block the workers until data is available at the current checkpoint.
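The self-contained toy model below makes the blocking behavior concrete. It runs the same read, smooth, checkpoint, write loop with Python threads. `ToyCheckpointStore` and its `put`/`get` methods are hypothetical stand-ins, not the DDict API. Each value is stored under a `(checkpoint, key)` pair, and `get()` blocks until that exact pair has been written. This simplifies the behavior described above; for example, the toy ignores `working_set_size` and never discards old checkpoints. Because each worker reads its neighbors' chunks only from the checkpoint it is on, the chunked result matches smoothing the whole array the same number of times.

```python
import threading
from typing import Optional


class ToyCheckpointStore:
    """Hypothetical stand-in for the DDict: values are versioned by checkpoint
    and get() blocks until (checkpoint, key) has been written."""

    def __init__(self) -> None:
        self._data: dict[tuple[int, str], list[float]] = {}
        self._cond = threading.Condition()

    def put(self, checkpoint: int, key: str, value: list[float]) -> None:
        with self._cond:
            self._data[(checkpoint, key)] = value
            self._cond.notify_all()  # wake any reader waiting on this key

    def get(self, checkpoint: int, key: str, timeout: float = 10.0) -> list[float]:
        with self._cond:
            ready = self._cond.wait_for(lambda: (checkpoint, key) in self._data, timeout)
            if not ready:
                raise TimeoutError(f"{key} never written at checkpoint {checkpoint}")
            return self._data[(checkpoint, key)]


def smooth_segment(seg: list[float], left_val: Optional[float],
                   right_val: Optional[float]) -> list[float]:
    # Pure-Python version of smooth_segment from the full program.
    if not seg:
        return []
    left = seg[0] if left_val is None else left_val
    right = seg[-1] if right_val is None else right_val
    padded = [left] + seg + [right]
    return [(padded[i] + padded[i + 1] + padded[i + 2]) / 3.0 for i in range(len(seg))]


def worker(store: ToyCheckpointStore, chunk_id: int, num_chunks: int, iterations: int) -> None:
    ckpt = 0
    current = store.get(ckpt, f"chunk_{chunk_id}")
    for _ in range(iterations):
        # Read neighbor boundary values at the current checkpoint (may block).
        left = store.get(ckpt, f"chunk_{chunk_id - 1}")[-1] if chunk_id > 0 else None
        right = store.get(ckpt, f"chunk_{chunk_id + 1}")[0] if chunk_id < num_chunks - 1 else None
        current = smooth_segment(current, left, right)
        ckpt += 1  # plays the role of ddict.checkpoint()
        store.put(ckpt, f"chunk_{chunk_id}", current)


def run(values: list[float], num_chunks: int, iterations: int) -> list[float]:
    # Assumes len(values) is divisible by num_chunks.
    chunk_size = len(values) // num_chunks
    store = ToyCheckpointStore()
    for k in range(num_chunks):
        store.put(0, f"chunk_{k}", values[k * chunk_size:(k + 1) * chunk_size])
    threads = [threading.Thread(target=worker, args=(store, k, num_chunks, iterations))
               for k in range(num_chunks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    result: list[float] = []
    for k in range(num_chunks):
        result += store.get(iterations, f"chunk_{k}")
    return result


print(run([0.0, 3.0, 6.0, 9.0], 2, 1))  # prints [1.0, 3.0, 6.0, 8.0]
```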
Listing 12: Storing constants as persistent key/value pairs
# These two kv pairs do not change on each iteration. They are the same
# across all iterations and as such are put into the DDict as persistent
# kv pairs. This means they are not affected by checkpointing.
ddict.pput("num_chunks", num_chunks)
ddict.pput("chunk_size", chunk_size)
There is one additional snippet of code worth mentioning. In Listing 12, the program initially stores num_chunks and chunk_size as two constants for use by the workers. Because these values never change, they are stored with pput as persistent key/value pairs, which are not affected by checkpointing. If an algorithm needs to store values that do not change from one iteration to the next, those values should probably be stored using pput as persistent key/value pairs.
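The toy model below illustrates the distinction. Its method names merely echo the DDict's `pput` and `checkpoint()`; the class itself is hypothetical. Persistent keys live outside any checkpoint, so they stay visible after the checkpoint advances. Ordinary keys are looked up only in the current checkpoint. This is an illustrative simplification and does not reproduce the DDict's working set or lookup rules.

```python
class ToyStore:
    """Toy model: persistent keys ignore checkpoints; ordinary keys do not."""

    def __init__(self) -> None:
        self.checkpoint_id = 0
        self._persistent: dict[str, object] = {}
        self._by_checkpoint: dict[int, dict[str, object]] = {}

    def pput(self, key: str, value: object) -> None:
        # Persistent: visible at every checkpoint.
        self._persistent[key] = value

    def __setitem__(self, key: str, value: object) -> None:
        # Ordinary: recorded only in the current checkpoint.
        self._by_checkpoint.setdefault(self.checkpoint_id, {})[key] = value

    def __getitem__(self, key: str) -> object:
        if key in self._persistent:
            return self._persistent[key]
        return self._by_checkpoint.get(self.checkpoint_id, {})[key]  # KeyError if absent

    def checkpoint(self) -> None:
        self.checkpoint_id += 1


store = ToyStore()
store.pput("num_chunks", 4)
store["chunk_0"] = [0.0, 1.0]
store.checkpoint()
print(store["num_chunks"])  # prints 4
try:
    store["chunk_0"]
except KeyError:
    # Per the text above, a DDict created with wait_for_keys=True would block here instead.
    print("chunk_0 not yet written at checkpoint", store.checkpoint_id)
```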
In summary, the algorithm demonstrates how distributed computations can be checkpointed at regular intervals to handle synchronization and communication between workers. The next section describes how checkpoints may be optionally persisted in this example program, demonstrating how persisted checkpoints allow for fault tolerance and the ability to resume long-running computations from intermediate states.
Persistence of Checkpoints
Should it be necessary, checkpoints can be persisted to longer-term storage like disk. For example, this might be necessary for longer computations that may be prone to hardware failure because of the scale at which they run. In such cases, resuming the computation from a persisted checkpoint may be required.
Listing 13: Persistence arguments passed to the DDict constructor
if persister_class is not None:
    kwargs.update(
        {
            "persister_class": persister_class,
            "persist_freq": args.persist_frequency,
            "persist_count": args.persist_count,
            "persist_path": args.persist_path,
            "name": args.name,
        }
    )
No additional application code is needed to persist checkpoints. The DDict handles persistence itself, writing checkpoints to storage at a user-specified frequency. When constructing the DDict, you can specify four persistence arguments, as shown in Listing 13.
persister_class - This selects how the checkpoints are persisted. Users can supply their own persister class. Dragon provides two: a POSIX file system persister (PosixCheckpointPersister) and a DAOS persister (DAOSCheckpointPersister).
persist_freq - The frequency at which checkpoints are persisted (set from the --persist_frequency command-line argument in this program). With this argument, checkpoints can be persisted only every so often rather than at every checkpoint.
persist_count - The number of persisted checkpoints kept during a run. With this argument, the DDict keeps only the last persist_count checkpoints, deleting the oldest when a new one would exceed persist_count.
persist_path - The path (specific to the persister_class) where the persisted checkpoints are stored.
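The combined effect of `persist_freq` and `persist_count` can be modeled with a small retention sketch. It assumes, for illustration only, that checkpoint `c` is persisted when `c % persist_freq == 0`. It also assumes the oldest persisted checkpoint is deleted once more than `persist_count` exist. Dragon's exact selection rule may differ. `retained_checkpoints` is a hypothetical helper.

```python
from collections import deque


def retained_checkpoints(last_checkpoint: int, persist_freq: int,
                         persist_count: int) -> list[int]:
    """Checkpoint ids left in storage after checkpoints 1..last_checkpoint.

    Assumed policy (illustrative only): persist every persist_freq-th
    checkpoint and keep only the newest persist_count of them.
    """
    if persist_freq < 1 or persist_count < 1:
        raise ValueError("persist_freq and persist_count must be >= 1")
    kept: deque = deque()
    for ckpt in range(1, last_checkpoint + 1):
        if ckpt % persist_freq == 0:
            kept.append(ckpt)
            if len(kept) > persist_count:
                kept.popleft()  # delete the oldest persisted checkpoint
    return list(kept)


print(retained_checkpoints(10, 1, 5))  # prints [6, 7, 8, 9, 10]
print(retained_checkpoints(10, 3, 2))  # prints [6, 9]
```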
To test persistence, first run the program in persist mode to create persisted checkpoints. Then run it again in restore mode, and it resumes from the last persisted checkpoint. The run modes and usage for this program are as follows.
Modes
validate: runs the smoothing algorithm without persistence, purely in memory.
persist: runs the smoothing algorithm while persisting checkpoints using the configured persister.
restore: restores the latest persisted checkpoint and continues smoothing from that point.
Usage
This first example runs the smoothing program to validate that it runs to completion and utilizes checkpointing correctly.
examples/dragon_data/ddict $ dragon ddict_smoothing.py --mode validate --size 128 --chunks 4 --iterations 10
DDict built in validate mode, populating initial chunks...
Initial chunks populated, starting smoothing iterations...
Starting DDict smoothing: mode=validate chunks=4 iterations=10 checkpoint=0
Finished smoothing mode=validate checkpoint=10 result min=0.012784 max=0.987216
+++ head proc exited, code 0
The next run uses persist mode to create some sample persisted checkpoints, which can then be used in restore mode.
examples/dragon_data/ddict $ mkdir ddict_checkpoints
examples/dragon_data/ddict $ dragon ddict_smoothing.py --mode persist --size 128 --chunks 4 --iterations 10 --persist_path ./ddict_checkpoints
Starting DDict smoothing: mode=persist chunks=4 iterations=10 checkpoint=0
Finished smoothing mode=persist checkpoint=10 result min=0.012784 max=0.987216
Persisted checkpoint ids: [3, 11, 12, 13, 14]
+++ head proc exited, code 0
Finally, restore mode restarts from the last persisted checkpoint and runs for the number of iterations given by --resume_iterations.
examples/dragon_data/ddict $ dragon ddict_smoothing.py --mode restore --resume_iterations 8 --persist_path ./ddict_checkpoints
Restoring persisted checkpoint 14
Starting DDict smoothing: mode=restore chunks=4 iterations=8 checkpoint=14
Finished smoothing mode=restore checkpoint=22 result min=0.020463 max=0.979537
Restored to checkpoint 14 and continued smoothing.
+++ head proc exited, code 0
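The checkpoint numbers in this output follow from how restore mode chooses its starting point in main(). It takes the last id returned by persisted_ids(), and each worker then advances the checkpoint once per iteration. A minimal sketch of that arithmetic, using the values printed above (`final_checkpoint` is a hypothetical helper):

```python
def final_checkpoint(persisted_ids: list[int], resume_iterations: int) -> int:
    """Checkpoint reached after restoring the newest persisted id and running on."""
    if not persisted_ids:
        raise RuntimeError("No persisted checkpoints found to restore.")
    start = persisted_ids[-1]  # main() restores available[-1]
    return start + resume_iterations  # one ddict.checkpoint() call per iteration


print(final_checkpoint([3, 11, 12, 13, 14], 8))  # prints 22
```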
Arguments
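--size: total length of the array to smooth.
--chunks: number of distributed chunks / worker processes.
--iterations: number of smoothing iterations to perform.
--resume_iterations: number of iterations to run after restore.
--managers_per_node: number of DDict managers per node.
--working_set_size: checkpoint working set size in the DDict.
--persist_path: directory or storage path for persisted checkpoints.
--persist_count: number of persisted checkpoints to keep (passed to the DDict as persist_count).
--persist_frequency: how often checkpoints are persisted (passed to the DDict as persist_freq).
--persister: use POSIX or DAOS checkpoint backends.
--name: name given to the DDict.
--trace: print per-chunk progress during smoothing.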
Full Program
1"""
2Distributed smoothing algorithm using DragonHPC DDict with checkpointing.
3
4This example demonstrates a distributed 1D smoothing algorithm where an array is divided
5into chunks and each chunk is processed by a separate worker process. The DDict is used
6to store the chunk data and checkpointing allows for fault tolerance and resumability.
7
8To run this example, you must use the 'dragon' command instead of 'python3' because
9DragonHPC requires the runtime to be initialized:
10
11 # Validate mode (no persistence):
12 dragon ddict_smoothing.py --mode validate --size 128 --chunks 4 --iterations 10
13
14 # Persist checkpoints:
15 dragon ddict_smoothing.py --mode persist --size 128 --chunks 4 --iterations 10 --persist_path ./ddict_checkpoints
16
17 # Restore and continue from persisted checkpoint:
18 dragon ddict_smoothing.py --mode restore --resume_iterations 10 --persist_path ./ddict_checkpoints
19"""
20import dragon
21import argparse
22import multiprocessing as mp
23from typing import Optional
24
25
26import numpy as np
27
28from dragon.data.ddict import DDict, PosixCheckpointPersister, DAOSCheckpointPersister
29from dragon.native.machine import System
30
31
32def build_ddict(args, persister_class=None):
33 nnodes = System().nnodes
34 kwargs = {
35 "wait_for_keys": True,
36 "working_set_size": args.working_set_size,
37 "timeout": 200,
38 }
39
40 # [smooth-kwargs-start]
41 if persister_class is not None:
42 kwargs.update(
43 {
44 "persister_class": persister_class,
45 "persist_freq": args.persist_frequency,
46 "persist_count": args.persist_count,
47 "persist_path": args.persist_path,
48 "name": args.name,
49 }
50 )
51 # [smooth-kwargs-end]
52
53 return DDict(args.managers_per_node, nnodes, nnodes * int(4 * 1024 * 1024 * 1024), **kwargs)
54
55
56def smooth_segment(segment: np.ndarray, left_val: Optional[float], right_val: Optional[float]) -> np.ndarray:
57 if segment.size == 0:
58 return segment
59
60 left_pad = segment[0] if left_val is None else left_val
61 right_pad = segment[-1] if right_val is None else right_val
62
63 if segment.size == 1:
64 return np.array([(left_pad + segment[0] + right_pad) / 3.0], dtype=segment.dtype)
65
66 smoothed = segment.copy()
67 smoothed[0] = (left_pad + segment[0] + segment[1]) / 3.0
68 smoothed[-1] = (segment[-2] + segment[-1] + right_pad) / 3.0
69 if segment.size > 2:
70 smoothed[1:-1] = (segment[:-2] + segment[1:-1] + segment[2:]) / 3.0
71
72 return smoothed
73
74
75def worker_smooth(ddict: DDict, chunk_id: int, iterations: int, checkpoint_id: int, trace: bool) -> None:
76 ddict.checkpoint_id = checkpoint_id
77 num_chunks = ddict["num_chunks"]
78 # Load the initial chunk data for this worker
79 current = np.array(ddict[f"chunk_{chunk_id}"])
80
81 for step in range(iterations):
82 left_val = None
83 right_val = None
84
85 # [ start-data-sharing-ref ]
86 if chunk_id > 0:
87 left_segment = np.array(ddict[f"chunk_{chunk_id - 1}"])
88 left_val = left_segment[-1]
89 if chunk_id < num_chunks - 1:
90 right_segment = np.array(ddict[f"chunk_{chunk_id + 1}"])
91 right_val = right_segment[0]
92 # [ end-data-sharing-ref ]
93
94 # [start-compute-avg]
95 current = smooth_segment(current, left_val, right_val)
96
97 ddict.checkpoint()
98
99 ddict[f"chunk_{chunk_id}"] = current
100 # [end-compute-avg]
101
102 if trace:
103 print(
104 f"chunk={chunk_id} iter={step} checkpoint={ddict.checkpoint_id} min={current.min():.5f} max={current.max():.5f}",
105 flush=True,
106 )
107
108 ddict.detach()
109
110
111def populate_initial_chunks(ddict: DDict, size: int, num_chunks: int) -> None:
112 chunk_size = size // num_chunks
113 if chunk_size * num_chunks != size:
114 raise ValueError("size must be divisible by num_chunks")
115
116 # [start-pput]
117 # These two kv pairs do not change on each iteration. They are the same
118 # across all iterations and as such are put into the DDict as persistent
119 # kv pairs. This means they are not affected by checkpointing.
120 ddict.pput("num_chunks", num_chunks)
121 ddict.pput("chunk_size", chunk_size)
122 # [end-pput]
123
124 values = np.linspace(0.0, 1.0, size, dtype=np.float64)
125 for chunk_id in range(num_chunks):
126 start = chunk_id * chunk_size
127 end = start + chunk_size
128 ddict[f"chunk_{chunk_id}"] = values[start:end]
129
130
131def collect_result(ddict: DDict) -> np.ndarray:
132 num_chunks = ddict["num_chunks"]
133 chunk_size = ddict["chunk_size"]
134 result = np.zeros(num_chunks * chunk_size, dtype=np.float64)
135
136 for chunk_id in range(num_chunks):
137 result[chunk_id * chunk_size : (chunk_id + 1) * chunk_size] = np.array(ddict[f"chunk_{chunk_id}"])
138
139 return result
140
141
142def parse_args():
143 parser = argparse.ArgumentParser(
144 description="Distributed smoothing algorithm using DDict checkpointing",
145 epilog="Note: Launch with 'dragon' command, not 'python3'"
146 )
147 parser.add_argument("--mode", choices=["validate", "persist", "restore"], default="validate")
148 parser.add_argument("--size", type=int, default=128, help="Total length of the array to smooth")
149 parser.add_argument("--chunks", type=int, default=4, help="Number of distributed chunks")
150 parser.add_argument("--iterations", type=int, default=10, help="Number of smoothing iterations")
151 parser.add_argument("--resume_iterations", type=int, default=10, help="Iterations to run after restore")
152 parser.add_argument("--managers_per_node", type=int, default=1)
153 parser.add_argument("--working_set_size", type=int, default=3)
154 parser.add_argument("--persist_path", type=str, default="./ddict_checkpoints")
155 parser.add_argument("--persist_count", type=int, default=5)
156 parser.add_argument("--persist_frequency", type=int, default=1)
157 parser.add_argument("--persister", choices=["POSIX", "DAOS"], default="POSIX")
158 parser.add_argument("--trace", action="store_true")
159 parser.add_argument("--name", type=str, default="ddict_smoothing_example")
160 return parser.parse_args()
161
162
163def main():
164 args = parse_args()
165 mp.set_start_method("dragon")
166
167 if args.persister == "POSIX":
168 persister_class = PosixCheckpointPersister
169 else:
170 persister_class = DAOSCheckpointPersister
171
172 if args.mode == "validate":
173 ddict = build_ddict(args)
174 print("DDict built in validate mode, populating initial chunks...", flush=True)
175 populate_initial_chunks(ddict, args.size, args.chunks)
176 print("Initial chunks populated, starting smoothing iterations...", flush=True)
177 num_iterations = args.iterations
178 checkpoint_id = 0
179 elif args.mode == "persist":
180 ddict = build_ddict(args, persister_class=persister_class)
181 populate_initial_chunks(ddict, args.size, args.chunks)
182 num_iterations = args.iterations
183 checkpoint_id = 0
184 else:
185 ddict = build_ddict(args, persister_class=persister_class)
186 available = ddict.persisted_ids()
187 if not available:
188 raise RuntimeError("No persisted checkpoints found to restore.")
189 checkpoint_id = available[-1]
190 print(f"Restoring persisted checkpoint {checkpoint_id}", flush=True)
191 ddict.restore(checkpoint_id)
192 num_iterations = args.resume_iterations
193
194 print(
195 f"Starting DDict smoothing: mode={args.mode} chunks={args.chunks} iterations={num_iterations} checkpoint={ddict.checkpoint_id}",
196 flush=True,
197 )
198
199 processes = []
200 for chunk_id in range(args.chunks):
201 proc = mp.Process(target=worker_smooth, args=(ddict, chunk_id, num_iterations, checkpoint_id, args.trace))
202 proc.start()
203 processes.append(proc)
204
205 for proc in processes:
206 proc.join()
207
208 ddict.sync_to_newest_checkpoint()
209 result = collect_result(ddict)
210
211 print(
212 f"Finished smoothing mode={args.mode} checkpoint={ddict.checkpoint_id} result min={result.min():.6f} max={result.max():.6f}",
213 flush=True,
214 )
215
216 if args.mode == "persist":
217 print(f"Persisted checkpoint ids: {ddict.persisted_ids()}", flush=True)
218 elif args.mode == "restore":
219 print(f"Restored to checkpoint {checkpoint_id} and continued smoothing.", flush=True)
220
221 ddict.destroy()
222
223
224if __name__ == "__main__":
225 main()