PySpark localCheckpoint Tutorial
localCheckpoint() in PySpark truncates the logical plan lineage of a DataFrame and caches its data with the executors, which can improve performance for jobs with long chains of transformations. Unlike checkpoint(), it does not require setting up a checkpoint directory on disk, making it faster but less fault-tolerant: the checkpointed data is lost if an executor fails.
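For contrast, here is a minimal sketch of the disk-based checkpoint(), which does need a directory on reliable storage. The path is illustrative, and spark/df refer to the session and DataFrame built in the steps below:

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")  # illustrative path; required only for checkpoint()
df_reliable = df.checkpoint()   # written to the checkpoint directory; survives executor failures
df_fast = df.localCheckpoint()  # no directory needed; data stays with the executors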
🔹 Step 1: Set Up SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LocalCheckpointExample").getOrCreate()
# Note: no setCheckpointDir() call is needed here; that is only required for checkpoint()
🔹 Step 2: Create a Sample DataFrame
data = [("Aamir Shahzad", 35), ("Ali Raza", 40), ("Bob", 29), ("Lisa", 31)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)
df.show()
Output:
+-------------+---+
|         name|age|
+-------------+---+
|Aamir Shahzad| 35|
|     Ali Raza| 40|
|          Bob| 29|
|         Lisa| 31|
+-------------+---+
🔹 Step 3: Add Transformation
from pyspark.sql.functions import col
df_transformed = df.withColumn("age_plus_10", col("age") + 10)
🔹 Step 4: Apply localCheckpoint()
df_checkpointed = df_transformed.localCheckpoint()  # truncates lineage and caches the data (eager by default)
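By default, localCheckpoint() is eager: it materializes the DataFrame immediately. If you would rather defer that work, the method also accepts an eager flag; a minimal sketch:

# eager=False defers the checkpoint until an action runs
df_lazy = df_transformed.localCheckpoint(eager=False)
df_lazy.count()  # the first action materializes and caches the data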
🔹 Step 5: View Results
print("Transformed and Checkpointed DataFrame:")
df_checkpointed.show()
Output:
+-------------+---+-----------+
|         name|age|age_plus_10|
+-------------+---+-----------+
|Aamir Shahzad| 35|         45|
|     Ali Raza| 40|         50|
|          Bob| 29|         39|
|         Lisa| 31|         41|
+-------------+---+-----------+
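🔹 Step 6 (Optional): Verify the Truncated Lineage
To confirm that localCheckpoint() really cut the lineage, compare the query plans before and after checkpointing (the exact plan text varies by Spark version):

df_transformed.explain()   # full plan, including the projection added in Step 3
df_checkpointed.explain()  # short plan reading from the cached data, e.g. a scan of an existing RDD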