Parallel Producer - Consumer Communication with Queue
This example shows a multiple-producer, multiple-consumer communication scheme using Multiprocessing with Dragon.
The parent process creates and starts a number of processes. Half of them act
as producers: each creates a random string, packages it with a random
selection of lambdas, and puts this work package into a shared queue. The
other half act as consumers. Each consumer is handed only the queue and
executes a blocking get() on it to receive a work package. Once the package
is received, the consumer applies the lambdas one by one to the string, printing
the output of each step. Finally, the parent process joins all process objects
to wait until they have completed.
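The blocking behaviour of get() can be tried out without starting any worker processes. The sketch below uses the standard library multiprocessing queue in a single process, not Dragon. It assumes that the queue Dragon provides follows the same get(block, timeout) semantics. The helper name try_get is hypothetical and only serves the illustration.

```python
import multiprocessing as mp
import queue  # only needed for the queue.Empty exception type


def try_get(q, timeout):
    """Return the next item, or None if nothing arrives within `timeout` seconds.

    Hypothetical helper for this sketch; not part of the example program.
    """
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return None


q = mp.Queue()
q.put("work package")

# get() with the default timeout=None blocks until an item is available.
first = q.get()

# The queue is empty now, so a bounded wait gives up and we return None.
second = try_get(q, 0.1)

print(first)   # work package
print(second)  # None
```

A consumer that calls get() without a timeout, as in the example, waits indefinitely, so the scheme relies on every consumer eventually receiving exactly one work package.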
The code demonstrates the following key concepts of working with Dragon:
- Shared communication objects between processes, here a queue.
- Creating, starting and joining worker processes.
- Out-of-order execution, as seen in the output.
- Dragon's puids replace the OS process IDs (pids) as unique identifiers for processes.
- Serialization of data to pass it to another process. We use cloudpickle here, which is Python only. Multiprocessing with Dragon uses the standard pickle by default, which doesn't support lambdas.
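The last point can be checked in isolation. The following sketch shows that the standard pickle module serializes an importable, named function but rejects a lambda, and that cloudpickle (if it is installed) round-trips the same lambda. The names shout and can_pickle are hypothetical and used only for this illustration.

```python
import pickle
import string

# An anonymous function, similar to the string transformations in the example.
shout = lambda s: s.upper()


def can_pickle(obj) -> bool:
    """Return True if the standard pickle module can serialize `obj`."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # pickle stores functions by qualified name; a lambda has no
        # importable name, so the lookup fails.
        return False


print(can_pickle(string.capwords))  # True: a named, importable function
print(can_pickle(shout))            # False: a lambda

# cloudpickle is a third-party package; skip this part if it is missing.
try:
    import cloudpickle
except ImportError:
    cloudpickle = None

if cloudpickle is not None:
    # cloudpickle serializes the function body itself, so the lambda survives.
    restored = cloudpickle.loads(cloudpickle.dumps(shout))
    print(restored("dragon"))  # DRAGON
```

This is why the example serializes the producer arguments and the work package with cloudpickle before handing them to the process or the queue.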
import random
import string
import cloudpickle

import dragon
import multiprocessing as mp


def producer(serialized_args: bytes) -> None:
    """Generate some string data, bundle it up with some random functions, add
    it to a queue.

    :param serialized_args: cloudpickle-serialized tuple (queue, funcs)
    :type serialized_args: bytes
    """

    q, funcs = cloudpickle.loads(serialized_args)

    # build a random string: one letter followed by words of length 3 to 7
    data = random.choices(string.ascii_lowercase, k=1)[0]
    for i in range(5):
        data = data + " " + "".join(random.choices(string.ascii_lowercase, k=i + 3))

    print(
        f'I am producer {mp.current_process().pid} and I\'m sending data: "{data}" and string ops:', end=" "
    )

    n = random.randint(1, len(funcs))
    chosen = random.sample(list(funcs.items()), n)  # random selection without replacement

    for item in chosen:
        print(item[0], end=" ")
    print(flush=True)

    # serialize with cloudpickle, because the work package contains lambdas
    work_pkg = cloudpickle.dumps((chosen, data))

    q.put(work_pkg)


def consumer(q: mp.queues.Queue) -> None:
    """Retrieve data from a queue, do some work on it and print the result.

    :param q: Queue to retrieve from
    :type q: mp.queues.Queue
    """

    # gives multi-node compatible Dragon puid, not OS pid.
    print(f"I am consumer {mp.current_process().pid} --", end=" ")

    serialized_data = q.get()  # implicit timeout=None here, blocking
    funcs, data = cloudpickle.loads(serialized_data)

    # apply the functions in order, feeding each result into the next one
    for identifier, f in funcs:
        data = f(data)
        print(f'{identifier}: "{data}"', end=" ")
    print(flush=True)


if __name__ == "__main__":

    mp.set_start_method("dragon")

    # define some string transformations
    funcs = {
        "upper": lambda a: a.upper(),
        "lower": lambda a: a.lower(),
        "strip": lambda a: a.strip(),
        "capitalize": lambda a: a.capitalize(),
        'replace(" ", "")': lambda a: a.replace(" ", ""),
    }

    # use a queue for communication
    q = mp.Queue()

    # serialize producer arguments: Dragon uses pickle by default, which
    # doesn't work with lambdas
    serialized_args = cloudpickle.dumps((q, funcs))

    # create & start processes: 8 producers and 8 consumers
    processes = []
    for _ in range(8):
        p = mp.Process(target=producer, args=(serialized_args,))
        processes.append(p)
        p = mp.Process(target=consumer, args=(q,))
        processes.append(p)

    for p in processes:
        p.start()

    # wait for processes to finish
    for p in processes:
        p.join()
When run with dragon queue_demo.py, the code produces output similar to the following:
>$dragon queue_demo.py
I am producer 4294967297 and I'm sending data: "n jqc vneb itfqd eygjfc ljwzrfa" and string ops: capitalize upper lower strip
I am consumer 4294967298 -- capitalize: "N jqc vneb itfqd eygjfc ljwzrfa" upper: "N JQC VNEB ITFQD EYGJFC LJWZRFA" lower: "n jqc vneb itfqd eygjfc ljwzrfa" strip: "n jqc vneb itfqd eygjfc ljwzrfa"
I am producer 4294967301 and I'm sending data: "l xpp fvjh odgqi cmhxqa syxgnvl" and string ops: lower
I am consumer 4294967300 -- lower: "l xpp fvjh odgqi cmhxqa syxgnvl"
I am producer 4294967299 and I'm sending data: "w ebz uwjc ahpxw cmpfac uxyuoyd" and string ops: capitalize strip lower replace(" ", "") upper
I am consumer 4294967302 -- capitalize: "W ebz uwjc ahpxw cmpfac uxyuoyd" strip: "W ebz uwjc ahpxw cmpfac uxyuoyd" lower: "w ebz uwjc ahpxw cmpfac uxyuoyd" replace(" ", ""): "webzuwjcahpxwcmpfacuxyuoyd" upper: "WEBZUWJCAHPXWCMPFACUXYUOYD"
I am producer 4294967303 and I'm sending data: "x yga ysbv jqbvu eoryiv wemvydd" and string ops: upper lower replace(" ", "") capitalize strip
I am consumer 4294967306 -- upper: "X YGA YSBV JQBVU EORYIV WEMVYDD" lower: "x yga ysbv jqbvu eoryiv wemvydd" replace(" ", ""): "xygaysbvjqbvueoryivwemvydd" capitalize: "Xygaysbvjqbvueoryivwemvydd" strip: "Xygaysbvjqbvueoryivwemvydd"
I am producer 4294967305 and I'm sending data: "m evl kaaq bbamw yuxces mflukgc" and string ops: replace(" ", "")
I am consumer 4294967304 -- replace(" ", ""): "mevlkaaqbbamwyuxcesmflukgc"
I am producer 4294967311 and I'm sending data: "r zdv gqni phjop rxxnjv mwnoavn" and string ops: lower upper replace(" ", "") capitalize
I am consumer 4294967308 -- lower: "r zdv gqni phjop rxxnjv mwnoavn" upper: "R ZDV GQNI PHJOP RXXNJV MWNOAVN" replace(" ", ""): "RZDVGQNIPHJOPRXXNJVMWNOAVN" capitalize: "Rzdvgqniphjoprxxnjvmwnoavn"
I am producer 4294967307 and I'm sending data: "j njm pnpg spkvg bfsukk ihfmklm" and string ops: capitalize strip lower upper replace(" ", "")
I am consumer 4294967310 -- capitalize: "J njm pnpg spkvg bfsukk ihfmklm" strip: "J njm pnpg spkvg bfsukk ihfmklm" lower: "j njm pnpg spkvg bfsukk ihfmklm" upper: "J NJM PNPG SPKVG BFSUKK IHFMKLM" replace(" ", ""): "JNJMPNPGSPKVGBFSUKKIHFMKLM"
I am producer 4294967309 and I'm sending data: "a eij rzuz rlilc jkiaxr raqzvft" and string ops: replace(" ", "") capitalize upper strip lower
I am consumer 4294967312 -- replace(" ", ""): "aeijrzuzrlilcjkiaxrraqzvft" capitalize: "Aeijrzuzrlilcjkiaxrraqzvft" upper: "AEIJRZUZRLILCJKIAXRRAQZVFT" strip: "AEIJRZUZRLILCJKIAXRRAQZVFT" lower: "aeijrzuzrlilcjkiaxrraqzvft"
+++ head proc exited, code 0
Note that producers and consumers use their Dragon puid instead of their
OS pid to identify themselves. On a distributed system, the pid is no longer
unique. Dragon generalizes the concept into unique IDs that identify
managed objects even across federated systems.
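The consumer lines in the sample output are easier to read once it is clear that each transformation is applied to the result of the previous one, not to the original string. The sketch below replays the third work package from the sample output in a single process, without Dragon or a queue. The function name apply_ops is hypothetical.

```python
def apply_ops(data, chosen):
    """Apply each (name, func) pair in order, feeding each result into the next.

    Returns a list of (name, intermediate_result) tuples.
    """
    steps = []
    for name, func in chosen:
        data = func(data)
        steps.append((name, data))
    return steps


# The work package taken from the third producer line of the sample output.
chosen = [
    ("capitalize", lambda a: a.capitalize()),
    ("strip", lambda a: a.strip()),
    ("lower", lambda a: a.lower()),
    ('replace(" ", "")', lambda a: a.replace(" ", "")),
    ("upper", lambda a: a.upper()),
]

steps = apply_ops("w ebz uwjc ahpxw cmpfac uxyuoyd", chosen)
for name, result in steps:
    print(f'{name}: "{result}"')

# Last line printed: upper: "WEBZUWJCAHPXWCMPFACUXYUOYD"
```

Because upper runs after replace here, the final result has no spaces, which matches the corresponding consumer line in the sample output.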