Data Processing

Dragon supports processing data sets through a number of its APIs, including the Python multiprocessing API. Let’s go through a few common use cases and how to implement them with Dragon.

Processing Files

Processing static data that sits on a filesystem is a very common use case. Since Dragon’s primary value is easily distributing an application or workflow across many nodes, we do need to consider two versions of this use case. One is where the data is resident in a shared filesystem accessible from any of the nodes Dragon is running on. The other case is where the data is only visible to one of the compute nodes.

Filesystem Visible Only on One Node

Consider the case where files are created in node-local storage and we now need to initiate processing from a Python process on that node. In this case we cannot directly access the data from processes running on other compute nodes. We must instead read the data in from the current process and send it out to each node we want to parallelize the processing across. The simplest way to program that is with multiprocessing.pool.Pool (mp.Pool() for short). To start, let’s assume there is enough memory on the initial node to read all the data in and hold two copies of it in process memory.

Listing 23 Read data from one process and process it in parallel across all nodes
import dragon
from multiprocessing import set_start_method, Pool, cpu_count
from pathlib import Path

def process_data(fdata):
    # perform some operations on fdata and place the outcome in a variable
    # called result; a trivial placeholder is used here so the example runs as-is
    result = len(fdata)
    return result


if __name__ == '__main__':
    set_start_method("dragon")

    dir_to_process = "."
    data_to_process = []

    num_cores = cpu_count() // 2  # all cores across all nodes, but let's not count hyperthreads

    for f in Path(dir_to_process).glob('*'):
        if not f.is_file():
            continue
        with open(str(f.resolve()), mode='r') as fo:
            data_to_process.append(fo.read())

    with Pool(num_cores) as p:
        processed_data = p.map(process_data, data_to_process)

One drawback to this implementation is the sequential nature of reading all the data in before any processing can begin. Depending on the nature of the processing, it may be much better to use an iterator along with Pool.imap(), which will also save us some memory overhead.

Listing 24 Lazily read data from one process and process it in parallel across all nodes
import dragon
from multiprocessing import set_start_method, Pool, cpu_count
from pathlib import Path

def process_data(fdata):
    # perform some operations on fdata and place the outcome in a variable
    # called result; a trivial placeholder is used here so the example runs as-is
    result = len(fdata)
    return result


class DataReader:

    def __init__(self, data_dir):
        self.data_dir = data_dir
        self.filenames = [str(f.resolve()) for f in Path(data_dir).glob('*') if f.is_file()]
        self.idx = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.idx >= len(self.filenames):
            raise StopIteration
        with open(self.filenames[self.idx], mode='r') as f:
            data_to_process = f.read()
        self.idx += 1
        return data_to_process


if __name__ == '__main__':
    set_start_method("dragon")

    dir_to_process = "."
    dr = DataReader(dir_to_process)

    num_cores = cpu_count() // 2  # all cores across all nodes, but let's not count hyperthreads

    with Pool(num_cores) as p:
        # consume the iterator before the pool is torn down
        processed_data = list(p.imap(process_data, dr, chunksize=4))

The nice thing about this style of code is that it makes few assumptions about the type of system you are running on; in particular, it does not assume files are accessible on all nodes. The drawback is that all input and output data flows through the initial process. If there is a shared filesystem, it may be more efficient to make use of it.

Shared Filesystem

For very large datasets that have no hope of fitting in a single node’s memory or local storage, it can be better to leverage the system’s shared filesystem (if it has one). On supercomputers, this is often a Lustre filesystem or something similar. In this case, a process on any node can access any file, and we don’t need to flow all input data through the initial process. Instead, we just pass filenames to the workers.

Listing 25 Read and process data in parallel across all nodes
import dragon
from multiprocessing import set_start_method, Pool, cpu_count
from pathlib import Path

def process_data(filename):
    with open(filename, mode='r') as f:
        fdata = f.read()

    # perform some operations on fdata and place the outcome in a variable
    # called result; a trivial placeholder is used here so the example runs as-is
    result = len(fdata)
    return result


if __name__ == '__main__':
    set_start_method("dragon")

    dir_to_process = "."

    num_cores = cpu_count() // 2  # all cores across all nodes, but let's not count hyperthreads

    files_to_process = [str(f.resolve()) for f in Path(dir_to_process).glob('*') if f.is_file()]

    with Pool(num_cores) as p:
        processed_data = p.map(process_data, files_to_process)

We could also write the processed data back to the shared filesystem, but if we intend to do more work with the processed data, that would introduce the filesystem as a potential bottleneck in our workflow. One approach with Dragon that keeps data closer to new and existing processes is to use the in-memory distributed dictionary, DDict; a short sketch after the listing below shows one way to reuse the stored results.

Listing 26 Read and process data in parallel across all nodes and store results in a DDict
import dragon
import multiprocessing as mp
from multiprocessing import set_start_method, Pool, cpu_count, current_process
from pathlib import Path

from dragon.data.ddict import DDict
from dragon.native.machine import System, Node


def initialize_worker(the_ddict):
    # Since we want each worker to maintain a persistent handle to the DDict,
    # attach it to the current/local process instance. Done this way, workers
    # attach only once and can reuse the handle between processing work items
    me = current_process()
    me.stash = {}
    me.stash["ddict"] = the_ddict


def process_data(filename):
    the_ddict = current_process().stash["ddict"]
    try:
        with open(filename, mode='r') as f:
            fdata = f.read()

        # perform some operations on fdata and place the outcome in a variable
        # called result; a trivial placeholder is used here so the example runs as-is
        result = len(fdata)
        the_ddict[filename] = result
        return True
    except Exception:
        return False


def setup_ddict():

    # let's place the DDict across all nodes Dragon is running on
    my_system = System()
    num_nodes = my_system.nnodes

    total_mem = 0
    for huid in my_system.nodes:
        anode = Node(huid)
        total_mem += anode.physical_mem
    dict_mem = 0.1 * total_mem  # use 10% of the total memory

    return DDict(
        2,  # two managers per node
        num_nodes,
        int(dict_mem),
    )


if __name__ == '__main__':
    set_start_method("dragon")

    dir_to_process = "."

    num_cores = cpu_count() // 2  # all cores across all nodes, but let's not count hyperthreads

    files_to_process = [str(f.resolve()) for f in Path(dir_to_process).glob('*') if f.is_file()]

    the_ddict = setup_ddict()

    # use the standard initializer argument to Pool to pass the DDict to each worker
    with Pool(num_cores, initializer=initialize_worker, initargs=(the_ddict,)) as p:
        files_were_processed = p.map(process_data, files_to_process)

    # peek at the data from one file
    print(the_ddict[files_to_process[2]], flush=True)
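If a later phase needs the processed results, a follow-up Pool can reuse the same DDict through the same initializer pattern, and the dictionary can be torn down once the workflow is finished. The sketch below continues the listing above; the summarize() helper is ours, not part of the original example, and it assumes the DDict handle provides keys() and destroy().

# defined at module level, alongside process_data()
def summarize(key):
    the_ddict = current_process().stash["ddict"]  # handle stashed by initialize_worker()
    result = the_ddict[key]                       # value stored by process_data()
    # ... further work on the stored result would go here ...
    return key, result

# continuation of the __main__ block above
with Pool(num_cores, initializer=initialize_worker, initargs=(the_ddict,)) as p:
    summary = p.map(summarize, list(the_ddict.keys()))

the_ddict.destroy()  # free the distributed memory once we are done with it

Because the intermediate results stay in the DDict, the second phase avoids a round trip through the filesystem.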

Streaming Data Processing Pipeline

Processing streaming data, where the total volume of data is typically not known in advance, is also well supported by the Python multiprocessing API and, with Dragon, across many nodes. Here is an example processing pipeline with three phases: an ingest phase reading from a socket, a raw data processing phase, and a second processing phase. Each phase may require a different amount of computation. This example handles that by giving different numbers of CPU cores to the two processing phases.

Listing 27 Process data through a pipeline coming from a socket through two phases
import dragon
from multiprocessing import set_start_method, Pool, cpu_count
import socket


def generate_data(data_blk):
    # perform operations on data_blk in the first phase of the pipeline;
    # a trivial placeholder is used here so the example runs as-is
    result = data_blk
    return result


def process_data(data_item):
    # perform post-processing operations;
    # a trivial placeholder is used here so the example runs as-is
    result = len(data_item)
    return result


class StreamDataReader:

    def __init__(self, host='0.0.0.0', port=9000, chunk_size=1024):
        self.chunk_size = chunk_size
        self.socket = socket.socket()
        self.socket.connect((host, port))

    def __iter__(self):
        return self

    def __next__(self):
        data = self.socket.recv(self.chunk_size)
        if data:
            return data
        else:
            raise StopIteration


if __name__ == '__main__':
    set_start_method("dragon")

    num_cores = cpu_count() // 2  # all cores across all nodes, but let's not count hyperthreads
    num_producers = num_cores // 4
    num_consumers = num_cores - num_producers

    data_stream = StreamDataReader()

    producers = Pool(num_producers)
    consumers = Pool(num_consumers)

    # note that imap() returns an iterable itself, which allows a pipeline like
    # this to overlap the two phases
    gen_data = producers.imap(generate_data, data_stream, 4)
    proc_data = consumers.imap(process_data, gen_data, 2)
    for i, item in enumerate(proc_data):
        print(f"Pipeline product {i}={item}")

    producers.close()
    consumers.close()
    producers.join()
    consumers.join()

The implementation uses imap() to pull work through an iterator class, StreamDataReader. As blocks of data come in through the socket, they are fed to the pool of producer workers running generate_data(), and the results of that first phase are in turn fed to the pool of consumer workers running process_data().
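For experimentation without a real data feed, a small stand-in source can stream bytes over the socket that StreamDataReader connects to. The sketch below is purely illustrative and not part of the pipeline above; the serve_test_data() name, chunk count, and payload are all made up for the example.

import socket

def serve_test_data(host="0.0.0.0", port=9000, num_chunks=16, chunk_size=1024):
    # accept one connection and stream num_chunks fixed-size blocks to it
    with socket.socket() as srv:
        srv.bind((host, port))
        srv.listen(1)
        conn, _ = srv.accept()
        with conn:
            for _ in range(num_chunks):
                conn.sendall(b"x" * chunk_size)
    # closing the connection makes recv() return b"", which ends the stream

if __name__ == "__main__":
    serve_test_data()

Start the source first, in a separate process or terminal, then launch the pipeline so StreamDataReader has something to connect to.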