DDict Function Smoothing Example

This example demonstrates how to use DragonHPC’s DDict distributed dictionary with checkpointing to implement a simple distributed smoothing algorithm. It also shows how to persist DDict checkpoints so that a run can be restarted from a persisted checkpoint.

The Smoothing Algorithm

The algorithm presented here is primarily intended to showcase a few aspects of the DDict, including:

  • Data sharing across processes

  • Checkpointing as implemented by the DDict for process synchronization

  • Checkpoint persistence when a restart of a long computation might be necessary

The smoothing algorithm implements a simple 1D averaging filter where each element in the array is replaced by the average of itself and its immediate neighbors. For an array element at position i, the smoothed value becomes:

smoothed[i] = (array[i-1] + array[i] + array[i+1]) / 3
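As a quick worked check of the formula, the sketch below applies one smoothing pass to a whole array with NumPy. The two ends of the full array have no outer neighbor. smooth_segment in Listing 14 pads a missing neighbor with the segment's own edge value, so this sketch does the same with np.pad(..., mode="edge"). The function name smooth_global is hypothetical and not part of the example program.

```python
import numpy as np


def smooth_global(array: np.ndarray) -> np.ndarray:
    """Apply one pass of the 3-point averaging filter to a 1D array.

    Missing outer neighbors at the two ends are replaced by the edge element
    itself, matching how smooth_segment pads when left_val/right_val is None.
    """
    padded = np.pad(array, 1, mode="edge")
    # For output index i: padded[i] is array[i-1], padded[i+1] is array[i],
    # and padded[i+2] is array[i+1].
    return (padded[:-2] + padded[1:-1] + padded[2:]) / 3.0


values = np.array([0.0, 3.0, 6.0, 9.0])
print(smooth_global(values))  # [1. 3. 6. 8.]
```

Interior elements follow the formula exactly: for example, (0 + 3 + 6) / 3 = 3. Only the two end elements are affected by the edge padding.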

The algorithm proceeds as follows:

  • It first divides a 1D array into N chunks.

  • Then it stores each chunk in a distributed dictionary (DDict).

  • Once initialized, it launches one Dragon worker process per chunk.

  • Each worker repeatedly smooths its chunk, using values from the adjacent chunks at the chunk boundaries.

  • After each iteration, the worker advances the DDict checkpoint using ddict.checkpoint().

  • It then writes its updated chunk vector to the DDict for use on the next iteration.

  • The example supports both in-memory communication between processes and persisted checkpointing.
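You can emulate the steps above serially, without Dragon, to see why exchanging a single boundary value with each neighbor is enough. This is a minimal sketch under stated assumptions. A plain Python dict stands in for the DDict, and building a fresh dict on each iteration stands in for advancing to a new checkpoint. run_chunked and smooth_global are hypothetical helpers. smooth_segment is copied from Listing 14. After any number of iterations, the chunked result matches smoothing the whole array directly.

```python
from typing import Optional

import numpy as np


def smooth_segment(segment: np.ndarray, left_val: Optional[float], right_val: Optional[float]) -> np.ndarray:
    # Copied from Listing 14: a missing neighbor is padded with the segment's own edge value.
    if segment.size == 0:
        return segment
    left_pad = segment[0] if left_val is None else left_val
    right_pad = segment[-1] if right_val is None else right_val
    if segment.size == 1:
        return np.array([(left_pad + segment[0] + right_pad) / 3.0], dtype=segment.dtype)
    smoothed = segment.copy()
    smoothed[0] = (left_pad + segment[0] + segment[1]) / 3.0
    smoothed[-1] = (segment[-2] + segment[-1] + right_pad) / 3.0
    if segment.size > 2:
        smoothed[1:-1] = (segment[:-2] + segment[1:-1] + segment[2:]) / 3.0
    return smoothed


def run_chunked(values: np.ndarray, num_chunks: int, iterations: int) -> np.ndarray:
    # Hypothetical serial stand-in for the Dragon workers: one dict per checkpoint.
    chunk_size = values.size // num_chunks
    if chunk_size * num_chunks != values.size:
        raise ValueError("size must be divisible by num_chunks")
    store = {f"chunk_{k}": values[k * chunk_size:(k + 1) * chunk_size].copy() for k in range(num_chunks)}
    for _ in range(iterations):
        next_store = {}
        for k in range(num_chunks):
            # Only the last element of the left chunk and the first of the right chunk are needed.
            left_val = store[f"chunk_{k - 1}"][-1] if k > 0 else None
            right_val = store[f"chunk_{k + 1}"][0] if k < num_chunks - 1 else None
            next_store[f"chunk_{k}"] = smooth_segment(store[f"chunk_{k}"], left_val, right_val)
        store = next_store  # plays the role of advancing to the next checkpoint
    return np.concatenate([store[f"chunk_{k}"] for k in range(num_chunks)])


def smooth_global(values: np.ndarray, iterations: int) -> np.ndarray:
    # Reference: smooth the undivided array, replicating the edge values at both ends.
    result = values.copy()
    for _ in range(iterations):
        padded = np.pad(result, 1, mode="edge")
        result = (padded[:-2] + padded[1:-1] + padded[2:]) / 3.0
    return result


data = np.random.default_rng(0).random(128)
print(np.allclose(run_chunked(data, 4, 10), smooth_global(data, 10)))  # True
```

Keeping all writes for one iteration in next_store until every chunk has been computed mirrors what checkpoints give the real workers. No chunk ever reads a neighbor value from the wrong iteration.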

The entire program is provided at https://github.com/DragonHPC/dragon/tree/main/examples/dragon_data/ddict/smoothing.py and in Listing 14 for your reference.

The algorithm begins by starting N workers, one per chunk. Each worker replaces every value in its chunk with the average of the three values at i-1, i, and i+1. This works well for the interior values of the chunk, but the values at the left and right boundaries depend on values held by the workers for the neighboring chunks. The code in Listing 10 shows how a worker retrieves the data it needs from the chunks to the left and right of its own.

Listing 10 Transparent Multi-node Data Sharing
1        if chunk_id > 0:
2            left_segment = np.array(ddict[f"chunk_{chunk_id - 1}"])
3            left_val = left_segment[-1]
4        if chunk_id < num_chunks - 1:
5            right_segment = np.array(ddict[f"chunk_{chunk_id + 1}"])
6            right_val = right_segment[0]

Again, since the array is distributed across multiple chunks, each worker process is responsible for smoothing one chunk. However, to compute the smoothed values at the chunk boundaries, workers need access to values from neighboring chunks:

  • The worker for chunk k needs the last element of chunk k-1 to smooth its first element.

  • The worker for chunk k needs the first element of chunk k+1 to smooth its last element.

Workers coordinate through the DDict to read neighbor values before computing each smoothing iteration. This creates a distributed dependency pattern where workers synchronize their reads and writes through the checkpoint mechanism.

The reads on lines 2 and 5 of Listing 10 block if the requested chunk is not yet available at the worker’s current checkpoint. The DDict in Listing 14 is created with wait_for_keys=True for this reason. Initially, all chunks are written to the DDict at checkpoint 0. Because every worker also starts at checkpoint 0, no worker blocks on the first iteration of the algorithm. On every later iteration, a worker must wait until its neighbors have written their newly smoothed chunks. Only then can it compute the new values for the first and last elements of its own chunk.

Listing 11 Update the Average
1        current = smooth_segment(current, left_val, right_val)
2
3        ddict.checkpoint()
4
5        ddict[f"chunk_{chunk_id}"] = current

Once a worker has finished smoothing its chunk, the new values become the input for its next iteration. In Listing 11 the new values are computed by the call to smooth_segment. Next, ddict.checkpoint() is called to advance the worker’s checkpoint id, which indicates that the worker has completed step i of the iteration. Then, on line 5, the worker stores its updated chunk at the new checkpoint so that its neighbors can use it on iteration i+1 of the algorithm. Any worker trying to load a neighbor’s chunk on line 2 or 5 of Listing 10 will block until that neighbor has executed line 5 of Listing 11. In this way, the DDict provides the synchronization necessary to hold each worker until the data it needs is available at its current checkpoint.
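The blocking behavior described above can be modeled with a small thread-based toy. ToyCheckpointStore is hypothetical and does not reflect how the DDict is implemented. It keeps a separate dict for each checkpoint id, and its get waits on a threading.Condition until the key has been written at the reader’s checkpoint. That is the behavior this example relies on from the DDict. Each toy worker follows the order of Listings 10 and 11: read neighbors at its current checkpoint, smooth, advance its own checkpoint, then write. Unlike the real DDict, which keeps only a bounded working set of checkpoints (see --working_set_size), the toy keeps every checkpoint.

```python
import threading
from collections import defaultdict
from typing import Any, Dict, Optional

import numpy as np


class ToyCheckpointStore:
    """Hypothetical stand-in for the DDict: every value is versioned by checkpoint id."""

    def __init__(self) -> None:
        self._data: Dict[int, Dict[str, Any]] = defaultdict(dict)  # checkpoint id -> {key: value}
        self._cond = threading.Condition()

    def put(self, key: str, value: Any, checkpoint: int) -> None:
        with self._cond:
            self._data[checkpoint][key] = value
            self._cond.notify_all()  # wake any reader waiting for this key

    def get(self, key: str, checkpoint: int, timeout: float = 10.0) -> Any:
        with self._cond:
            # Block until the key has been written at this checkpoint.
            if not self._cond.wait_for(lambda: key in self._data[checkpoint], timeout=timeout):
                raise TimeoutError(f"{key!r} was never written at checkpoint {checkpoint}")
            return self._data[checkpoint][key]


def smooth(segment: np.ndarray, left_val: Optional[float], right_val: Optional[float]) -> np.ndarray:
    # Equivalent to smooth_segment in Listing 14 for non-empty float segments.
    left = segment[0] if left_val is None else left_val
    right = segment[-1] if right_val is None else right_val
    padded = np.concatenate(([left], segment, [right]))
    return (padded[:-2] + padded[1:-1] + padded[2:]) / 3.0


def worker(store: ToyCheckpointStore, chunk_id: int, num_chunks: int, iterations: int) -> None:
    checkpoint = 0  # each worker tracks its own checkpoint id, starting at 0
    current = store.get(f"chunk_{chunk_id}", checkpoint)
    for _ in range(iterations):
        left_val = None
        right_val = None
        # Listing 10: these reads wait for the neighbors' writes at this checkpoint.
        if chunk_id > 0:
            left_val = store.get(f"chunk_{chunk_id - 1}", checkpoint)[-1]
        if chunk_id < num_chunks - 1:
            right_val = store.get(f"chunk_{chunk_id + 1}", checkpoint)[0]
        # Listing 11: smooth, advance the checkpoint, then publish the new chunk.
        current = smooth(current, left_val, right_val)
        checkpoint += 1
        store.put(f"chunk_{chunk_id}", current, checkpoint)


store = ToyCheckpointStore()
values = np.linspace(0.0, 1.0, 16)
num_chunks, iterations = 4, 5
chunk_size = values.size // num_chunks
for k in range(num_chunks):
    # Initial data is written at checkpoint 0, so the first iteration never blocks.
    store.put(f"chunk_{k}", values[k * chunk_size:(k + 1) * chunk_size], 0)

threads = [threading.Thread(target=worker, args=(store, k, num_chunks, iterations)) for k in range(num_chunks)]
for t in threads:
    t.start()
for t in threads:
    t.join()

result = np.concatenate([store.get(f"chunk_{k}", iterations) for k in range(num_chunks)])
print(result.min(), result.max())
```

Because a write at checkpoint c+1 never replaces the value stored at checkpoint c, a fast worker cannot overwrite data that a slower neighbor still needs for its current step.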

Listing 12 Persistent Key/Value Pairs in the DDict
    # These two kv pairs do not change on each iteration. They are the same
    # across all iterations and as such are put into the DDict as persistent
    # kv pairs. This means they are not affected by checkpointing.
    ddict.pput("num_chunks", num_chunks)
    ddict.pput("chunk_size", chunk_size)

One more snippet is worth mentioning. In Listing 12 the algorithm stores num_chunks and chunk_size, two constants that the workers and collect_result need. Because these values never change, they are stored with pput as persistent key/value pairs, which are not affected by checkpointing. Workers still read them with an ordinary lookup such as ddict["num_chunks"]. If an algorithm needs values that stay the same across iterations, it should probably store them with pput as persistent key/value pairs.
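One way to picture the difference is a toy store in which pput writes to a single table shared by every checkpoint, while ordinary writes are tied to the checkpoint in which they were made. ToyDDict, its checkpoint counter, and its lookup rule are hypothetical simplifications used only to illustrate "not affected by checkpointing". They do not describe the DDict’s internals. In particular, the rule that an ordinary key must be rewritten at each checkpoint before it can be read there is an assumption. It mirrors the per-checkpoint chunk writes this example performs.

```python
from typing import Any, Dict


class ToyDDict:
    """Hypothetical illustration of persistent vs. checkpointed key/value pairs."""

    def __init__(self) -> None:
        self.checkpoint_id = 0
        self._persistent: Dict[str, Any] = {}
        self._by_checkpoint: Dict[int, Dict[str, Any]] = {}

    def pput(self, key: str, value: Any) -> None:
        # Persistent pairs are stored outside any checkpoint.
        self._persistent[key] = value

    def __setitem__(self, key: str, value: Any) -> None:
        # Ordinary pairs are recorded under the current checkpoint only.
        self._by_checkpoint.setdefault(self.checkpoint_id, {})[key] = value

    def __contains__(self, key: str) -> bool:
        return key in self._persistent or key in self._by_checkpoint.get(self.checkpoint_id, {})

    def __getitem__(self, key: str) -> Any:
        if key in self._persistent:
            return self._persistent[key]
        # Raises KeyError if the key was not written at the current checkpoint.
        return self._by_checkpoint.get(self.checkpoint_id, {})[key]

    def checkpoint(self) -> None:
        # Advance to the next checkpoint; persistent pairs are unaffected.
        self.checkpoint_id += 1


d = ToyDDict()
d.pput("num_chunks", 4)
d["chunk_0"] = [0.0, 0.1]
d.checkpoint()
print(d["num_chunks"])  # 4
print("chunk_0" in d)   # False
```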

In summary, the algorithm demonstrates how distributed computations can be checkpointed at regular intervals to handle synchronization and communication between workers. The next section describes how this example program can optionally persist checkpoints. It also shows how persisted checkpoints provide fault tolerance and the ability to resume long-running computations from intermediate states.

Persistence of Checkpoints

Should it be necessary, checkpoints can be persisted to longer-term storage such as disk. This is useful for long-running computations that are more likely to encounter hardware failures because of the scale at which they run. In such cases, resuming the computation from a persisted checkpoint avoids restarting it from the beginning.

Listing 13 Specifying Persistence
    if persister_class is not None:
        kwargs.update(
            {
                "persister_class": persister_class,
                "persist_freq": args.persist_frequency,
                "persist_count": args.persist_count,
                "persist_path": args.persist_path,
                "name": args.name,
            }
        )

No additional code is needed to persist checkpoints to disk. The DDict writes persisted checkpoints itself, at a user-specified frequency. When constructing the DDict you specify four persistence arguments, as shown in Listing 13. A name for the DDict is also passed, which this example does only when persistence is enabled.

  • persister_class - Selects how checkpoints are persisted. Two persister classes are provided with Dragon: a POSIX file system persister (PosixCheckpointPersister) and a DAOS persister (DAOSCheckpointPersister). Users can also supply their own persister class.

  • persist_freq - How often checkpoints are persisted. With this argument it is possible to persist only periodically rather than at every checkpoint. In this example its value comes from the --persist_frequency command-line argument.

  • persist_count - The number of persisted checkpoints kept during a run. The DDict keeps only the last persist_count persisted checkpoints, deleting the oldest one when a new one would exceed persist_count.

  • persist_path - The path (persister_class specific) where the persisted checkpoints are stored.
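To make the interplay of persist_freq, persist_count, and persist_path concrete, here is a toy POSIX-style persister. It pickles each persisted checkpoint into a directory and keeps only the newest persist_count of them. Restore then picks the newest id, as main() in Listing 14 does with persisted_ids()[-1]. ToyPosixPersister, its file naming, and the rule that a checkpoint is persisted when its id is a multiple of persist_freq are all assumptions for illustration. This is not the API or on-disk format of Dragon’s PosixCheckpointPersister, and the DDict’s exact frequency rule may differ.

```python
import os
import pickle
import tempfile
from typing import Any, Dict, List


class ToyPosixPersister:
    """Hypothetical persister: one pickle file per persisted checkpoint."""

    def __init__(self, persist_path: str, persist_freq: int = 1, persist_count: int = 5) -> None:
        self.persist_path = persist_path
        self.persist_freq = persist_freq
        self.persist_count = persist_count
        os.makedirs(persist_path, exist_ok=True)

    def _file(self, checkpoint_id: int) -> str:
        return os.path.join(self.persist_path, f"checkpoint_{checkpoint_id}.pkl")

    def persisted_ids(self) -> List[int]:
        names = [n for n in os.listdir(self.persist_path) if n.startswith("checkpoint_") and n.endswith(".pkl")]
        return sorted(int(n[len("checkpoint_"):-len(".pkl")]) for n in names)

    def maybe_persist(self, checkpoint_id: int, data: Dict[str, Any]) -> None:
        # Assumed rule for illustration: persist every persist_freq-th checkpoint.
        if checkpoint_id % self.persist_freq != 0:
            return
        with open(self._file(checkpoint_id), "wb") as f:
            pickle.dump(data, f)
        # Keep only the newest persist_count checkpoints by deleting the oldest.
        for old_id in self.persisted_ids()[:-self.persist_count]:
            os.remove(self._file(old_id))

    def restore(self, checkpoint_id: int) -> Dict[str, Any]:
        with open(self._file(checkpoint_id), "rb") as f:
            return pickle.load(f)


with tempfile.TemporaryDirectory() as path:
    persister = ToyPosixPersister(path, persist_freq=2, persist_count=3)
    for checkpoint_id in range(11):
        persister.maybe_persist(checkpoint_id, {"chunk_0": [float(checkpoint_id)]})
    print(persister.persisted_ids())  # [6, 8, 10]
    latest = persister.persisted_ids()[-1]
    print(persister.restore(latest))  # {'chunk_0': [10.0]}
```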

To test persistence, first run the program in persist mode to create persisted checkpoints. Then run it again in restore mode, and it resumes from the last persisted checkpoint. The run modes and usage for this program are as follows.

Modes

  • validate: runs the smoothing algorithm without persistence, purely in-memory.

  • persist: runs the smoothing algorithm while persisting checkpoints using the configured persister.

  • restore: restores the latest persisted checkpoint and continues smoothing from that point.

Usage

The first example runs the smoothing program in validate mode to confirm that it runs to completion and uses checkpointing correctly.

examples/dragon_data/ddict $ dragon ddict_smoothing.py --mode validate --size 128 --chunks 4 --iterations 10
DDict built in validate mode, populating initial chunks...
Initial chunks populated, starting smoothing iterations...
Starting DDict smoothing: mode=validate chunks=4 iterations=10 checkpoint=0
Finished smoothing mode=validate checkpoint=10 result min=0.012784 max=0.987216
+++ head proc exited, code 0

The next run uses persist mode, which enables persistent checkpointing and creates sample persisted checkpoints that restore mode can then use. The example creates the checkpoint directory first.

examples/dragon_data/ddict $ mkdir ddict_checkpoints
examples/dragon_data/ddict $ dragon ddict_smoothing.py --mode persist --size 128 --chunks 4 --iterations 10 --persist_path ./ddict_checkpoints
Starting DDict smoothing: mode=persist chunks=4 iterations=10 checkpoint=0
Finished smoothing mode=persist checkpoint=10 result min=0.012784 max=0.987216
Persisted checkpoint ids: [3, 11, 12, 13, 14]
+++ head proc exited, code 0

Finally, restore mode restarts from the last persisted checkpoint and runs the number of iterations given by --resume_iterations. In the run below, restoring checkpoint 14 and running 8 more iterations ends at checkpoint 22.

examples/dragon_data/ddict $ dragon ddict_smoothing.py --mode restore --resume_iterations 8 --persist_path ./ddict_checkpoints
Restoring persisted checkpoint 14
Starting DDict smoothing: mode=restore chunks=4 iterations=8 checkpoint=14
Finished smoothing mode=restore checkpoint=22 result min=0.020463 max=0.979537
Restored to checkpoint 14 and continued smoothing.
+++ head proc exited, code 0

Arguments

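  • --mode: validate, persist, or restore (default validate).

  • --size: total length of the array to smooth; must be divisible by --chunks (default 128).

  • --chunks: number of distributed chunks / worker processes (default 4).

  • --iterations: number of smoothing iterations to perform (default 10).

  • --resume_iterations: number of iterations to run after restore (default 10).

  • --managers_per_node: number of DDict managers per node (default 1).

  • --working_set_size: checkpoint working set size in the DDict (default 3).

  • --persist_path: directory or storage path for persisted checkpoints (default ./ddict_checkpoints).

  • --persist_count: number of persisted checkpoints to keep, passed to the DDict as persist_count (default 5).

  • --persist_frequency: how often checkpoints are persisted, passed to the DDict as persist_freq (default 1).

  • --persister: checkpoint persister backend, POSIX or DAOS (default POSIX).

  • --trace: print per-chunk progress during smoothing.

  • --name: name given to the DDict when persistence is enabled (default ddict_smoothing_example).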

Full Program

Listing 14 The Complete Function Smoothing Program
"""
Distributed smoothing algorithm using DragonHPC DDict with checkpointing.

This example demonstrates a distributed 1D smoothing algorithm where an array is divided
into chunks and each chunk is processed by a separate worker process. The DDict is used
to store the chunk data and checkpointing allows for fault tolerance and resumability.

To run this example, you must use the 'dragon' command instead of 'python3' because
DragonHPC requires the runtime to be initialized:

    # Validate mode (no persistence):
    dragon ddict_smoothing.py --mode validate --size 128 --chunks 4 --iterations 10

    # Persist checkpoints:
    dragon ddict_smoothing.py --mode persist --size 128 --chunks 4 --iterations 10 --persist_path ./ddict_checkpoints

    # Restore and continue from persisted checkpoint:
    dragon ddict_smoothing.py --mode restore --resume_iterations 10 --persist_path ./ddict_checkpoints
"""
import dragon
import argparse
import multiprocessing as mp
from typing import Optional


import numpy as np

from dragon.data.ddict import DDict, PosixCheckpointPersister, DAOSCheckpointPersister
from dragon.native.machine import System


def build_ddict(args, persister_class=None):
    nnodes = System().nnodes
    kwargs = {
        "wait_for_keys": True,
        "working_set_size": args.working_set_size,
        "timeout": 200,
    }

    # [smooth-kwargs-start]
    if persister_class is not None:
        kwargs.update(
            {
                "persister_class": persister_class,
                "persist_freq": args.persist_frequency,
                "persist_count": args.persist_count,
                "persist_path": args.persist_path,
                "name": args.name,
            }
        )
    # [smooth-kwargs-end]

    return DDict(args.managers_per_node, nnodes, nnodes * int(4 * 1024 * 1024 * 1024), **kwargs)


def smooth_segment(segment: np.ndarray, left_val: Optional[float], right_val: Optional[float]) -> np.ndarray:
    if segment.size == 0:
        return segment

    left_pad = segment[0] if left_val is None else left_val
    right_pad = segment[-1] if right_val is None else right_val

    if segment.size == 1:
        return np.array([(left_pad + segment[0] + right_pad) / 3.0], dtype=segment.dtype)

    smoothed = segment.copy()
    smoothed[0] = (left_pad + segment[0] + segment[1]) / 3.0
    smoothed[-1] = (segment[-2] + segment[-1] + right_pad) / 3.0
    if segment.size > 2:
        smoothed[1:-1] = (segment[:-2] + segment[1:-1] + segment[2:]) / 3.0

    return smoothed


def worker_smooth(ddict: DDict, chunk_id: int, iterations: int, checkpoint_id: int, trace: bool) -> None:
    ddict.checkpoint_id = checkpoint_id
    num_chunks = ddict["num_chunks"]
    # Load the initial chunk data for this worker
    current = np.array(ddict[f"chunk_{chunk_id}"])

    for step in range(iterations):
        left_val = None
        right_val = None

        # [ start-data-sharing-ref ]
        if chunk_id > 0:
            left_segment = np.array(ddict[f"chunk_{chunk_id - 1}"])
            left_val = left_segment[-1]
        if chunk_id < num_chunks - 1:
            right_segment = np.array(ddict[f"chunk_{chunk_id + 1}"])
            right_val = right_segment[0]
        # [ end-data-sharing-ref ]

        # [start-compute-avg]
        current = smooth_segment(current, left_val, right_val)

        ddict.checkpoint()

        ddict[f"chunk_{chunk_id}"] = current
        # [end-compute-avg]

        if trace:
            print(
                f"chunk={chunk_id} iter={step} checkpoint={ddict.checkpoint_id} min={current.min():.5f} max={current.max():.5f}",
                flush=True,
            )

    ddict.detach()


def populate_initial_chunks(ddict: DDict, size: int, num_chunks: int) -> None:
    chunk_size = size // num_chunks
    if chunk_size * num_chunks != size:
        raise ValueError("size must be divisible by num_chunks")

    # [start-pput]
    # These two kv pairs do not change on each iteration. They are the same
    # across all iterations and as such are put into the DDict as persistent
    # kv pairs. This means they are not affected by checkpointing.
    ddict.pput("num_chunks", num_chunks)
    ddict.pput("chunk_size", chunk_size)
    # [end-pput]

    values = np.linspace(0.0, 1.0, size, dtype=np.float64)
    for chunk_id in range(num_chunks):
        start = chunk_id * chunk_size
        end = start + chunk_size
        ddict[f"chunk_{chunk_id}"] = values[start:end]


def collect_result(ddict: DDict) -> np.ndarray:
    num_chunks = ddict["num_chunks"]
    chunk_size = ddict["chunk_size"]
    result = np.zeros(num_chunks * chunk_size, dtype=np.float64)

    for chunk_id in range(num_chunks):
        result[chunk_id * chunk_size : (chunk_id + 1) * chunk_size] = np.array(ddict[f"chunk_{chunk_id}"])

    return result


def parse_args():
    parser = argparse.ArgumentParser(
        description="Distributed smoothing algorithm using DDict checkpointing",
        epilog="Note: Launch with 'dragon' command, not 'python3'"
    )
    parser.add_argument("--mode", choices=["validate", "persist", "restore"], default="validate")
    parser.add_argument("--size", type=int, default=128, help="Total length of the array to smooth")
    parser.add_argument("--chunks", type=int, default=4, help="Number of distributed chunks")
    parser.add_argument("--iterations", type=int, default=10, help="Number of smoothing iterations")
    parser.add_argument("--resume_iterations", type=int, default=10, help="Iterations to run after restore")
    parser.add_argument("--managers_per_node", type=int, default=1)
    parser.add_argument("--working_set_size", type=int, default=3)
    parser.add_argument("--persist_path", type=str, default="./ddict_checkpoints")
    parser.add_argument("--persist_count", type=int, default=5)
    parser.add_argument("--persist_frequency", type=int, default=1)
    parser.add_argument("--persister", choices=["POSIX", "DAOS"], default="POSIX")
    parser.add_argument("--trace", action="store_true")
    parser.add_argument("--name", type=str, default="ddict_smoothing_example")
    return parser.parse_args()


def main():
    args = parse_args()
    mp.set_start_method("dragon")

    if args.persister == "POSIX":
        persister_class = PosixCheckpointPersister
    else:
        persister_class = DAOSCheckpointPersister

    if args.mode == "validate":
        ddict = build_ddict(args)
        print("DDict built in validate mode, populating initial chunks...", flush=True)
        populate_initial_chunks(ddict, args.size, args.chunks)
        print("Initial chunks populated, starting smoothing iterations...", flush=True)
        num_iterations = args.iterations
        checkpoint_id = 0
    elif args.mode == "persist":
        ddict = build_ddict(args, persister_class=persister_class)
        populate_initial_chunks(ddict, args.size, args.chunks)
        num_iterations = args.iterations
        checkpoint_id = 0
    else:
        ddict = build_ddict(args, persister_class=persister_class)
        available = ddict.persisted_ids()
        if not available:
            raise RuntimeError("No persisted checkpoints found to restore.")
        checkpoint_id = available[-1]
        print(f"Restoring persisted checkpoint {checkpoint_id}", flush=True)
        ddict.restore(checkpoint_id)
        num_iterations = args.resume_iterations

    print(
        f"Starting DDict smoothing: mode={args.mode} chunks={args.chunks} iterations={num_iterations} checkpoint={ddict.checkpoint_id}",
        flush=True,
    )

    processes = []
    for chunk_id in range(args.chunks):
        proc = mp.Process(target=worker_smooth, args=(ddict, chunk_id, num_iterations, checkpoint_id, args.trace))
        proc.start()
        processes.append(proc)

    for proc in processes:
        proc.join()

    ddict.sync_to_newest_checkpoint()
    result = collect_result(ddict)

    print(
        f"Finished smoothing mode={args.mode} checkpoint={ddict.checkpoint_id} result min={result.min():.6f} max={result.max():.6f}",
        flush=True,
    )

    if args.mode == "persist":
        print(f"Persisted checkpoint ids: {ddict.persisted_ids()}", flush=True)
    elif args.mode == "restore":
        print(f"Restored to checkpoint {checkpoint_id} and continued smoothing.", flush=True)

    ddict.destroy()


if __name__ == "__main__":
    main()