- Published on
Parallel Processing with Python and Multiprocessing using Queue
- Authors
- Name
- Ruan Bekker
- @ruanbekker
Today I had the requirement to achieve a task by using parallel processing in order to save time.
The task to be achieved
For this demonstration, I have a list of people and each task needs to lookup its pet name and return to stdout. I want to spawn a task for each persons pet name lookup and run the tasks in parallel so that all the results can be returned back at once, instead of sequential.
This is a basic task, but you could have a CPU intensive job, where it will shine better.
Multiprocesing Queues
When using multiple processes, one generally uses message passing for communication between processes and avoids having to use any synchronization primitives like locks.
The Queue type is a multi producer, multi consumer FIFO queues modelled on the queue.Queue class in the standard library. You can read more up on it here
Our Workflow
Our multiprocessing workflow will look like this:
- We will define our data, which will be a dictionary of people and their pet names
- We will define an output queue
- Create a example function that will produce each task to the queue
- Then we will setup a lost of processes that we want to run
- From the list of processes that we defined, we will run each process, then wait and exit the completed processes
- We will then consume from the queue. For each process in our processes list
Note that I also added a delay of 2 seconds, so that you can see that the tasks are run in parallel, so the delay will only be 2 seconds.
Our code:
import multiprocessing as mp
import random
import string
import time
pet_maps = {
"adam": {"pet_name": "max"},
"steve": {"pet_name": "sylvester"},
"michelle": {"pet_name": "fuzzy"},
"frank": {"pet_name": "pete"},
"will": {"pet_name": "cat"},
"natasha": {"pet_name": "tweety"},
"samantha": {"pet_name": "bob"},
"peter": {"pet_name": "garfield"},
"susan": {"pet_name": "zazu"},
"josh": {"pet_name": "tom"},
}
pet_owners = pet_maps.keys()
output = mp.Queue()
def get_pet_name(data, output):
time.sleep(2)
print('adding to queue')
response = 'pet name: {}'.format(data)
output.put(response)
processes = [mp.Process(target=get_pet_name, args=(pet_maps[name]['pet_name'], output)) for name in pet_owners]
for p in processes:
p.start()
for p in processes:
p.join()
print('consuming from queue:')
results = [output.get() for p in processes]
print(results)
Running the example:
$ python3 mp.py
adding to queue
adding to queue
adding to queue
adding to queue
adding to queue
adding to queue
adding to queue
adding to queue
adding to queue
adding to queue
consuming from queue:
['pet name: max', 'pet name: sylvester', 'pet name: fuzzy', 'pet name: pete', 'pet name: cat', 'pet name: tweety', 'pet name: garfield', 'pet name: bob', 'pet name: zazu', 'pet name: tom']
Thank You
Thanks for reading, feel free to check out my website, and subscribe to my newsletter or follow me at @ruanbekker on Twitter.
- Linktree: https://go.ruan.dev/links
- Patreon: https://go.ruan.dev/patreon