Data Processing
Dragon supports processing data sets through a number of its APIs, including the Python multiprocessing API. Let's go through a few common use cases and how to implement them with Dragon.
Processing Files
Processing static data that sits on a filesystem is a very common use case. Since Dragon’s primary value is easily distributing an application or workflow across many nodes, we do need to consider two versions of this use case. One is where the data is resident in a shared filesystem accessible from any of the nodes Dragon is running on. The other case is where the data is only visible to one of the compute nodes.
Filesystem Visible Only on One Node
Consider the case where files are created in node-local storage and now we need to initiate processing from a Python process on that node. In this case, processes running on other nodes cannot directly access the data. We must instead read the data in from the current process and send it out to each node we want to parallelize the processing across. The simplest way to program that is with multiprocessing.pool.Pool (mp.Pool() for short). Let's assume to start that there is enough memory on the initial node to read all the data in and hold two copies of it in process memory.
import dragon
from multiprocessing import set_start_method, Pool, cpu_count
from pathlib import Path

def process_data(fdata):
    # perform some operations on fdata and place the output into a variable called result
    return result


if __name__ == '__main__':
    set_start_method("dragon")

    dir_to_process = "."
    data_to_process = []

    num_cores = cpu_count() // 2  # all cores across all nodes, but let's not count hyperthreads

    # read every file into memory from the initial process
    for f in Path(dir_to_process).glob('*'):
        with open(str(f.resolve()), mode='r') as fo:
            data_to_process.append(fo.read())

    # farm the in-memory data out to workers spread across all nodes
    with Pool(num_cores) as p:
        processed_data = p.map(process_data, data_to_process)
One drawback to this implementation is the sequential nature of reading all the data in before any processing can begin. Depending on the nature of the processing, it may be much better to use an iterator along with Pool.imap(), which will also save us some memory overhead.
import dragon
from multiprocessing import set_start_method, Pool, cpu_count
from pathlib import Path

def process_data(fdata):
    # perform some operations on fdata and place the output into a variable called result
    return result


class DataReader:

    def __init__(self, dir):
        self.dir = dir
        self.filenames = [str(f.resolve()) for f in Path(self.dir).glob('*')]
        self.idx = 0

    def __iter__(self):
        return self

    def __next__(self):
        # stop iterating once every file has been handed out
        if self.idx >= len(self.filenames):
            raise StopIteration
        with open(self.filenames[self.idx], mode='r') as f:
            data_to_process = f.read()
        self.idx += 1
        return data_to_process


if __name__ == '__main__':
    set_start_method("dragon")

    dir_to_process = "."
    dr = DataReader(dir_to_process)

    num_cores = cpu_count() // 2  # all cores across all nodes, but let's not count hyperthreads

    # imap() pulls items from the iterator lazily, so files are read only as workers need them;
    # the results are consumed before the with-block terminates the pool
    with Pool(num_cores) as p:
        processed_data = list(p.imap(process_data, dr, chunksize=4))
The nice thing about this style of code is that it makes few assumptions about the type of system you are running on; in particular, there is no assumption that files are accessible on all nodes. The drawback is that all input and output data flows through the initial process. If there is a shared filesystem, it may be more efficient to make use of it.
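For the shared-filesystem case, one option (a minimal sketch, not part of the examples above) is to map over file paths rather than file contents, so each worker opens and reads its own file from wherever it happens to be running. Here process_file is a hypothetical helper that combines the read with the processing step:

import dragon
from multiprocessing import set_start_method, Pool, cpu_count
from pathlib import Path

def process_file(fname):
    # each worker reads its own file; this assumes the path is valid on
    # every node, i.e. the data sits on a shared filesystem
    with open(fname, mode='r') as fo:
        fdata = fo.read()
    # perform some operations on fdata and place the output into a variable called result
    return result


if __name__ == '__main__':
    set_start_method("dragon")

    dir_to_process = "."
    filenames = [str(f.resolve()) for f in Path(dir_to_process).glob('*')]

    num_cores = cpu_count() // 2  # all cores across all nodes, but let's not count hyperthreads

    # only paths and results travel between nodes; the file data never
    # passes through the initial process
    with Pool(num_cores) as p:
        processed_data = p.map(process_file, filenames)

This keeps the bulk of the I/O off the initial process entirely, at the cost of assuming every worker can see the same paths.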
Streaming Data Processing Pipeline
Processing streaming data, where the total volume of data is likely not known in advance, is also well supported by the Python multiprocessing API and, with Dragon, across many nodes. Here is an example processing pipeline with three phases: an ingest phase listening on a socket, a raw data processing phase, and a second processing phase. Each phase may require a different amount of computation. This example handles that by giving different numbers of CPU cores to the two processing phases.
import dragon
from multiprocessing import set_start_method, Pool, cpu_count
import socket


def generate_data(data_blk):
    # perform operations on data_blk in the first phase of a pipeline
    return result


def process_data(data_item):
    # perform post processing operations
    return result


class StreamDataReader:

    def __init__(self, host='0.0.0.0', port=9000, chunk_size=1024):
        self.chunk_size = chunk_size
        self.socket = socket.socket()
        self.socket.connect((host, port))

    def __iter__(self):
        return self

    def __next__(self):
        data = self.socket.recv(self.chunk_size)
        if data:
            return data
        else:
            # an empty read means the remote end closed the connection
            raise StopIteration


if __name__ == '__main__':
    set_start_method("dragon")

    num_cores = cpu_count() // 2  # all cores across all nodes, but let's not count hyperthreads
    num_producers = num_cores // 4
    num_consumers = num_cores - num_producers

    data_stream = StreamDataReader()

    producers = Pool(num_producers)
    consumers = Pool(num_consumers)

    # note that imap() returns an iterable itself, which allows a pipeline like this to get overlap between phases
    gen_data = producers.imap(generate_data, data_stream, 4)
    proc_data = consumers.imap(process_data, gen_data, 2)
    for i, item in enumerate(proc_data):
        print(f"Pipeline product {i}={item}")

    producers.close()
    consumers.close()
    producers.join()
    consumers.join()
The implementation uses imap() to pull work through the iterator class StreamDataReader. As blocks of data come in through the socket, they are fed to the pool of producers running generate_data(), and those results are in turn fed to the pool of consumers running process_data().
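To exercise the pipeline locally, something must be serving data on the port StreamDataReader connects to. The following is a minimal, hypothetical test server (not part of the example above) that streams a fixed amount of random bytes to the first client that connects; run it separately before starting the pipeline:

import os
import socket

HOST, PORT = "0.0.0.0", 9000   # must match StreamDataReader's defaults
CHUNK = 1024
NUM_CHUNKS = 100               # arbitrary amount of test data

with socket.socket() as srv:
    srv.bind((HOST, PORT))
    srv.listen(1)
    conn, _ = srv.accept()
    with conn:
        for _ in range(NUM_CHUNKS):
            conn.sendall(os.urandom(CHUNK))
    # when the connection closes here, recv() returns b'' on the reader
    # side, which StreamDataReader turns into StopIteration

Keep in mind that generate_data() and process_data() are placeholders, so the pipeline still needs real processing logic before it will run end to end.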