Parallel Producer - Consumer Communication with Queue

Here we show a multiple-producer, multiple-consumer communication scheme using Python multiprocessing with Dragon.

The parent process creates and starts a number of processes. The first half act as producers: each creates a random string, packages it up with a random selection of lambdas, and puts this work package into a shared queue. The second half act as consumers: they are handed only the queue and execute a blocking get() on it to receive a work package. Once a package is received, they execute its lambdas one by one on the string, printing the output of each step. Finally, the parent process joins all process objects to ensure they have completed successfully.

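The essence of this scheme can be sketched with standard multiprocessing alone, independent of Dragon; the function names and the "hello" payload below are illustrative only, the full version appears in the listing further down:

```python
import multiprocessing as mp


def producer(q):
    q.put("hello")  # put a work item on the shared queue


def consumer(q):
    item = q.get()  # blocking get: waits until an item arrives
    print("got:", item)


if __name__ == "__main__":
    q = mp.Queue()
    ps = [
        mp.Process(target=producer, args=(q,)),
        mp.Process(target=consumer, args=(q,)),
    ]
    for p in ps:
        p.start()
    for p in ps:
        p.join()
```

With Dragon, only the start method changes; the queue and process API stays the same.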
The code demonstrates the following key concepts working with Dragon:

  • Shared communication objects between processes, here using a queue.

  • Creating, starting and joining worker processes.

  • Out-of-order execution, visible in the interleaved output.

  • Dragon’s puids replace OS process IDs (pids) as unique identifiers for processes.

  • Serialization of data to pass it to another process. We use cloudpickle here, which is Python-only. Multiprocessing with Dragon uses the standard pickle module by default, which does not support lambdas.

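As a quick illustration of the last bullet, standard pickle refuses a lambda while cloudpickle round-trips it (this sketch assumes cloudpickle is installed):

```python
import pickle
import cloudpickle

# a lambda, like the string ops used in the listing below
f = lambda a: a.upper()

# cloudpickle can serialize the lambda and restore a working function
g = cloudpickle.loads(cloudpickle.dumps(f))
assert g("abc") == "ABC"

# the standard pickle module, Dragon's default serializer, cannot
try:
    pickle.dumps(f)
except (pickle.PicklingError, AttributeError):
    print("standard pickle cannot serialize lambdas")
```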
Listing 12 queue_demo.py: Parallel Producer - Consumer Communication with Dragon & Multiprocessing Queue
import random
import string
import cloudpickle

import dragon
import multiprocessing as mp


def producer(serialized_args: bytes) -> None:
    """Generate some string data, bundle it up with some random functions and
    add it to a queue.

    :param serialized_args: serialized queue and function dictionary
    :type serialized_args: bytes
    """

    q, funcs = cloudpickle.loads(serialized_args)

    data = random.choices(string.ascii_lowercase, k=1)[0]
    for i in range(5):
        data = data + " " + "".join(random.choices(string.ascii_lowercase, k=i + 3))

    print(
        f'I am producer {mp.current_process().pid} and I\'m sending data: "{data}" and string ops:', end=" "
    )

    n = random.randint(1, len(funcs))
    chosen = random.sample(list(funcs.items()), n)  # random selection without replacement

    for item in chosen:
        print(item[0], end=" ")
    print(flush=True)

    work_pkg = cloudpickle.dumps((chosen, data))

    q.put(work_pkg)


def consumer(q: mp.queues.Queue) -> None:
    """Retrieve data from a queue, do some work on it and print the result.

    :param q: queue to retrieve from
    :type q: mp.queues.Queue
    """

    # gives the multi-node compatible Dragon puid, not the OS pid
    print(f"I am consumer {mp.current_process().pid} --", end=" ")

    serialized_data = q.get()  # implicit timeout=None here, blocking
    funcs, data = cloudpickle.loads(serialized_data)

    for identifier, f in funcs:
        print(f'{identifier}: "{f(data)}"', end=" ")
        data = f(data)
    print(flush=True)


if __name__ == "__main__":

    mp.set_start_method("dragon")

    # define some string transformations
    funcs = {
        "upper": lambda a: a.upper(),
        "lower": lambda a: a.lower(),
        "strip": lambda a: a.strip(),
        "capitalize": lambda a: a.capitalize(),
        'replace(" ", "")': lambda a: a.replace(" ", ""),
    }

    # use a queue for communication
    q = mp.Queue()

    # serialize producer arguments: Dragon uses pickle by default, which
    # doesn't work with lambdas
    serialized_args = cloudpickle.dumps((q, funcs))

    # create & start processes
    processes = []
    for _ in range(8):
        p = mp.Process(target=producer, args=(serialized_args,))
        processes.append(p)
        p = mp.Process(target=consumer, args=(q,))
        processes.append(p)

    for p in processes:
        p.start()

    # wait for processes to finish
    for p in processes:
        p.join()

When run with dragon queue_demo.py, this results in output similar to the following:

>$dragon queue_demo.py
I am producer 4294967297 and I'm sending data: "n jqc vneb itfqd eygjfc ljwzrfa" and string ops: capitalize upper lower strip
I am consumer 4294967298 -- capitalize: "N jqc vneb itfqd eygjfc ljwzrfa" upper: "N JQC VNEB ITFQD EYGJFC LJWZRFA" lower: "n jqc vneb itfqd eygjfc ljwzrfa" strip: "n jqc vneb itfqd eygjfc ljwzrfa"
I am producer 4294967301 and I'm sending data: "l xpp fvjh odgqi cmhxqa syxgnvl" and string ops: lower
I am consumer 4294967300 -- lower: "l xpp fvjh odgqi cmhxqa syxgnvl"
I am producer 4294967299 and I'm sending data: "w ebz uwjc ahpxw cmpfac uxyuoyd" and string ops: capitalize strip lower replace(" ", "") upper
I am consumer 4294967302 -- capitalize: "W ebz uwjc ahpxw cmpfac uxyuoyd" strip: "W ebz uwjc ahpxw cmpfac uxyuoyd" lower: "w ebz uwjc ahpxw cmpfac uxyuoyd" replace(" ", ""): "webzuwjcahpxwcmpfacuxyuoyd" upper: "WEBZUWJCAHPXWCMPFACUXYUOYD"
I am producer 4294967303 and I'm sending data: "x yga ysbv jqbvu eoryiv wemvydd" and string ops: upper lower replace(" ", "") capitalize strip
I am consumer 4294967306 -- upper: "X YGA YSBV JQBVU EORYIV WEMVYDD" lower: "x yga ysbv jqbvu eoryiv wemvydd" replace(" ", ""): "xygaysbvjqbvueoryivwemvydd" capitalize: "Xygaysbvjqbvueoryivwemvydd" strip: "Xygaysbvjqbvueoryivwemvydd"
I am producer 4294967305 and I'm sending data: "m evl kaaq bbamw yuxces mflukgc" and string ops: replace(" ", "")
I am consumer 4294967304 -- replace(" ", ""): "mevlkaaqbbamwyuxcesmflukgc"
I am producer 4294967311 and I'm sending data: "r zdv gqni phjop rxxnjv mwnoavn" and string ops: lower upper replace(" ", "") capitalize
I am consumer 4294967308 -- lower: "r zdv gqni phjop rxxnjv mwnoavn" upper: "R ZDV GQNI PHJOP RXXNJV MWNOAVN" replace(" ", ""): "RZDVGQNIPHJOPRXXNJVMWNOAVN" capitalize: "Rzdvgqniphjoprxxnjvmwnoavn"
I am producer 4294967307 and I'm sending data: "j njm pnpg spkvg bfsukk ihfmklm" and string ops: capitalize strip lower upper replace(" ", "")
I am consumer 4294967310 -- capitalize: "J njm pnpg spkvg bfsukk ihfmklm" strip: "J njm pnpg spkvg bfsukk ihfmklm" lower: "j njm pnpg spkvg bfsukk ihfmklm" upper: "J NJM PNPG SPKVG BFSUKK IHFMKLM" replace(" ", ""): "JNJMPNPGSPKVGBFSUKKIHFMKLM"
I am producer 4294967309 and I'm sending data: "a eij rzuz rlilc jkiaxr raqzvft" and string ops: replace(" ", "") capitalize upper strip lower
I am consumer 4294967312 -- replace(" ", ""): "aeijrzuzrlilcjkiaxrraqzvft" capitalize: "Aeijrzuzrlilcjkiaxrraqzvft" upper: "AEIJRZUZRLILCJKIAXRRAQZVFT" strip: "AEIJRZUZRLILCJKIAXRRAQZVFT" lower: "aeijrzuzrlilcjkiaxrraqzvft"
+++ head proc exited, code 0

Note that producers and consumers identify themselves by their Dragon puid instead of their OS pid. On a distributed system, the OS pid is no longer unique. Dragon generalizes the concept into unique IDs that identify managed objects even across federated systems.