PySpark checkpoint(): Improve Fault Tolerance & Speed in Spark Jobs
In this tutorial, you'll learn how to use checkpoint() in PySpark to improve the performance and reliability of your Spark jobs. This technique is especially useful when dealing with long lineage or iterative transformations in your data pipelines.
1. What is checkpoint() in PySpark?
- checkpoint() saves a copy of a DataFrame (or RDD) to reliable storage like DBFS or HDFS.
- This cuts off Spark’s lineage (the recorded chain of transformations Spark would otherwise replay to recompute the current data).
- Helps with:
- Improving performance
- Adding fault tolerance
- Protecting against recomputation if Spark needs to restart a job
2. Create Spark Session and Set Checkpoint Directory
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("PySpark checkpoint() Example") \
    .getOrCreate()
# Set checkpoint directory (required before using checkpoint)
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
3. Create Sample DataFrame
data = [
(1, "Aamir Shahzad", 35),
(2, "Ali Raza", 30),
(3, "Bob", 25),
(4, "Lisa", 28)
]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)
df.show()
+---+--------------+---+
| id| name|age|
+---+--------------+---+
| 1| Aamir Shahzad| 35|
| 2| Ali Raza| 30|
| 3| Bob| 25|
| 4| Lisa| 28|
+---+--------------+---+
4. Apply a Transformation
# Example transformation: keep people older than 28
df_filtered = df.filter(df["age"] > 28)
5. Checkpoint the Filtered DataFrame
# Apply checkpoint to save current state and cut lineage
df_checkpointed = df_filtered.checkpoint(eager=True)
6. Perform Further Operations on Checkpointed Data
# Group by name and calculate average age
result = df_checkpointed.groupBy("name").avg("age")
result.show()
+--------------+--------+
| name|avg(age)|
+--------------+--------+
| Aamir Shahzad| 35.0|
| Ali Raza| 30.0|
+--------------+--------+
7. Why Use checkpoint()?
- Breaks long lineage chains to avoid stack overflow or slow jobs.
- Saves a known good state to reliable storage (disk).
- Protects against recomputation if Spark needs to restart tasks.
- Helps with complex or iterative jobs that involve looping.