How to Use call_udf() in PySpark | Dynamically Apply UDFs in Real-Time Data Pipelines | PySpark Tutorial


This tutorial shows how to dynamically apply registered user-defined functions (UDFs) in PySpark using call_udf(). Because call_udf() invokes a UDF by its registered name (a plain string), the function to apply can be selected at runtime, for example from configuration or user input.

📘 Sample Data

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession
spark = SparkSession.builder.appName("call_udf_tutorial").getOrCreate()

data = [
    (1, "apple"),
    (2, "banana"),
    (3, "carrot")
]
df = spark.createDataFrame(data, ["id", "fruit"])
df.show()

Output:

+---+------+
| id| fruit|
+---+------+
|  1| apple|
|  2|banana|
|  3|carrot|
+---+------+

🧠 Register and Use UDF with call_udf()

from pyspark.sql.functions import col, call_udf
from pyspark.sql.types import IntegerType, StringType

# Register a UDF to double an ID value
spark.udf.register("double_id", lambda x: x * 2, IntegerType())

# Apply the UDF by its registered name (call_udf is available in PySpark 3.4+)
df = df.withColumn("id_doubled", call_udf("double_id", col("id")))
df.select("id", "fruit", "id_doubled").show()

Output:

+---+------+----------+
| id| fruit|id_doubled|
+---+------+----------+
|  1| apple|         2|
|  2|banana|         4|
|  3|carrot|         6|
+---+------+----------+

🔤 Apply String-Based UDF

# Register another UDF that adds a prefix to fruit name
spark.udf.register("udf_add_prefix", lambda s: "Tasty " + s, StringType())

# Apply using call_udf
df = df.withColumn("fruit_tagged", call_udf("udf_add_prefix", col("fruit")))
df.select("fruit", "fruit_tagged").show()

Output:

+------+------------+
| fruit|fruit_tagged|
+------+------------+
| apple| Tasty apple|
|banana|Tasty banana|
|carrot|Tasty carrot|
+------+------------+


