Basic Pandarallel Demonstration for Single Node Environment
This Jupyter benchmark is a simple use case for the pandarallel parallel_apply call. It can be run with dragon and base multiprocessing to compare performance on your machine.
The program demonstrates how to use parallel_apply, the multiprocessing version of pandas apply, on a pandas dataframe with random input.
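As a rough mental model only (this is not pandarallel's implementation), a parallel apply splits the input into chunks, applies the function to each chunk in a separate worker, and concatenates the results in their original order. The sketch below uses threads from the standard library purely to stay self-contained, whereas pandarallel uses worker processes. The helper names chunked and parallel_apply_sketch are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(values, num_chunks):
    """Split values into at most num_chunks contiguous, near-equal chunks."""
    size, extra = divmod(len(values), num_chunks)
    chunks, start = [], 0
    for i in range(num_chunks):
        # The first `extra` chunks each take one additional element.
        stop = start + size + (1 if i < extra else 0)
        if stop > start:
            chunks.append(values[start:stop])
        start = stop
    return chunks


def parallel_apply_sketch(values, func, num_workers=4):
    """Apply func to every element, one chunk per worker, preserving order."""
    chunks = chunked(list(values), num_workers)
    if not chunks:
        return []
    with ThreadPoolExecutor(max_workers=len(chunks)) as pool:
        # Each worker applies func to all elements of its own chunk.
        per_chunk = list(pool.map(lambda chunk: [func(x) for x in chunk], chunks))
    # Concatenate the per-chunk results back into one flat list.
    return [y for chunk in per_chunk for y in chunk]


cutoff = 0.3
data = [0.1, 0.5, 0.25, 0.9, 0.3]
result = parallel_apply_sketch(data, lambda x: x < cutoff, num_workers=2)
print(result)
```

With real worker processes, the function and any globals it references (such as cutoff) must also be serialized and sent to each worker, which is where the choice of pickling library matters.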
The code demonstrates the following key concepts working with Dragon:
- How to write basic programs that can run with Dragon and base multiprocessing
- How to use pandarallel and pandas with Dragon and base multiprocessing
- How pandarallel handles various dtypes
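The first concept can be sketched as a small helper that returns a multiprocessing context for either backend. The helper name make_context and the fallback to "spawn" are assumptions made for this illustration. The notebook itself simply imports dragon and then selects the "dragon" start method.

```python
import multiprocessing


def make_context(prefer_dragon=True):
    """Return a multiprocessing context: "dragon" if usable, else "spawn"."""
    if prefer_dragon:
        try:
            # As in the notebook, dragon is imported before the "dragon"
            # start method is requested.
            import dragon  # noqa: F401
            return multiprocessing.get_context("dragon")
        except (ImportError, ValueError):
            # Assumption: fall back when Dragon is not installed or the
            # "dragon" start method is not available in this interpreter.
            pass
    return multiprocessing.get_context("spawn")


ctx = make_context(prefer_dragon=False)
print(ctx.get_start_method())
```

A program can then create its pools, queues, and processes from the returned context, so the rest of the code does not depend on which backend was selected.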
# ---
# jupyter:
# jupytext:
# text_representation:
# extension: .py
# format_name: light
# format_version: '1.5'
# jupytext_version: 1.15.2
# kernelspec:
# display_name: Python 3 (ipykernel)
# language: python
# name: python3
# ---
# # `pandarallel` with Controlled Number of Progress Bars
#
# Up to at least version 1.6.4, `pandarallel` displays one progress bar per worker process. With a sufficiently large number of workers, this becomes overwhelming.
#
# This notebook demonstrates a modification to `pandarallel` that exposes control over how many progress bars are displayed and maps each worker process to exactly one of those progress bars. In a multi-node `dragon` execution configuration (which is _not_ demonstrated here), some nodes may be slower or faster than others, and it may be helpful to see the relative progress of some nodes versus others. This motivates showing more than a single progress bar representing all workers.
# !pip install pandarallel
# +
import dragon
import multiprocessing
import cloudpickle
import numpy as np
import pandas as pd
import time
import pandarallel; pandarallel.__version__
# -
multiprocessing.set_start_method("dragon")
pandarallel.core.dill = cloudpickle
ctx = multiprocessing.get_context("dragon")
ctx.Manager = type("PMgr", (), {"Queue": ctx.Queue})
pandarallel.core.CONTEXT = ctx
pandarallel.pandarallel.initialize(progress_bar=True)
# +
num_rows = 10
df = pd.DataFrame(
    {
        "seqnum": np.arange(42, (42 + num_rows), dtype=int),
        #"metric_A": np.random.rand(num_rows),
        #"metric_B": np.random.rand(num_rows),
        "metric_C": np.random.rand(num_rows),
        "alt_seq": np.random.randint(low=42, high=(42 + num_rows), size=(num_rows,)),
        "label": np.array(list("ATCG"))[np.random.randint(0, 4, num_rows)],
    },
)
# -
df.head()
# The use of a global variable inside a lambda function demonstrates key functionality from `cloudpickle` that is not otherwise available through `dill`.
cutoff = 0.3
# Running this next cell will cause as many progress bars to be displayed as there are workers (potentially a lot).
start = time.monotonic()
df['highlow_C'] = df['metric_C'].parallel_apply(lambda x: x < cutoff)
stop = time.monotonic()
tot_time = stop - start
time_dict = {}
time_dict["1"] = tot_time
# Now we have our new column of values in our `pandas.DataFrame`.
df.head()
# We can change our minds about how many progress bars to display, at will.
pandarallel.pandarallel.initialize(progress_bar=10) # Will display a total of 10 progress bars.
start = time.monotonic()
df['highlow_C'] = df['metric_C'].parallel_apply(lambda x: x < cutoff)
stop = time.monotonic()
tot_time = stop - start
time_dict["2"] = tot_time
# There will be plenty of use cases / scenarios where a single progress bar is all we want.
pandarallel.pandarallel.initialize(progress_bar=1) # Will display 1 progress bar representing all workers.
start = time.monotonic()
df['highlow_C'] = df['metric_C'].parallel_apply(lambda x: x < cutoff)
stop = time.monotonic()
tot_time = stop - start
time_dict["3"] = tot_time
# time.monotonic() returns fractional seconds, so the elapsed times are in seconds.
print("parallel_apply", "\t", "Time (seconds)")
for key, value in time_dict.items():
    print("{:<20} {:<20}".format(key, value))
# Though it is very minor compared to the overall wall time, reducing the number of progress bars displayed can shave off a small amount of execution time.