PySpark foreach() Function Tutorial – Apply Custom Logic to Each Row
Introduction
The foreach() function in PySpark applies a custom Python function to each row of a DataFrame. It is typically used for actions with side effects, such as sending records to an external database or writing log entries.
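As a rough sketch of that pattern, a handler passed to foreach() usually builds a payload from the row and hands it to whatever client the sink requires. The send_to_external_store call below is a hypothetical placeholder, not a real API:

# Illustrative sketch only: send_to_external_store is a hypothetical placeholder.
def send_row(row):
    payload = {"name": row.Name, "country": row.Country, "age": row.Age}
    # send_to_external_store(payload)  # e.g., an HTTP POST or a database insert
    print(f"Would send: {payload}")

# df.foreach(send_row)  # would invoke send_row once per row on the executors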
1. Create Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ForeachExample").getOrCreate()
2. Create Sample DataFrame
data = [
    ("Aamir Shahzad", "Pakistan", 30),
    ("Ali Raza", "USA", 28),
    ("Bob", "UK", 45),
    ("Lisa", "Canada", 33)
]
columns = ["Name", "Country", "Age"]
df = spark.createDataFrame(data, columns)
df.show()
Output:
+-------------+--------+---+
|         Name| Country|Age|
+-------------+--------+---+
|Aamir Shahzad|Pakistan| 30|
|     Ali Raza|     USA| 28|
|          Bob|      UK| 45|
|         Lisa|  Canada| 33|
+-------------+--------+---+
3. Define a foreach Function
def process_row(row):
    print(f"Processing: {row.Name} from {row.Country}, Age: {row.Age}")
4. Apply foreach() on the DataFrame
df.foreach(process_row)
Note: The print output will not appear in the notebook UI, because process_row runs on the worker nodes rather than on the driver.
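If you want to confirm on the driver that the function actually ran, one option (a minimal sketch, not part of the original steps) is to increment an accumulator inside the handler, since accumulator updates are aggregated back to the driver:

# Minimal sketch: count processed rows with an accumulator.
processed = spark.sparkContext.accumulator(0)

def count_row(row):
    processed.add(1)  # incremented on the workers, aggregated on the driver

df.foreach(count_row)
print(f"Rows processed: {processed.value}")  # expected: 4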
5. Optional: Simulate foreach() Using collect()
Because collect() returns every row to the driver, the print output is visible in the notebook. Use this only for small DataFrames that fit comfortably in driver memory.
def print_row(row):
    print(f"Simulated: {row.Name} is {row.Age} years old from {row.Country}")

for row in df.collect():
    print_row(row)
Output:
Simulated: Aamir Shahzad is 30 years old from Pakistan
Simulated: Ali Raza is 28 years old from USA
Simulated: Bob is 45 years old from UK
Simulated: Lisa is 33 years old from Canada
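As a rough alternative sketch (not part of the original example), toLocalIterator() streams rows to the driver one partition at a time, which avoids materializing the whole DataFrame at once the way collect() does:

# Sketch: iterate rows on the driver without collecting everything at once.
for row in df.toLocalIterator():
    print_row(row)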