Create Delta Table from CSV, JSON & Parquet Files in Microsoft Fabric
Microsoft Fabric Tutorial
📘 Overview
This tutorial demonstrates how to convert files in several formats (CSV, JSON, and Parquet) into Delta Tables using PySpark inside a Microsoft Fabric Lakehouse. This is especially useful when automating ingestion pipelines that pick up files from the Staging folder.
🛠️ Step-by-Step PySpark Script
import os

# Define the staging folder path inside the Fabric Lakehouse
base_path = "Files/Staging/"

# Use Spark's binaryFile reader to list all files in the directory
files_df = spark.read.format("binaryFile").load(base_path + "*")
file_paths = [row.path for row in files_df.select("path").distinct().collect()]

# Loop through each file path
for file_path in file_paths:
    file_name = os.path.basename(file_path)
    file_ext = file_name.split(".")[-1].lower()
    table_base = os.path.splitext(file_name)[0]
    table_name = table_base + "_" + file_ext  # Example: customer_csv

    try:
        # Read the file based on its extension
        if file_ext == "csv":
            df = spark.read.option("header", "true").csv(file_path)
        elif file_ext == "json":
            df = spark.read.option("multiline", "true").json(file_path)
        elif file_ext == "parquet":
            df = spark.read.parquet(file_path)
        else:
            print(f"Skipping unsupported file type: {file_ext}")
            continue

        # Save as a managed Delta Table in the Lakehouse
        df.write.format("delta").mode("overwrite").saveAsTable(table_name)
        print(f"Table created: {table_name}")
    except Exception as e:
        print(f"Failed to process {file_path}: {e}")
💡 Why Use Delta Tables?
- Supports ACID transactions and time travel
- Optimized for analytical workloads in Fabric
- Can be queried using both Spark and T-SQL (see the example after this list)
- Perfect for structured storage of semi-structured data
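For example, once the script has created a table such as customer_csv (a hypothetical name based on the naming pattern above), you can read it back with Spark SQL and even step back to an earlier version. This is a minimal sketch, assuming the Fabric Spark runtime ships a Delta Lake version that supports the VERSION AS OF syntax:

# Query the current state of the table with Spark SQL
spark.sql("SELECT COUNT(*) AS row_count FROM customer_csv").show()

# Time travel: read the table as it looked at version 0 (the initial write)
spark.sql("SELECT * FROM customer_csv VERSION AS OF 0").show(5)

The same table also appears under the Lakehouse's SQL analytics endpoint, where it can be queried with plain T-SQL, for example SELECT TOP 5 * FROM customer_csv.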
📁 Use Case
Imagine receiving customer data in different formats (CSV, JSON, Parquet). This notebook lets you bulk-convert those files into reliable, high-performance Delta Tables that are ready for downstream reporting and machine learning workflows.