Parallel Merge Sort
Here we show a simple recursive merge sort implementation using Python Multiprocessing with Dragon. This divide-and-conquer algorithm recursively splits the target list into two halves and calls itself on each sublist. This continues until a sublist contains no more than a cutoff number of items, at which point Python's standard sort (`list.sort()`) is used on it. See, e.g., Knuth, The Art of Computer Programming, Vol. 3, 1998, Section 5.2.4, for more information on merge sort.
In our parallel implementation, a new process is started for every sublist using Python Multiprocessing. Results are communicated using Multiprocessing queues. Each parent process blocks (sleeps) on its result queue until both child processes have put their sorted sublists into it.
Note that `__main__` actually executes `(MAX_SIZE - MIN_SIZE) / INCREMENT + 1` merge sorts of increasingly large sublists, measures the time of each, and prints the timing results.
1import random
2import sys
3import time
4import math
5
6import dragon
7import multiprocessing as mp
8
# Values are chosen so that memory usage fits into the default Dragon memory pool of 4GB.
CUTOFF = 20_000  # sublists with at most this many items are sorted directly, without new processes
MIN_SIZE = 100_000  # smallest list size that is benchmarked
MAX_SIZE = 1_000_000  # largest list size benchmarked, and the length of the random input data
INCREMENT = MIN_SIZE  # step between successive benchmarked list sizes
14
15
def merge(left: list, right: list) -> list:
    """Merge two sorted lists into a single new sorted list.

    The merge is stable: when an item of ``left`` and an item of ``right``
    compare equal, the item from ``left`` is emitted first. Items are compared
    only with the ``<`` operator, the same as ``list.sort()``.

    :param left: First list, sorted in ascending order
    :type left: list
    :param right: Second list, sorted in ascending order
    :type right: list
    :return: New list holding all items of ``left`` and ``right`` in ascending order
    :rtype: list
    """

    merged_list = []
    i = 0
    j = 0
    len_left = len(left)
    len_right = len(right)

    while i < len_left and j < len_right:
        # Take from ``right`` only when it is strictly smaller, so that on a
        # tie the item from ``left`` goes first (this keeps the merge stable).
        if right[j] < left[i]:
            merged_list.append(right[j])
            j += 1
        else:
            merged_list.append(left[i])
            i += 1

    # The loop stops as soon as one list is exhausted, so at most one of the
    # two slices below is non-empty. Its items are already sorted and are not
    # smaller than anything merged so far, so they can be appended as-is.
    merged_list.extend(left[i:])
    merged_list.extend(right[j:])

    return merged_list
59
60
def parallel_merge_sort(chunk: list, cutoff: int, sorted_chunk_queue: object) -> None:
    """Recursively sort ``chunk`` and put the sorted list into ``sorted_chunk_queue``.

    A list with at most ``cutoff`` items is sorted in place with ``list.sort()``
    and put into ``sorted_chunk_queue`` directly. A longer list is split into
    two halves, and one new process per half runs this same function with a
    shared ``result_queue``. The parent blocks on that queue until both sorted
    halves have arrived, waits for both children to exit, merges the halves
    and puts the merged list into ``sorted_chunk_queue``.

    The halves are taken from ``result_queue`` in the order the children
    deliver them, not in left/right order. The result is always sorted, but
    the overall sort is therefore not guaranteed to be stable.

    :param chunk: sub-list to recursively sort
    :type chunk: list
    :param cutoff: number of items at or below which the list will be sorted immediately
    :type cutoff: int
    :param sorted_chunk_queue: Queue to put the sorted list into (anything with a ``put`` method)
    :type sorted_chunk_queue: mp.Queue object
    """

    if len(chunk) <= cutoff:
        chunk.sort()
        sorted_chunk_queue.put(chunk)
        return

    midpoint = len(chunk) // 2
    left_chunk = chunk[:midpoint]
    right_chunk = chunk[midpoint:]

    result_queue = mp.Queue()

    children = [
        mp.Process(target=parallel_merge_sort, args=(half, cutoff, result_queue))
        for half in (left_chunk, right_chunk)
    ]
    for child in children:
        child.start()

    # Read both results *before* joining. With standard multiprocessing, a
    # process that has put items on a queue may not terminate until those
    # items are consumed, so joining first could deadlock.
    first_sorted = result_queue.get(timeout=None)  # blocking
    second_sorted = result_queue.get(timeout=None)

    # Reap the children so finished processes are not left behind.
    for child in children:
        child.join()

    sorted_chunk_queue.put(merge(first_sorted, second_sorted))
100
101
def merge_sort(data: list, size: int, cutoff: int) -> float:
    """Sort a copy of the first ``size`` items of ``data`` with
    ``parallel_merge_sort`` and return the elapsed wall-clock time.

    ``data`` itself is not modified; only the copied slice is sorted.

    :param data: The whole input data
    :type data: list
    :param size: number of leading items of ``data`` to sort
    :type size: int
    :param cutoff: number of items at or below which a sublist is sorted
        directly instead of being split further
    :type cutoff: int
    :return: runtime in seconds, measured with ``time.perf_counter()``
    :rtype: float
    """

    the_lst = data[:size]

    start = time.perf_counter()

    result_queue = mp.Queue()
    parallel_merge_sort(the_lst, cutoff, result_queue)
    result = result_queue.get()

    # Copy the sorted items back into the_lst inside the timed region
    # (presumably to mirror an in-place sort). Slice assignment stays correct
    # even if ``result`` happens to be the same object as ``the_lst``.
    the_lst[:] = result

    end = time.perf_counter()
    return end - start
131
132
def find_number_of_processes(n: int, cutoff: int) -> int:
    """Return the number of processes involved in sorting ``n`` items, by
    replaying the recursion of ``parallel_merge_sort``.

    The count includes the calling process (the root of the recursion, which
    ``merge_sort`` runs in-process) plus every process started for a sublist.
    A sublist with at most ``cutoff`` items is sorted directly. A longer
    sublist of length ``m`` is split into sublists of ``m // 2`` and
    ``m - m // 2`` items, each handled by a new process.

    Sublists on the same recursion level share only a few distinct lengths, so
    the replay keeps a ``{length: number of sublists}`` map per level instead
    of visiting every node individually.

    :param n: number of elements
    :type n: int
    :param cutoff: number of items at or below which no additional process is started
    :type cutoff: int
    :return: number of processes, including the calling one
    :rtype: int
    :raises ValueError: if ``cutoff < 1`` and ``n > cutoff``; the recursion
        would never terminate in that case
    """

    if n > cutoff and cutoff < 1:
        raise ValueError(
            f"cutoff must be at least 1 to sort {n} items; the recursion would not terminate"
        )

    procs = 0
    level = {n: 1}

    while level:
        next_level = {}
        for length, count in level.items():
            procs += count
            if length > cutoff:
                half = length // 2
                for child_length in (half, length - half):
                    next_level[child_length] = next_level.get(child_length, 0) + count
        level = next_level

    return procs
159
160
if __name__ == "__main__":
    # Passing "dragon" on the command line switches multiprocessing to the
    # Dragon start method; otherwise the default start method is used.
    if "dragon" in sys.argv:
        mp.set_start_method("dragon")

    # MAX_SIZE random integers in [0, MAX_SIZE); each benchmark run sorts a
    # prefix of this list.
    data: list = [random.randrange(MAX_SIZE) for _ in range(MAX_SIZE)]

    header = f" List Size Time (seconds) Processes Channels (or Queues) with cutoff={CUTOFF}"
    print(header, flush=True)

    sizes = range(MIN_SIZE, MAX_SIZE + 1, INCREMENT)
    for size in sizes:
        delta = merge_sort(data, size, CUTOFF)
        proc_count = find_number_of_processes(size, CUTOFF)
        # proc_count // 2 is the number of processes that split their sublist,
        # each of which creates one result queue; the queue created inside
        # merge_sort itself is not included.
        channel_count = proc_count // 2
        print(f"{size:13d} {delta:14.6f}{proc_count:12}{channel_count:12}")
The code can be run using Dragon with `dragon merge_sort.py dragon`.
Example output using Dragon:
$ dragon merge_sort.py dragon
List Size Time (seconds) Processes Channels (or Queues) with cutoff=20000
100000 0.696831 15 7
200000 0.934078 31 15
300000 1.016903 31 15
400000 1.372683 63 31
500000 1.462831 63 31
600000 1.575892 63 31
700000 2.307709 127 63
800000 2.358397 127 63
900000 2.471223 127 63
1000000 2.561216 127 63
+++ head proc exited, code 0
For comparison, the code can be run with standard Multiprocessing using `python3 merge_sort.py`:
$ python3 merge_sort.py
List Size Time (seconds) Processes Channels (or Queues) with cutoff=20000
100000 0.143925 15 7
200000 0.232850 31 15
300000 0.319611 31 15
400000 0.461013 63 31
500000 0.562094 63 31
600000 0.669838 63 31
700000 0.784344 127 63
800000 0.882651 127 63
900000 0.975826 127 63
1000000 1.103736 127 63