Parallel Merge Sort

Here we show a simple recursive merge sort implementation using Python Multiprocessing with Dragon. This divide-and-conquer algorithm uses recursion to subdivide the target list into smaller chunks and calls itself on each sublist. This is done until a minimal cutoff size is reached, at which point the standard Python sort is used on the sublist. A purely serial sketch of this recursion is shown below. See e.g. Knuth, The Art of Computer Programming, 1998, Vol. 3, Section 5.2.4 for more information on merge sort.
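
For reference, stripped of all parallelism, the same recursion can be written as the short serial sketch below. The sketch is illustrative only: the function name serial_merge_sort is not part of the listing, and heapq.merge stands in for the hand-written merge function of Listing 11.

import heapq

CUTOFF = 20000  # illustrative; the same cutoff value as in Listing 11

def serial_merge_sort(chunk: list) -> list:
    """Plain recursive merge sort: split until the cutoff, then sort directly."""
    if len(chunk) <= CUTOFF:
        chunk.sort()  # fall back to the built-in sort on small sublists
        return chunk
    midpoint = len(chunk) // 2
    left = serial_merge_sort(chunk[:midpoint])
    right = serial_merge_sort(chunk[midpoint:])
    return list(heapq.merge(left, right))  # merge the two sorted halves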

In our parallel implementation, a new process is started for every sublist using Python Multiprocessing. Results are communicated through Multiprocessing queues: parent processes block (sleep) on their result queue until both child processes have put their results into it. A minimal sketch of this pattern follows.
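
Stripped of the sorting itself, that communication pattern reduces to the minimal sketch below, using standard Multiprocessing; the child function and the example data are illustrative only.

import multiprocessing as mp

def child(data: list, result_queue) -> None:
    # each child puts its (sorted) result into the shared queue
    result_queue.put(sorted(data))

if __name__ == "__main__":
    result_queue = mp.Queue()
    left = mp.Process(target=child, args=([3, 1, 2], result_queue))
    right = mp.Process(target=child, args=([6, 5, 4], result_queue))
    left.start()
    right.start()
    first = result_queue.get()   # the parent blocks here until one child has finished
    second = result_queue.get()  # ...and here until the other child has finished
    left.join()
    right.join()
    print(first, second)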

Note that __main__ actually executes (MAX_SIZE - MIN_SIZE) / INCREMENT + 1 merge sorts on slices of the input data of increasing size, measures the run-time of each and prints the timing results.
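
With the constants used in the listing (MIN_SIZE = 100000, MAX_SIZE = 1000000, INCREMENT = MIN_SIZE) this amounts to ten timed runs, which can be checked directly:

MIN_SIZE = 100000
MAX_SIZE = 1000000
INCREMENT = MIN_SIZE

sizes = range(MIN_SIZE, MAX_SIZE + 1, INCREMENT)
print(len(sizes))  # 10 runs: 100000, 200000, ..., 1000000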

Listing 11 merge_sort.py: Parallel merge sort using Python Multiprocessing with Dragon
import random
import sys
import time
import math

import dragon
import multiprocessing as mp

# values are chosen so memory usage fits into the default Dragon memory pool of 4GB
CUTOFF = 20000
MIN_SIZE = 100000
MAX_SIZE = 1000000
INCREMENT = MIN_SIZE


def merge(left: list, right: list) -> list:
    """This function merges two lists.

    :param left: First list containing data
    :type left: list
    :param right: Second list containing data
    :type right: list
    :return: Merged data
    :rtype: list
    """

    merged_list = [None] * (len(left) + len(right))

    i = 0
    j = 0
    k = 0

    while i < len(left) and j < len(right):
        if left[i] < right[j]:
            merged_list[k] = left[i]
            i = i + 1
        else:
            merged_list[k] = right[j]
            j = j + 1
        k = k + 1

    # When the while loop above is done, either i has reached
    # the end of `left` or j has reached the end of `right`,
    # but not both.

    # finish up copying over the 1st list if needed
    while i < len(left):
        merged_list[k] = left[i]
        i = i + 1
        k = k + 1

    # finish up copying over the 2nd list if needed
    while j < len(right):
        merged_list[k] = right[j]
        j = j + 1
        k = k + 1

    return merged_list


def parallel_merge_sort(chunk: list, cutoff: int, sorted_chunk_queue: object) -> None:
    """Recursive merge sort function. Below `cutoff` items, sort immediately.
    Otherwise, divide the input list `chunk` into two, start two processes
    executing the same function on one sublist each and wait for their results
    in `result_queue`. Merge the two results and put the resulting list into
    `sorted_chunk_queue`.

    :param chunk: sub-list to recursively sort
    :type chunk: list
    :param cutoff: number of items below which the list will be sorted immediately
    :type cutoff: int
    :param sorted_chunk_queue: Queue to put the merged list into
    :type sorted_chunk_queue: mp.Queue object
    """

    if len(chunk) <= cutoff:
        chunk.sort()
        sorted_chunk_queue.put(chunk)

    else:
        midpoint = len(chunk) // 2

        left_chunk = chunk[:midpoint]
        right_chunk = chunk[midpoint:]

        result_queue = mp.Queue()

        left_proc = mp.Process(target=parallel_merge_sort, args=(left_chunk, cutoff, result_queue))
        right_proc = mp.Process(target=parallel_merge_sort, args=(right_chunk, cutoff, result_queue))

        left_proc.start()
        right_proc.start()

        result_a = result_queue.get(timeout=None)  # blocking
        result_b = result_queue.get(timeout=None)

        result = merge(result_a, result_b)

        sorted_chunk_queue.put(result)


def merge_sort(data: list, size: int, cutoff: int) -> float:
    """Kick off merge sort on a slice of `data` of size `size`,
    measure the run-time and return it.

    :param data: The whole input data
    :type data: list
    :param size: the size of the slice to sort
    :type size: int
    :param cutoff: when to stop recursing
    :type cutoff: int
    :return: runtime in seconds
    :rtype: float
    """

    the_lst = data[:size]

    start = time.perf_counter()

    result_queue = mp.Queue()
    parallel_merge_sort(the_lst, cutoff, result_queue)
    result = result_queue.get()

    the_lst.clear()
    the_lst.extend(result)

    end = time.perf_counter()
    delta = end - start

    return delta


def find_number_of_processes(n: int, cutoff: int) -> int:
    """Return the number of processes started by effectively
    replaying the recursion.

    :param n: number of elements
    :type n: int
    :param cutoff: number of items below which no additional process is started
    :type cutoff: int
    :return: number of started processes
    :rtype: int
    """

    procs = 1

    while True:
        left = n // 2
        right = n - left
        procs = 2 * procs + 1

        if left <= cutoff:
            if right > cutoff:
                procs = procs + 2 * left

            return procs

        n = n // 2


if __name__ == "__main__":

    if "dragon" in sys.argv:
        mp.set_start_method("dragon")

    data: list = [random.randrange(MAX_SIZE) for i in range(MAX_SIZE)]

    print(
        f"    List Size    Time (seconds)    Processes    Channels (or Queues) with cutoff={CUTOFF}",
        flush=True,
    )

    for size in range(MIN_SIZE, MAX_SIZE + 1, INCREMENT):

        delta = merge_sort(data, size, CUTOFF)
        proc_count = find_number_of_processes(size, CUTOFF)
        channel_count = proc_count // 2
        print(f"{size:13d}    {delta:14.6f}{proc_count:12}{channel_count:12}")

Using Dragon, the code can be run with dragon merge_sort.py dragon.

Example output using Dragon

>$dragon merge_sort.py dragon
List Size    Time (seconds)    Processes    Channels (or Queues) with cutoff=20000
   100000          0.696831          15           7
   200000          0.934078          31          15
   300000          1.016903          31          15
   400000          1.372683          63          31
   500000          1.462831          63          31
   600000          1.575892          63          31
   700000          2.307709         127          63
   800000          2.358397         127          63
   900000          2.471223         127          63
  1000000          2.561216         127          63
+++ head proc exited, code 0

For comparison, the code can be run with standard Multiprocessing using python3 merge_sort.py.

>$python3 merge_sort.py
List Size    Time (seconds)    Processes    Channels (or Queues) with cutoff=20000
   100000          0.143925          15           7
   200000          0.232850          31          15
   300000          0.319611          31          15
   400000          0.461013          63          31
   500000          0.562094          63          31
   600000          0.669838          63          31
   700000          0.784344         127          63
   800000          0.882651         127          63
   900000          0.975826         127          63
  1000000          1.103736         127          63