Run Python Code In Parallel Using Multiprocessing

Multiprocessing lets a program use multiple CPU cores to run tasks in parallel, which can significantly speed up computation-heavy work. This article covers multiprocessing in Python: it starts by illustrating the basics with a simple sleep function and finishes with a real-world image processing example. In one of our recent articles, we discussed using multithreading in Python to speed up programs; I recommend reading that before continuing.

Multiprocessing in Python 

Like the threading module, the multiprocessing module comes with the Python standard library. You can create processes by creating a Process object using a callable object or function or by inheriting the Process class and overriding the run() method. Let’s create the dummy function we will use to illustrate the basics of multiprocessing in Python.

 import time
 def useless_function(sec = 1):
     print(f'Sleeping for {sec} second(s)')
     time.sleep(sec)
     print(f'Done sleeping')
 start = time.perf_counter()
 useless_function()
 useless_function()
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} second(s)') 
-----------------------------Output-----------------------------
 Sleeping for 1 second(s)
 Done sleeping
 Sleeping for 1 second(s)
 Done sleeping
 Finished in 2.02 second(s) 

Using the Process() Constructor

Running the function twice sequentially took roughly two seconds as expected. Let’s create two processes, run them in parallel and see how that pans out. 

 import multiprocessing
 start = time.perf_counter()
 process1 = multiprocessing.Process(target=useless_function)
 process2 = multiprocessing.Process(target=useless_function)
 process1.start()
 process2.start()
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} second(s)') 
-----------------------------Output-----------------------------
 Finished in 0.02 second(s)
 Sleeping for 1 second(s)
 Sleeping for 1 second(s) 

Something seems wrong with the output. Granted, we forgot to wait for the processes to finish, but according to the output the processes only started after the main program had already printed its elapsed time. The output appears in this order because creating a process and getting it running takes a while, noticeably longer than starting a thread. As with threads, the join() method is used to wait for the processes to finish execution.

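 start = time.perf_counter()
 # A Process object can only be started once, so create fresh ones
 process1 = multiprocessing.Process(target=useless_function)
 process2 = multiprocessing.Process(target=useless_function)
 process1.start()
 process2.start()
 # Wait for both processes to finish before stopping the timer
 process1.join()
 process2.join()
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} second(s)') 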
-----------------------------Output-----------------------------
 Sleeping for 1 second(s)
 Sleeping for 1 second(s)
 Done sleeping
 Done sleeping
 Finished in 1.04 second(s) 
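
One practical caveat when running these snippets as a script: on Windows and macOS the default start method is spawn, where each child process imports the main script again, so the code that creates processes should sit under an if __name__ == '__main__': guard. Below is a minimal sketch of the start-and-join pattern with that guard. The helper name run_in_parallel is hypothetical, and time.sleep stands in for the dummy function.

```python
import multiprocessing
import time

def run_in_parallel(n_procs=2, sec=0.5):
    """Start n_procs sleeping processes, wait for all, return (elapsed, exit codes)."""
    start = time.perf_counter()
    procs = [multiprocessing.Process(target=time.sleep, args=(sec,))
             for _ in range(n_procs)]
    for p in procs:
        p.start()   # returns immediately; the child starts up in the background
    for p in procs:
        p.join()    # blocks until that child has exited
    elapsed = time.perf_counter() - start
    return elapsed, [p.exitcode for p in procs]

if __name__ == '__main__':
    # Only the parent process runs this part; children that re-import the
    # script skip it, so they do not try to create processes of their own.
    elapsed, codes = run_in_parallel()
    print(f'Finished in {round(elapsed, 2)} second(s), exit codes: {codes}')
```

An exit code of 0 means the child finished without an unhandled exception.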

Right now the speedup is modest, but that’s mainly because our function doesn’t take long to execute and we only run it twice. What if we want to run it ten times? Run sequentially, that would take a little over ten seconds with the default one-second sleep (and over twenty with the two-second sleep used below), because each call has to finish before the next one starts. Run in parallel in multiple processes, it should be significantly faster. Instead of manually creating the ten processes, let’s create and start them in a loop.

Unlike with threads, the arguments passed to a process must be serializable using pickle. Simply put, serialization means converting Python objects into a binary format that can be sent to another Python process and reconstructed there.
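
To make the pickle requirement concrete, here is a small sketch that checks what can and cannot be serialized. The helper is_picklable and the sample args list are made up for illustration; only pickle.dumps() and pickle.loads() come from the standard library.

```python
import pickle

def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False

# Plain data survives the round trip to bytes and back unchanged.
args = [2, 'augmented', {'size': (1200, 1200)}]
restored = pickle.loads(pickle.dumps(args))
print(restored == args)

# Built-in containers and importable functions can be pickled; a lambda cannot,
# because pickle stores functions by name and a lambda has no importable name.
print(is_picklable(args))
print(is_picklable(len))
print(is_picklable(lambda x: x * x))
```

The first three checks should print True and the last one False, which is why process targets and their arguments are usually plain data and module-level functions.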

 start = time.perf_counter()
 processes = []
 for _ in range(10):
     p = multiprocessing.Process(target=useless_function, args = [2])
     p.start()
     processes.append(p) 

Now we can’t run join() within the same loop because it would wait for the process to finish before looping and creating the next one. So it would be the same as running them sequentially.

 for p in processes:
     p.join()
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} second(s)')     
-----------------------------Output-----------------------------
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Sleeping for 2 second(s)
 Done sleeping
 Done sleeping
 Done sleeping
 Done sleeping
 Done sleeping
 Done sleeping
 Done sleeping
 Done sleeping
 Done sleeping
 Done sleeping
 Finished in 2.15 second(s) 

Even when running the function ten times, it finishes in about two seconds. This may seem strange, seeing that my processor only has 4 cores. However, the operating system can schedule more processes than there are cores, and a sleeping process uses almost no CPU time, so all ten can wait out their two seconds at once. With CPU-bound work, the speedup would be limited by the number of cores.
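
One way to explore how the core count affects the result is to time sleeping tasks and busy tasks with one worker and with many. The sketch below is only an illustration: timed_map is a hypothetical helper, summing a large range stands in for real computation, and the actual timings depend on the machine.

```python
import time
from concurrent.futures import ProcessPoolExecutor

def timed_map(func, args, workers):
    """Run func over args in a pool of worker processes; return elapsed seconds."""
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=workers) as executor:
        list(executor.map(func, args))   # consume the results of every task
    return time.perf_counter() - start

if __name__ == '__main__':
    n_tasks = 8   # more tasks than cores on a 4-core machine
    workloads = {
        'sleeping': (time.sleep, [0.25] * n_tasks),         # needs no CPU time
        'CPU-bound': (sum, [range(5_000_000)] * n_tasks),   # keeps a core busy
    }
    for label, (func, args) in workloads.items():
        one = timed_map(func, args, workers=1)
        many = timed_map(func, args, workers=n_tasks)
        print(f'{label}: {one:.2f} s with 1 worker, {many:.2f} s with {n_tasks} workers')
```

The sleeping tasks should speed up roughly in proportion to the number of workers, while the CPU-bound tasks can only speed up as far as the available cores allow.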

Creating a Custom Process Class

To create your own custom process class, you can inherit from the Process class and override its run() method.

  import time
  from multiprocessing import Process
  def countdown(name, delay, count):
      # Print the process name, current time, and remaining count once per delay
      while count:
          time.sleep(delay)
          print(f'{name, time.ctime(time.time()), count}')
          count -= 1
  class newProcess(Process):
      def __init__(self, name, count):
          # Initialize the base Process before setting our own attributes
          super().__init__()
          self.name = name
          self.count = count
      def run(self):
          # run() is what executes in the child process after start()
          print("Starting: " + self.name + "\n")
          countdown(self.name, 1, self.count)
          print("Exiting: " + self.name + "\n")
  t = newProcess("newProcess 1", 5)
  t.start()
  t.join()
  print("Done")  
-----------------------------Output-----------------------------
 Starting: newProcess 1

 ('newProcess 1', 'Fri Apr 30 07:24:56 2021', 5)
 ('newProcess 1', 'Fri Apr 30 07:24:57 2021', 4)
 ('newProcess 1', 'Fri Apr 30 07:24:58 2021', 3)
 ('newProcess 1', 'Fri Apr 30 07:24:59 2021', 2)
 ('newProcess 1', 'Fri Apr 30 07:25:00 2021', 1)

 Exiting: newProcess 1

 Done 

Using ProcessPoolExecutor

In addition to using the multiprocessing module directly, there’s another way of running processes. Python 3.2 introduced the concurrent.futures module, which provides ProcessPoolExecutor. It is a higher-level, more convenient way of running multiple processes because it manages a pool of worker processes for us. It also allows us to switch over to using multiple threads instead of processes with minimal changes. If we want to schedule the function one call at a time, we can use the submit() method. It schedules the target function for execution and returns a Future object. This Future object encapsulates the function’s execution and allows us to check whether it’s running or done, and to fetch the return value using result().

Let’s redefine the dummy function so that it has a return value, and use it to illustrate ProcessPoolExecutor.

 import concurrent.futures
 start = time.perf_counter()
 def useless_function(sec = 1):
     print(f'Sleeping for {sec} second(s)')
     time.sleep(sec)
     print(f'Done sleeping')
     return sec

 with concurrent.futures.ProcessPoolExecutor() as executor:
     process1 = executor.submit(useless_function, 1)
     process2 = executor.submit(useless_function, 1)
     print(f'Return Value: {process1.result()}')
     print(f'Return Value: {process2.result()}')
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} second(s)') 
-----------------------------Output-----------------------------
 Sleeping for 1 second(s)
 Sleeping for 1 second(s)
 Done sleeping
 Done sleeping
 Return Value: 1
 Return Value: 1
 Finished in 1.06 second(s) 
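
The point about switching between processes and threads with minimal changes can be sketched as follows. The helper factorial_with is a made-up name, and math.factorial stands in for the dummy function. The calling code is identical for both executors, and only the class changes.

```python
import math
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def factorial_with(executor_cls, n=10):
    """Compute n! on a worker of the given executor type and return the result."""
    with executor_cls(max_workers=2) as executor:
        future = executor.submit(math.factorial, n)   # returns a Future right away
        value = future.result()                       # blocks until the call finishes
        finished = future.done()                      # True once a result is available
    assert finished
    return value

if __name__ == '__main__':
    # Same helper, two back ends: worker processes first, then worker threads.
    for executor_cls in (ProcessPoolExecutor, ThreadPoolExecutor):
        print(executor_cls.__name__, factorial_with(executor_cls))
```

Both lines should report 3628800, the value of 10!.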

If we want to run this ten times, we need two loops: one to submit the calls and another to fetch their results. For the second loop, the as_completed() function from concurrent.futures is handy. It returns an iterator that yields the futures as they complete, i.e., in the order of their completion.

 start = time.perf_counter()
 with concurrent.futures.ProcessPoolExecutor() as executor:
     secs = [5, 4, 3, 2, 1]
     pool = [executor.submit(useless_function, i) for i in secs]
     # as_completed() yields each future as soon as it finishes
     for future in concurrent.futures.as_completed(pool):
         print(f'Return Value: {future.result()}')
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} second(s)') 
-----------------------------Output-----------------------------
 Sleeping for 5 second(s)
 Sleeping for 4 second(s)
 Done sleeping
 Sleeping for 3 second(s)
 Return Value: 4
 Done sleeping
 Sleeping for 2 second(s)
 Return Value: 5
 Done sleeping
 Done sleeping
 Sleeping for 1 second(s)
 Return Value: 2
 Return Value: 3
 Done sleeping
 Return Value: 1
 Finished in 6.07 second(s) 
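
Because as_completed() yields futures in completion order, the link between a result and the input that produced it is lost unless we keep it ourselves. A common idiom is to store the futures as dictionary keys. The sketch below uses the hypothetical helper lengths_by_word, with the built-in len() standing in for the work.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def lengths_by_word(words, executor_cls=ProcessPoolExecutor):
    """Map each word to its length, collecting results in completion order."""
    results = {}
    with executor_cls(max_workers=2) as executor:
        # Remember which input each Future belongs to.
        future_to_word = {executor.submit(len, word): word for word in words}
        for future in as_completed(future_to_word):
            results[future_to_word[future]] = future.result()
    return results

if __name__ == '__main__':
    print(lengths_by_word(['pool', 'future', 'executor']))
```

The keys of the returned dictionary may appear in any order, since they are inserted as the calls finish.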

To avoid writing the loops ourselves, we can use the map() method. It is similar to the built-in map() function: it runs the function for every item of the iterable we pass in, but it does so in the pool’s worker processes rather than sequentially. Instead of returning Future objects, it returns an iterator over the results. These results come back in the order of the input iterable, not in the order the calls complete. Another thing to note is that if our function raises an exception, it won’t be raised while the process is running; it will be raised when that call’s value is retrieved from the results iterator.

 start = time.perf_counter()
 with concurrent.futures.ProcessPoolExecutor() as executor:
     secs = [5, 4, 3, 2, 1]
     pool = executor.map(useless_function, secs)
     for res in pool:
         print(f'Return Value: {res}')
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} second(s)') 
-----------------------------Output-----------------------------
 Sleeping for 5 second(s)
 Sleeping for 4 second(s)
 Done sleeping
 Sleeping for 3 second(s)
 Done sleeping
 Sleeping for 2 second(s)
 Return Value: 5
 Return Value: 4
 Done sleeping
 Done sleeping
 Sleeping for 1 second(s)
 Return Value: 3
 Return Value: 2
 Done sleeping
 Return Value: 1
 Finished in 6.06 second(s) 
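
The note about exceptions can be illustrated with a short sketch. Here math.sqrt stands in for the worker function because it raises ValueError for negative input, and roots_until_error is a hypothetical helper name.

```python
import math
from concurrent.futures import ProcessPoolExecutor

def roots_until_error(values, executor_cls=ProcessPoolExecutor):
    """Collect square roots in input order until a call's exception surfaces."""
    collected = []
    with executor_cls(max_workers=2) as executor:
        results = executor.map(math.sqrt, values)   # nothing is raised here
        try:
            for root in results:                    # a failed call raises here instead
                collected.append(root)
        except ValueError as exc:
            collected.append(f'failed: {exc}')
    return collected

if __name__ == '__main__':
    print(roots_until_error([4.0, 9.0, -1.0, 16.0]))
```

The list should hold 2.0 and 3.0 followed by the failure message. The result for 16.0 is never collected, because the iterator stops at the first exception.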

Parallelized Image Augmentation 

To demonstrate the use of multiprocessing in a somewhat realistic setting, we will continue with the images example used in the multithreading article and perform some image augmentation on the images we downloaded from Pexels. Although image augmentation is computation-intensive, it is by no means the perfect use case for multiprocessing because it also involves a fair amount of I/O (reading and writing the image files).

Running the Image Augmentation Function Sequentially 

 from PIL import Image, ImageFilter
 file_names = ['305821.jpg', '509922.jpg', '325812.jpg',
             '1252814.jpg', '1420709.jpg', '963486.jpg',
             '1557183.jpg', '3023211.jpg', '1031641.jpg',
             '439227.jpg', '696644.jpg', '911254.jpg',
             '1001990.jpg', '3518623.jpg', '916044.jpg']
 start = time.perf_counter()
 size = (1200, 1200)
 def augment_image(img_name):
     img = Image.open(img_name)
     img = img.filter(ImageFilter.GaussianBlur(15))
     img.thumbnail(size)
     img.save(f'augmented-{img_name}')
     print(f'{img_name} was augmented...')
 for f in file_names:
     augment_image(f)
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} seconds') 
-----------------------------Output-----------------------------
 305821.jpg was augmented...
 509922.jpg was augmented...
 325812.jpg was augmented...
 1252814.jpg was augmented...
 1420709.jpg was augmented...
 963486.jpg was augmented...
 1557183.jpg was augmented...
 3023211.jpg was augmented...
 1031641.jpg was augmented...
 439227.jpg was augmented...
 696644.jpg was augmented...
 911254.jpg was augmented...
 1001990.jpg was augmented...
 3518623.jpg was augmented...
 916044.jpg was augmented...
 Finished in 20.66 seconds 

Running the Function in Parallel using Multiprocessing

 start = time.perf_counter()
 with concurrent.futures.ProcessPoolExecutor() as executor:
     executor.map(augment_image, file_names)
 end = time.perf_counter()
 print(f'Finished in {round(end-start, 2)} seconds') 
-----------------------------Output-----------------------------
 509922.jpg was augmented...
 305821.jpg was augmented...
 325812.jpg was augmented...
 1420709.jpg was augmented...
 1252814.jpg was augmented...
 963486.jpg was augmented...
 1557183.jpg was augmented...
 3023211.jpg was augmented...
 1031641.jpg was augmented...
 696644.jpg was augmented...
 911254.jpg was augmented...
 1001990.jpg was augmented...
 439227.jpg was augmented...
 3518623.jpg was augmented...
 916044.jpg was augmented...
 Finished in 8.63 seconds 

Using multiprocessing enables the program to finish in less than half the time of sequential execution (8.63 seconds versus 20.66 seconds).

To learn more about the Python multiprocessing module, refer to the official documentation and the source code.
