PySpark foreachPartition() Explained – Process DataFrame Partitions Efficiently
Introduction
The foreachPartition() function in PySpark applies a custom function to each partition of a DataFrame. It is an action that runs on the executors, and it is typically used for efficient bulk writes (e.g., to databases or files), because expensive setup work such as opening a connection only has to happen once per partition instead of once per row.
1. Create Spark Session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ForeachPartitionWithDataFrame") \
    .getOrCreate()
2. Create Sample DataFrame
data = [
    ("Aamir Shahzad", "Pakistan", 30),
    ("Ali Raza", "USA", 28),
    ("Bob", "UK", 45),
    ("Lisa", "Canada", 33)
]
columns = ["Name", "Country", "Age"]
df = spark.createDataFrame(data, columns)
df.show()
Output:
+-------------+--------+---+
|         Name| Country|Age|
+-------------+--------+---+
|Aamir Shahzad|Pakistan| 30|
|     Ali Raza|     USA| 28|
|          Bob|      UK| 45|
|         Lisa|  Canada| 33|
+-------------+--------+---+
3. Repartition the DataFrame
print(df.rdd.getNumPartitions()) # Check current number of partitions
df = df.repartition(2) # Repartition into 2
print(df.rdd.getNumPartitions())
Output:
1
2
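If you want to confirm how the rows were distributed, one optional check is to tag each row with its partition number using the built-in spark_partition_id() function (the exact assignment will vary with your environment):

from pyspark.sql.functions import spark_partition_id

# Show which partition each row ended up in (assignment may differ on your cluster)
df.withColumn("partition_id", spark_partition_id()).show()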
4. Define the Custom Function
def process_partition(rows):
    # rows is an iterator over the Row objects in one partition
    print("Processing a new partition:")
    for row in rows:
        print(f"Name: {row['Name']}, Country: {row['Country']}, Age: {row['Age']}")
5. Apply foreachPartition()
foreachPartition() is an action, so this triggers a job. The function runs on the executors, which means the print output goes to the executor logs (worker stdout), not to the driver or the notebook cell.
df.foreachPartition(process_partition)
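To illustrate the bulk-write use case mentioned in the introduction, here is a minimal sketch of the usual pattern: open an expensive resource once per partition and write the rows in a single batch. get_connection() and insert_rows() are hypothetical placeholders for your own database client, not PySpark APIs.

def write_partition(rows):
    conn = get_connection()  # hypothetical helper; opened once per partition, not per row
    try:
        batch = [(row["Name"], row["Country"], row["Age"]) for row in rows]
        insert_rows(conn, batch)  # hypothetical bulk insert for the whole partition
    finally:
        conn.close()

df.foreachPartition(write_partition)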
6. Simulate foreachPartition() with collect() (for local testing)
For quick local inspection you can collect the rows to the driver and call the function yourself. Note that this loop passes each row as its own single-row list, so every row is printed under a separate "Processing a new partition:" header.

for row in df.collect():
    process_partition([row])
Output (Simulated):
Processing a new partition:
Name: Aamir Shahzad, Country: Pakistan, Age: 30
Processing a new partition:
Name: Ali Raza, Country: USA, Age: 28
...
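Because the loop above wraps each collected row in its own list, it does not reflect the real partition boundaries. If you want the local run to mirror them, one option is to group the rows with glom(); like collect(), this brings all data to the driver, so it is only suitable for small test DataFrames.

# Collect each partition as a separate list of Row objects, then process it
for partition in df.rdd.glom().collect():
    process_partition(partition)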