Efficient Parallel Processing in Python with concurrent.futures
Introduction
ThreadPoolExecutor and ProcessPoolExecutor are classes from Python's concurrent.futures library, which provides a high-level interface for concurrent programming.
ThreadPoolExecutor uses a pool of threads to execute tasks concurrently, making it suitable for I/O-bound operations where tasks spend time waiting for external resources. It allows for efficient multitasking by leveraging threads within the same process.
ProcessPoolExecutor employs a pool of separate processes to handle tasks, ideal for CPU-bound operations that require substantial computation. Each process runs independently with its own memory space, avoiding the Global Interpreter Lock (GIL) and enabling true parallelism.
Importing libraries
```python
import pandas as pd
import concurrent.futures
import time
```
Importing data
The sample data includes weekly sales figures and the associated marketing spend across channels such as branded and non-branded search, Facebook, print, out-of-home, TV, and radio. The dataset spans five years of weekly observations.
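The loading step is not shown in the source, so here is a minimal sketch that builds an in-memory frame mirroring the first rows of the sample; in a real run the dataset would be read from disk with pd.read_csv (the file name in the comment is hypothetical):

```python
import pandas as pd

# In practice something like: df = pd.read_csv("weekly_sales.csv", parse_dates=["Week"])
# (file name is hypothetical). Here we build a small frame mirroring the
# first rows of the sample so the snippet is self-contained.
df = pd.DataFrame({
    "Week": pd.to_datetime(["2017-07-23", "2017-07-30", "2017-08-06"]),
    "sales": [58850.0, 62050.0, 59388.0],
    "branded_search_spend": [1528.8, 1575.6, 1544.4],
    "nonbranded_search_spend": [463.32, 468.00, 477.36],
    "facebook_spend": [802.620, 819.312, 749.034],
})
print(df.shape)  # (3, 5)
```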
| | Week | sales | branded_search_spend | nonbranded_search_spend | facebook_spend | print_spend | ooh_spend | tv_spend | radio_spend |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 7/23/17 | 58850.0 | 1528.8 | 463.32 | 802.620 | 0 | 0 | 0 | 0 |
| 1 | 7/30/17 | 62050.0 | 1575.6 | 468.00 | 819.312 | 0 | 0 | 0 | 0 |
| 2 | 8/6/17 | 59388.0 | 1544.4 | 477.36 | 749.034 | 0 | 0 | 0 | 0 |
| 3 | 8/13/17 | 56964.0 | 1528.8 | 468.00 | 741.468 | 0 | 0 | 0 | 0 |
| 4 | 8/20/17 | 53460.0 | 1560.0 | 458.64 | 811.200 | 0 | 0 | 0 | 0 |
Function to get summary statistics
The function uses the describe() method from pandas to generate summary statistics for the specified columns in the DataFrame. This method provides key statistics such as mean, standard deviation, min, and max values.
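As a quick illustration of what describe() returns, here is a toy frame (the values below are illustrative, not taken from the sample dataset):

```python
import pandas as pd

# describe() reports count, mean, std, min, quartiles, and max per column.
toy = pd.DataFrame({"sales": [10.0, 20.0, 30.0, 40.0]})
stats = toy[["sales"]].describe().round(2)
print(stats.loc["mean", "sales"])  # 25.0
print(stats.loc["max", "sales"])   # 40.0
```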
```python
# Task function: perform data analysis (summary statistics for each task)
def task(n):
    time.sleep(n)  # Simulating a time-consuming task
    summary_stats = df[['sales', 'branded_search_spend',
                        'nonbranded_search_spend', 'facebook_spend']].describe()
    summary_stats = summary_stats.round(2)
    return summary_stats
```
1. Function using normal method
```python
# Number of tasks and task duration
num_tasks = 5
task_duration = 1  # 1 second for each task

# Measure sequential execution time
start_time = time.time()
for _ in range(num_tasks):
    result = task(task_duration)
sequential_time = time.time() - start_time
```
2. Function using ThreadPoolExecutor()
ThreadPoolExecutor is a class from the concurrent.futures module that allows you to manage a pool of threads for concurrent execution.
Code Explanation:
- ThreadPoolExecutor() creates a thread pool for concurrent execution.
- executor.submit(task, task_duration) schedules the task function to run with task_duration as its argument, repeating for num_tasks.
- concurrent.futures.wait(futures) waits for all scheduled tasks to complete.
```python
# Measure execution time using ThreadPoolExecutor
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, task_duration) for _ in range(num_tasks)]
    concurrent.futures.wait(futures)
thread_pool_time = time.time() - start_time
```
Note: You can cap the number of threads in the pool by passing the max_workers parameter to ThreadPoolExecutor(), e.g. ThreadPoolExecutor(max_workers=4).
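As a variation on the submit-and-wait pattern above, executor.map runs a callable over an iterable of arguments and yields the return values in submission order, which is convenient when you need every task's result. A minimal sketch with a stand-in task (the doubling function below is illustrative, not part of the original example):

```python
import concurrent.futures
import time

def double(n):
    time.sleep(0.05)  # Simulate a short I/O wait
    return n * 2

# map() submits one call per argument and returns results in input order --
# a handy alternative to submit() + wait() when return values are needed.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(double, range(5)))
print(results)  # [0, 2, 4, 6, 8]
```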
3. Function using ProcessPoolExecutor()
ProcessPoolExecutor is also a class from the concurrent.futures module that manages a pool of processes for parallel execution.
```python
# Measure execution time using ProcessPoolExecutor
start_time = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = [executor.submit(task, task_duration) for _ in range(num_tasks)]
    concurrent.futures.wait(futures)
process_pool_time = time.time() - start_time
```
Note: Just like ThreadPoolExecutor, ProcessPoolExecutor() accepts a max_workers parameter, e.g. ProcessPoolExecutor(max_workers=4).
Displaying Output
```python
print("\nSummary statistics from one of the tasks:")
result
```
Summary statistics from one of the tasks:
| | sales | branded_search_spend | nonbranded_search_spend | facebook_spend |
|---|---|---|---|---|
| count | 260.00 | 260.00 | 260.00 | 260.00 |
| mean | 68107.55 | 2033.68 | 402.38 | 1015.26 |
| std | 19608.83 | 902.96 | 200.05 | 453.05 |
| min | 29088.00 | 1375.92 | 229.32 | 660.23 |
| 25% | 54405.00 | 1556.10 | 238.68 | 772.04 |
| 50% | 64442.07 | 1606.80 | 351.00 | 827.19 |
| 75% | 83872.11 | 1928.16 | 478.38 | 997.54 |
| max | 113762.88 | 7800.00 | 1093.69 | 3900.00 |
Display the timing results for all 3 methods
```python
# Display the results
print(f"Sequential execution time: {sequential_time:.2f} seconds")
print(f"ThreadPoolExecutor time: {thread_pool_time:.2f} seconds")
print(f"ProcessPoolExecutor time: {process_pool_time:.2f} seconds")
```
| Method | Time Taken |
|---|---|
| Sequential execution time | 5.06 seconds |
| ThreadPoolExecutor time | 1.05 seconds |
| ProcessPoolExecutor time | 0.27 seconds |
Inference
In this run, ThreadPoolExecutor cut execution time from 5.06 to 1.05 seconds, and ProcessPoolExecutor reduced it further to 0.27 seconds. Because each task here mostly sleeps, both executors gain by overlapping the waits; for genuinely CPU-bound work, it is ProcessPoolExecutor's separate processes that deliver true parallelism.
Conclusion
ThreadPoolExecutor is ideal for tasks that involve significant waiting, as it efficiently manages multiple threads within the same process. ProcessPoolExecutor is best for CPU-intensive tasks, providing true parallelism by using separate processes. Selecting the right executor enhances performance by aligning the concurrency model with the specific needs of your tasks.