PySpark input_file_name() Function Explained: Get Source File Paths from a DataFrame
The input_file_name() function in PySpark is useful when you need to identify the source file of each row in a DataFrame, especially when working with multiple files from blob or log storage.
Step 1: Read Multiple Files into a DataFrame
from pyspark.sql.functions import input_file_name
# Read all JSON files from blob storage into one DataFrame
# (for CSV, use spark.read.option("header", True).csv(...); for Parquet, spark.read.parquet(...))
df = spark.read.json("your_blob_path/*.json")
# Show sample data
df.show()
Output:
+---+-------+-----+
| id| name|dept |
+---+-------+-----+
| 1| Aamir | IT |
| 2| Lisa | HR |
| 3| Bob | FIN |
+---+-------+-----+
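If you only need the list of files behind a DataFrame rather than a per-row column, Spark 3.1+ also provides DataFrame.inputFiles(). A minimal sketch, assuming the same df from Step 1:
# Best-effort list of the source files that make up this DataFrame
source_files = df.inputFiles()
for path in source_files:
    print(path)
This returns a plain Python list of path strings, whereas input_file_name() (used below) tags every individual row with its source file.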
Step 2: Add Source File Path to Each Row
from pyspark.sql.functions import input_file_name
# Attach the full path of the source file to every row
df_with_filename = df.withColumn("filename", input_file_name())
df_with_filename.show()
Output:
+---+-------+-----+---------------------------------------------+
| id| name|dept |filename |
+---+-------+-----+---------------------------------------------+
| 1| Aamir | IT |dbfs:/mnt/data/file1.json |
| 2| Lisa | HR |dbfs:/mnt/data/file1.json |
| 3| Bob | FIN |dbfs:/mnt/data/file2.json |
+---+-------+-----+---------------------------------------------+
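The new filename column behaves like any other column, so you can filter on it. A small sketch, assuming the sample paths shown in the output above:
from pyspark.sql.functions import col
# Keep only the rows that were read from file2.json
df_file2 = df_with_filename.filter(col("filename").endswith("file2.json"))
df_file2.show()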
Step 3: Clean File Name Using Regex
from pyspark.sql.functions import regexp_extract
# Keep only the text after the last "/" (the bare file name)
df_clean = df_with_filename.withColumn(
    "filename", regexp_extract("filename", r"([^/]+$)", 1)
)
df_clean.show()
Output:
+---+-------+-----+----------+
| id| name|dept | filename |
+---+-------+-----+----------+
| 1| Aamir | IT |file1.json|
| 2| Lisa | HR |file1.json|
| 3| Bob | FIN |file2.json|
+---+-------+-----+----------+
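A common follow-up, sketched below on the df_clean DataFrame from Step 3, is to count how many rows each source file contributed, which quickly surfaces empty or unexpectedly large files:
# Count rows per source file using the cleaned filename column
df_clean.groupBy("filename").count().orderBy("filename").show()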