RDD vs DataFrame in PySpark – Key Differences with Real Examples | PySpark for Beginners

Overview

In this tutorial, we explore the fundamental differences between RDD (Resilient Distributed Dataset) and DataFrame in PySpark. You will learn how both are used, when to prefer one over the other, and how their performance and schema-handling differ in real data engineering scenarios.

1️⃣ What is RDD?

RDD is Spark's low-level abstraction for distributed data processing. RDDs are immutable and fault-tolerant, and they support functional transformations through methods such as map(), flatMap(), and filter().

rdd = spark.sparkContext.parallelize([("Alice", 30), ("Bob", 25)])
print(rdd.collect())

2️⃣ What is DataFrame?

A DataFrame is a distributed collection of data organized into named columns, like a table. Its queries are optimized by the Catalyst optimizer and the Tungsten execution engine, and it supports SQL.

from pyspark.sql import Row

data = [Row(name="Alice", age=30), Row(name="Bob", age=25)]
df = spark.createDataFrame(data)
df.show()

3️⃣ RDD vs DataFrame Comparison Table

Feature | RDD | DataFrame
Schema | Not enforced | Enforced (column names/types)
Performance | Slower | Optimized (Catalyst)
Ease of Use | Harder (more code) | Easier (SQL & APIs)
Use Case | Complex transformations, unstructured data | Structured data, analytics, ML pipelines

Some of the contents in this website were created with assistance from ChatGPT and Gemini

Boost PySpark Performance with Broadcast Variables & Accumulators | PySpark Tutorial #pyspark

🚀 Introduction

In distributed computing with PySpark, sharing lookup data and tracking global counters efficiently can noticeably improve performance. This tutorial shows how to use broadcast variables to distribute read-only data to the executors once, and accumulators to aggregate counters from tasks back to the driver.

📘 What Are Broadcast Variables?

Broadcast variables let you keep a read-only value, such as a lookup table, cached on each worker node instead of shipping a copy of it with every task. The value must fit in each executor's memory.

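# Broadcast example (assumes an existing SparkSession named spark)
sc = spark.sparkContext
rdd = sc.parallelize(["apple", "mango", "kiwi", "cherry", "banana"])  # sample words

# Ship the read-only lookup set to each executor once
lookup_set = {"apple", "banana", "orange", "grape", "kiwi"}
broadcast_var = sc.broadcast(lookup_set)

# Use inside a transformation: tasks read the local copy through .value
matches = rdd.filter(lambda word: word in broadcast_var.value)
print(matches.collect())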

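🧮 What Are Accumulators?

Accumulators are shared variables that tasks can only add to, which makes them useful for counters and metrics; only the driver can read their value. Updates happen only when an action triggers the computation. Spark applies updates made inside actions (such as foreach()) exactly once, but updates made inside transformations (such as map()) may be applied more than once if a task is re-executed.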

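# Accumulator example (assumes an existing SparkSession named spark)
sc = spark.sparkContext
rdd = sc.parallelize(["home", "about", "home", "contact", "home"])  # sample page visits

clicks = sc.accumulator(0)

def count_home_clicks(page):
    if page == "home":
        clicks.add(1)
    return page

click_rdd = rdd.map(count_home_clicks)

# map() is lazy: the accumulator stays 0 until an action runs the tasks
click_rdd.count()
print("Accumulator Value for 'home' clicks:", clicks.value)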
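Like RDD.map(), Python 3's built-in map() is lazy: the mapped function runs only when the result is consumed. The plain-Python analogy below (not Spark code; the page list is illustrative sample data and a dict stands in for the accumulator) shows why an accumulator read before any action still reports 0.

```python
clicks = {"home": 0}  # mutable counter standing in for the Spark accumulator


def count_home_clicks(page):
    if page == "home":
        clicks["home"] += 1
    return page


pages = ["home", "about", "home", "contact", "home"]  # illustrative sample data

lazy_result = map(count_home_clicks, pages)  # like rdd.map(): nothing has run yet
before_action = clicks["home"]

materialized = list(lazy_result)  # consuming the iterator plays the role of an action
after_action = clicks["home"]

print(before_action, after_action)  # 0 3
```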

Optimize Spark Shuffles in PySpark: groupByKey vs reduceByKey Explained with Real Examples | PySpark Tutorial

Learn how Spark handles shuffles behind the scenes and why reduceByKey is more efficient than groupByKey.

🧠 What You'll Learn

  • What is a Spark shuffle?
  • Why reduceByKey is more performant than groupByKey
  • Real code examples with data output
  • Tips to improve Spark job efficiency

⚙️ Code Example: groupByKey

data = [("Aamir", 100), ("Ali", 200), ("Aamir", 300), ("Raza", 150), ("Ali", 50)]
rdd = spark.sparkContext.parallelize(data)
grouped = rdd.groupByKey()
print("GroupByKey Result:")
for k, v in grouped.collect():
    print(k, list(v))

⚙️ Code Example: reduceByKey

reduced = rdd.reduceByKey(lambda a, b: a + b)
print("reduceByKey Result:")
print(reduced.collect())

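📊 Shuffle Comparison

  • groupByKey: Sends every (key, value) pair across the network during the shuffle, then gathers all values for a key on one executor, which can be memory-intensive for heavily repeated keys.
  • reduceByKey: Combines values for each key within each partition before the shuffle (a map-side combine), so at most one partial result per key per partition is transferred.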
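The difference in shuffled records can be modelled in plain Python. This sketch is not Spark's implementation: it assumes a hypothetical split of the example data into two partitions and simply counts how many (key, value) records would cross the shuffle with and without a map-side combine.

```python
from collections import defaultdict

# Hypothetical split of the tutorial's data into two partitions
partitions = [
    [("Aamir", 100), ("Ali", 200), ("Aamir", 300)],
    [("Raza", 150), ("Ali", 50)],
]


def group_by_key_model(parts):
    """groupByKey-style: every record crosses the shuffle, then values are grouped."""
    shuffled = [record for part in parts for record in part]
    grouped = defaultdict(list)
    for key, value in shuffled:
        grouped[key].append(value)
    return dict(grouped), len(shuffled)


def reduce_by_key_model(parts, func):
    """reduceByKey-style: combine within each partition, shuffle only partial results."""
    shuffled = []
    for part in parts:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        shuffled.extend(local.items())
    reduced = {}
    for key, value in shuffled:
        reduced[key] = func(reduced[key], value) if key in reduced else value
    return reduced, len(shuffled)


grouped, grouped_shuffled = group_by_key_model(partitions)
reduced, reduced_shuffled = reduce_by_key_model(partitions, lambda a, b: a + b)

print(grouped, "records shuffled:", grouped_shuffled)
print(reduced, "records shuffled:", reduced_shuffled)
```

With only five records the saving is a single record; it grows with the number of repeated keys inside each partition.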

RDD Persistence in PySpark Explained | MEMORY_ONLY vs MEMORY_AND_DISK with Examples #pyspark

Understand how RDD caching and persistence work in PySpark, especially the differences between MEMORY_ONLY and MEMORY_AND_DISK.

📌 Why Use Persistence?

Spark recomputes an RDD from its lineage every time an action uses it. When you run several actions or iterative algorithms (such as loops or ML training) on the same RDD, that recomputation can be expensive. Caching or persisting stores the computed partitions in memory, on disk, or both, so later actions can reuse them.

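🧠 Storage Levels

  • MEMORY_ONLY: Keeps partitions in RAM. Partitions that do not fit are not stored and are recomputed when needed (rdd.cache() uses this level).
  • MEMORY_AND_DISK: Keeps partitions in RAM and writes the ones that do not fit to disk, reading them back instead of recomputing them.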

⚙️ Example Code

from pyspark import SparkContext, StorageLevel

sc = SparkContext("local", "RDD Persistence Demo")
data = ["Ali", "Aamir", "Fatima", "Aamir", "Ali"]
rdd = sc.parallelize(data)

# Map each name to a (name, 1) pair
mapped_rdd = rdd.map(lambda name: (name, 1))

# Persist before the first action so that action computes the RDD once and
# stores its partitions in memory, spilling to disk if memory runs short
mapped_rdd.persist(StorageLevel.MEMORY_AND_DISK)
print(mapped_rdd.collect())

# Later actions reuse the persisted partitions instead of recomputing the map
print("Count by Key:")
print(mapped_rdd.reduceByKey(lambda a, b: a + b).collect())

# Group by key and display values
print("Group by Key:")
print(mapped_rdd.groupByKey().mapValues(list).collect())

# Release the stored partitions when done
mapped_rdd.unpersist()
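To see why persisting before reuse matters, here is a toy plain-Python model. The ModelRDD class is hypothetical and not a Spark API; it only counts how often the mapping function runs when two actions read the same data, with and without persistence.

```python
class ModelRDD:
    """Hypothetical toy stand-in for an RDD: recomputes from its source on every
    action unless persist() was called before the first action."""

    def __init__(self, source, func):
        self.source = source
        self.func = func
        self.calls = 0  # how many times func has run in total
        self._persisted = False
        self._stored = None

    def persist(self):
        self._persisted = True
        return self

    def _compute(self):
        if self._stored is not None:
            return self._stored  # reuse stored results, no recomputation
        result = []
        for item in self.source:
            self.calls += 1
            result.append(self.func(item))
        if self._persisted:
            self._stored = result
        return result

    def collect(self):
        return list(self._compute())

    def count(self):
        return len(self._compute())


names = ["Ali", "Aamir", "Fatima", "Aamir", "Ali"]

plain = ModelRDD(names, lambda name: (name, 1))
plain.collect()
plain.count()

persisted = ModelRDD(names, lambda name: (name, 1)).persist()
persisted.collect()
persisted.count()

print("map calls without persist:", plain.calls)   # 10
print("map calls with persist:", persisted.calls)  # 5
```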

RDD Actions in PySpark: collect(), count(), reduce() Explained with Real Examples | PySpark Tutorial

Learn the difference between collect(), count(), and reduce() in PySpark through examples and output.

📘 Introduction

In PySpark, RDD actions are used to trigger the execution of transformations and return results. Unlike transformations (which are lazy), actions cause Spark to actually process the data.

🧪 Sample RDD

data = [10, 20, 30, 40, 50, 60]
rdd = spark.sparkContext.parallelize(data)

🔹 collect()

Returns all elements to the driver:

rdd.collect()
# Output: [10, 20, 30, 40, 50, 60]

🔹 count()

Returns the number of elements in the RDD:

rdd.count()
# Output: 6

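🔹 reduce()

Aggregates the elements with a binary function. The function should be commutative and associative, because Spark reduces each partition separately and then combines the partial results: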

rdd.reduce(lambda a, b: a + b)
# Output: 210
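Here is a plain-Python model (not Spark itself) of the strategy described for PySpark's RDD.reduce(): reduce each partition locally, then merge the partial results. The partition splits are hypothetical; the point is that addition gives 210 however the data is split, while subtraction does not.

```python
from functools import reduce
from operator import add, sub


def model_rdd_reduce(partitions, func):
    """Reduce each non-empty partition locally, then merge the partial results in order."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


data = [10, 20, 30, 40, 50, 60]
two_parts = [data[:3], data[3:]]
three_parts = [data[:2], data[2:4], data[4:]]

# Addition is commutative and associative: the partitioning does not matter
sum_two = model_rdd_reduce(two_parts, add)
sum_three = model_rdd_reduce(three_parts, add)

# Subtraction is neither: the answer changes with the partitioning
diff_sequential = reduce(sub, data)
diff_two = model_rdd_reduce(two_parts, sub)
diff_three = model_rdd_reduce(three_parts, sub)

print(sum_two, sum_three)                     # 210 210
print(diff_sequential, diff_two, diff_three)  # -190 30 10
```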

💡 Summary

  • collect() → Brings all data to the driver (use with caution on large datasets)
  • count() → Returns number of elements
  • reduce() → Returns aggregated result

PySpark PairRDD Transformations : groupByKey, reduceByKey, sortByKey Explained with Real Data | PySpark Tutorial

In this tutorial, we explore three essential PairRDD transformations in PySpark: groupByKey(), reduceByKey(), and sortByKey(). These functions enable you to group, reduce, and sort data using key-value pairs — powerful operations for distributed data processing.

🔹 Step 1: Sample Data

data = [
    ("Alice", 300),
    ("Bob", 150),
    ("Alice", 200),
    ("Raza", 450),
    ("Bob", 100),
    ("Raza", 50)
]
rdd = spark.sparkContext.parallelize(data)

🔹 Step 2: groupByKey()

grouped_rdd = rdd.groupByKey()
for k, v in grouped_rdd.collect():
    print(k, list(v))

🔹 Step 3: reduceByKey()

reduced_rdd = rdd.reduceByKey(lambda a, b: a + b)
print(reduced_rdd.collect())

🔹 Step 4: sortByKey()

sorted_rdd = reduced_rdd.sortByKey()
print(sorted_rdd.collect())
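To know what the three steps should return, this plain-Python sketch (not Spark code) applies the same logic to the sample data. In Spark, the key order after groupByKey() and reduceByKey() is not guaranteed and the order of values inside each group can vary; only sortByKey() fixes the key order, and sortByKey(ascending=False) reverses it.

```python
from collections import defaultdict

data = [("Alice", 300), ("Bob", 150), ("Alice", 200),
        ("Raza", 450), ("Bob", 100), ("Raza", 50)]

# groupByKey(): key -> list of values
grouped = defaultdict(list)
for key, value in data:
    grouped[key].append(value)

# reduceByKey(lambda a, b: a + b): key -> running sum
reduced = {}
for key, value in data:
    reduced[key] = reduced[key] + value if key in reduced else value

# sortByKey(): ascending key order; descending mirrors sortByKey(ascending=False)
sorted_pairs = sorted(reduced.items())
sorted_desc = sorted(reduced.items(), reverse=True)

print(dict(grouped))
print(sorted_pairs)  # [('Alice', 500), ('Bob', 250), ('Raza', 500)]
```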

PySpark RDD Transformations | map, flatMap, filter, distinct, Word Count | PySpark Tutorial

This post covers how to use the most essential RDD transformations in PySpark. We’ll walk through map, flatMap, filter, distinct, and apply them in a Word Count example — all fundamental for big data processing.

1. Create a Spark Session

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("rdd_transformations_demo").getOrCreate()

2. Load Lines into RDD

lines = spark.sparkContext.parallelize([
    "Aamir loves Spark",
    "Ali and Raza love PySpark",
    "Spark is fast",
    "Ali loves big data"
])

3. Tokenize Text into Words

words_rdd = lines.flatMap(lambda line: line.split(" "))
words_rdd.collect()

4. Convert Words to (word, 1)

pair_rdd = words_rdd.map(lambda word: (word, 1))
pair_rdd.collect()

5. Filter Out a Specific Word

filtered_rdd = words_rdd.filter(lambda word: word != "is")
filtered_rdd.collect()

6. Distinct Words

distinct_rdd = words_rdd.distinct()
distinct_rdd.collect()

7. Count Word Occurrences

word_count_rdd = pair_rdd.reduceByKey(lambda a, b: a + b)
word_count_rdd.collect()

✅ Output Example (order may vary):

[('Aamir', 1), ('loves', 2), ('Spark', 2), ('Ali', 2), ('and', 1), ('Raza', 1), ('love', 1), ('PySpark', 1), ('is', 1), ('fast', 1), ('big', 1), ('data', 1)]
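As a sanity check on the counts, this plain-Python sketch (collections.Counter, not Spark) repeats the same pipeline on the four sample lines: flatMap becomes a nested comprehension, map plus reduceByKey becomes a Counter, and filter and distinct become a comprehension and a set.

```python
from collections import Counter

lines = [
    "Aamir loves Spark",
    "Ali and Raza love PySpark",
    "Spark is fast",
    "Ali loves big data",
]

# flatMap(lambda line: line.split(" ")): one flat list of words
words = [word for line in lines for word in line.split(" ")]

# map(lambda word: (word, 1)) + reduceByKey(add): counts per word
word_counts = Counter(words)

# filter(lambda word: word != "is") and distinct()
filtered = [w for w in words if w != "is"]
distinct_words = set(words)

print(sorted(word_counts.items()))
```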

What Are RDD Partitions in PySpark? | How Spark Partitioning works | PySpark Tutorial #pyspark

Understanding how RDD partitions work is crucial for optimizing your PySpark applications. This guide walks you through the concept of partitioning, partition count, and how to tune performance using `repartition()` and `coalesce()` functions.

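🔍 What is an RDD Partition?

Partitions are logical chunks of an RDD's data. Spark processes each partition as a separate task, so the partition count caps how many tasks can run in parallel: more partitions can keep more executor cores busy, but very many tiny partitions add scheduling overhead. repartition() can raise or lower the count and always performs a full shuffle, while coalesce() only lowers it by merging existing partitions without a full shuffle.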

📌 Step 1: Create RDD with Default Partitions

data = ["Aamir", "Ali", "Raza", "Bob", "Lisa"]
rdd = spark.sparkContext.parallelize(data)
print("Original Partitions:", rdd.getNumPartitions())

📌 Step 2: Repartition RDD to 5

rdd_repart = rdd.repartition(5)
print("After Repartition to 5:", rdd_repart.getNumPartitions())

📌 Step 3: Coalesce Back to 2

rdd_coalesce = rdd_repart.coalesce(2)
print("After Coalesce to 2:", rdd_coalesce.getNumPartitions())

📌 Step 4: Show How Data is Distributed

print("Data in Each Partition (Final RDD):")
print(rdd_coalesce.glom().collect())

What is Azure Data Explorer in Synapse? Overview and Use Cases Explained | Azure Synapse Analytics Tutorial

📘 Overview

Azure Data Explorer (ADX) is a high-performance, fully managed analytics service optimized for analyzing large volumes of telemetry, log, and time-series data. Within Azure Synapse Analytics, it is available as Data Explorer pools integrated into the workspace, so you can ingest and query massive datasets using Kusto Query Language (KQL).

⚡ Why Use Azure Data Explorer in Synapse?

  • Blazing fast ingestion of structured, semi-structured, and unstructured data
  • Support for ad-hoc and near real-time analytics
  • Native support for time-series analysis
  • Works seamlessly with Spark, Serverless SQL, and Pipelines in Synapse

🔍 What is KQL?

Kusto Query Language (KQL) is the query language used by ADX. It's designed for fast read-only queries on large datasets, especially logs and telemetry.

Example KQL Query

StormEvents
| where State == "TEXAS"
| summarize Count = count() by EventType

🧠 Key Features

  • Highly optimized for analytical workloads (vs. transactional)
  • Data visualization integration with Power BI
  • Data retention policies and cache control
  • Time-series operations like make-series, summarize, render

🎯 Common Use Cases

  • IoT telemetry data exploration
  • Application performance monitoring
  • Clickstream and user behavior analytics
  • Real-time log investigation and alerting

🔧 How to Get Started in Synapse

  1. Create a Data Explorer pool in your Synapse workspace
  2. Ingest data using pipelines, notebooks, or KQL
  3. Use KQL to explore and analyze data

Integration Highlights

  • Synapse Pipelines: for scheduled and automated ingestion
  • Notebooks: combine Spark & KQL in one environment
  • Power BI: for real-time dashboards

📚 Credit: Content created with the help of ChatGPT and Gemini.

PySpark Tutorial : Easy Ways to Create RDD in PySpark | Beginner Guide with Real Examples

This tutorial walks you through multiple practical ways to create RDDs (Resilient Distributed Datasets) in PySpark, a fundamental concept in Apache Spark. Whether you're building data pipelines or preparing for Spark interviews, these examples will help you get started confidently.

1. Using parallelize()

This is the simplest way to create an RDD from an in-memory Python list, ideal for testing or small datasets:

numbers = [1, 2, 3, 4, 5]
rdd_parallel = spark.sparkContext.parallelize(numbers)
print(rdd_parallel.collect())

2. Using textFile()

Loads a text file into an RDD, where each line becomes a single record:

rdd_text = spark.sparkContext.textFile("path/to/textfile.txt")
print(rdd_text.take(5))

3. Using wholeTextFiles()

Reads every file in a directory and returns one (file path, file content) pair per file. Because each file becomes a single record, it suits folders of many small files:

rdd_whole = spark.sparkContext.wholeTextFiles("path/to/folder")
print(rdd_whole.take(1))
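The difference between the two record shapes can be seen without a cluster. This plain-Python sketch is an analogy, not Spark code: the helper names and the two small files are hypothetical. It builds one record per line (like textFile()) and one (path, content) pair per file (like wholeTextFiles()) from a temporary folder. Spark's keys are full file paths, much like the paths built here.

```python
import os
import tempfile


def lines_per_record(folder):
    """Mimic textFile(): every line of every file becomes its own record."""
    records = []
    for name in sorted(os.listdir(folder)):
        with open(os.path.join(folder, name), encoding="utf-8") as fh:
            records.extend(fh.read().splitlines())
    return records


def files_per_record(folder):
    """Mimic wholeTextFiles(): each file becomes one (path, content) pair."""
    pairs = []
    for name in sorted(os.listdir(folder)):
        path = os.path.join(folder, name)
        with open(path, encoding="utf-8") as fh:
            pairs.append((path, fh.read()))
    return pairs


with tempfile.TemporaryDirectory() as folder:
    # Two small illustrative files
    for name, text in [("a.txt", "line 1\nline 2\n"), ("b.txt", "line 3\n")]:
        with open(os.path.join(folder, name), "w", encoding="utf-8") as fh:
            fh.write(text)
    line_records = lines_per_record(folder)
    file_records = files_per_record(folder)

print(line_records)                                    # ['line 1', 'line 2', 'line 3']
print([os.path.basename(p) for p, _ in file_records])  # ['a.txt', 'b.txt']
```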

Load Lakehouse File to Warehouse Table via Copy Assistant | Microsoft Fabric Tutorial #fabrictutorial

In this Microsoft Fabric tutorial, you’ll learn how to use the Copy Assistant feature in Data Pipelines to load data from a Lakehouse file into a Warehouse table. This no-code interface simplifies data movement for analysts and engineers alike.

✅ What is Copy Assistant in Fabric Pipelines?

  • A guided interface in Microsoft Fabric for building Copy activities
  • Helps you configure source-to-destination mappings without writing code
  • Ideal for simple Lakehouse to Warehouse transfers

🔗 Connecting to the Lakehouse & Warehouse

  1. Start a new Data Pipeline in Microsoft Fabric
  2. Click Copy Assistant from the toolbar
  3. Select the source as Lakehouse and choose your file (CSV, Parquet, etc.)
  4. Select the destination as your Warehouse table

📊 Mapping Columns

Copy Assistant provides an intuitive column-mapping screen where:

  • Column names from your file are matched to the Warehouse schema
  • You can manually correct any mismatches or transformations
  • Data types are validated to ensure consistency

🚀 Executing and Validating

  1. Once configuration is done, click Run Pipeline
  2. Monitor the execution via the Activity Monitor
  3. Check row count and preview records in your target Warehouse table

💡 Best Practices

  • Ensure your Lakehouse file format matches expected schema (CSV with headers, correct delimiters, etc.)
  • Use Preview before final execution to validate field mappings
  • Check Warehouse table for primary keys or constraints that could block inserts
  • Use Dataflow Gen2 for more advanced transformations, if needed

🐞 Common Troubleshooting Tips

  • Mapping errors: Mismatch in data types or missing columns
  • Permission issues: Ensure Fabric roles allow write access to destination
  • Blank outputs: Check that source file actually contains rows

Blog post written with the help of ChatGPT.

What is RDD in PySpark? | A Beginner’s Guide to Apache Spark’s Core Data Structure | PySpark Tutorial

RDD (Resilient Distributed Dataset) is the fundamental data structure in Apache Spark, representing an immutable, distributed collection of objects. This tutorial helps you understand what RDDs are, how they work, and when to use them.

📘 Definition of RDD

RDD is a low-level abstraction in PySpark. It provides:

  • Fault-tolerance: Can recover from node failures
  • Immutability: Once created, it cannot be changed
  • Partitioning: Internally split across nodes

🧪 Create RDD Example

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("rdd_intro").getOrCreate()
sc = spark.sparkContext

data = [("Alice", 28), ("Bob", 35), ("Charlie", 40), ("Diana", 23)]
rdd = sc.parallelize(data, 2)
print("Partition Count:", rdd.getNumPartitions())

Output:

Partition Count: 2

🔄 Basic Transformations

# map() transformation to format data
mapped = rdd.map(lambda x: f"{x[0]} is {x[1]} years old")
for item in mapped.collect():
    print(item)

Output:

Alice is 28 years old
Bob is 35 years old
Charlie is 40 years old
Diana is 23 years old

⚖️ RDD vs DataFrame

  • RDD: More control, lower-level, suitable for complex operations
  • DataFrame: Optimized, higher-level API, supports SQL queries

📌 When to Use RDD

  • When you need fine-grained control over transformations
  • When you’re working with unstructured or complex data
  • When DataFrame/Dataset APIs do not support your logic

Build Sales Analytics Model – Star Schema, Semantic Model & Power BI | Microsoft Fabric Tutorial

In this Microsoft Fabric tutorial, you’ll learn how to build an end-to-end Sales Analytics model using a star schema, semantic model, and Power BI integration with Direct Lake for blazing-fast performance.

🧠 What is a Semantic Model in Microsoft Fabric?

A Semantic Model is a business-friendly layer on top of your raw data that enables easier analysis and reporting.

  • Helps users understand and explore the data model intuitively
  • Supports business measures using DAX (e.g., Total Sales = SUM(FactSales[TotalAmount]))
  • Defines relationships, KPIs, hierarchies, and security layers
  • Optimized for tools like Power BI and Excel

🌟 Star Schema Design for Sales

In this model, we structure the data using a star schema:

  • Fact Table: FactSales
  • Dimension Tables: DimCustomer, DimProduct, DimDate, DimRegion

All dimension tables are linked to the central FactSales table to enable fast and meaningful aggregations in reports.

⚡ Direct Lake Mode in Fabric

  • Reads directly from Delta tables in OneLake
  • Combines import-speed with real-time freshness
  • No need for scheduled refreshes
  • Fully integrated with Power BI

📊 Sample DAX Measures

Total Sales = SUM(FactSales[TotalAmount])
Total Orders = COUNT(FactSales[OrderID])
Sales Per Customer = DIVIDE([Total Sales], DISTINCTCOUNT(DimCustomer[CustomerID]))

📈 Visualizing in Power BI

  • Connect to the semantic model using Direct Lake
  • Create visuals: Sales Trend, Sales by Product, Customer Retention
  • Apply filters, bookmarks, and slicers for interactivity
  • Publish and share via Power BI Service

How to Read Data from Dedicated SQL Pool and Create CSV in ADLS Gen2 Using Azure Synapse | Azure Synapse Analytics Tutorial

📘 Overview

In this tutorial, you'll learn how to extract data from a Dedicated SQL Pool using PySpark and write it to a single CSV file in Azure Data Lake Storage Gen2 (ADLS Gen2). This is a common task for exporting curated data for sharing, archiving, or downstream analytics.

🧱 Prerequisites

  • Azure Synapse workspace with Spark and Dedicated SQL Pool
  • Configured Linked Services and credentials
  • ADLS Gen2 container path

🛠️ Step-by-Step Guide

✅ Step 1: Read from Dedicated SQL Pool Table

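%%pyspark
# Read the table over Spark's generic JDBC source (replace the server, database, and
# credentials with your own; in real pipelines, load the password from a secret store)
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:sqlserver://yourserver.database.windows.net:1433;database=yourDB") \
    .option("dbtable", "dbo.sales_data") \
    .option("user", "youruser") \
    .option("password", "yourpassword") \
    .load()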

✅ Step 2: Repartition to Single File

df_single = df.repartition(1)

✅ Step 3: Write to ADLS Gen2 as CSV

df_single.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("abfss://export@yourstorageaccount.dfs.core.windows.net/reports/sales_output")

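📌 Notes

  • repartition(1) produces a single CSV part file; coalesce(1) does the same while avoiding a full shuffle. Either way all rows are written by one task, so use this only for modest data volumes
  • Spark writes a folder containing a part file whose name starts with part-00000 (plus marker files such as _SUCCESS); rename or move it with Azure Storage Explorer if you need a fixed file name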
  • Ensure Spark pool has permission to access the ADLS path

🎯 Use Cases

  • Exporting business reports from SQL warehouse
  • Sharing datasets with external users
  • Backup/archive structured data in CSV format

📊 Output Example

The CSV file will be saved in your specified ADLS path, containing all the rows from the selected Dedicated SQL Pool table.

Explore Databases and Tables in PySpark with spark.catalog | A Beginner’s Guide to Metadata Management | PySpark Tutorial

Learn how to interact with databases, tables, views, cached data, and registered functions using PySpark's powerful spark.catalog interface. Ideal for metadata exploration and data pipeline control.

📘 Step 1: Inspect Current Catalog State

print("Current Database:", spark.catalog.currentDatabase())
print("Current Catalog:", spark.catalog.currentCatalog())  # currentCatalog() requires Spark 3.4+

📂 Step 2: List Databases and Tables

print("List of Databases:")
for db in spark.catalog.listDatabases():
    print(db.name)

print("List of Tables:")
for tbl in spark.catalog.listTables():
    print(tbl.name, tbl.tableType)

🧪 Step 3: Check Existence of Tables & Databases

print("Table Exists:", spark.catalog.tableExists("demo_table"))
print("Database Exists:", spark.catalog.databaseExists("demo_db"))

🔍 Step 4: Explore Table Columns

for col in spark.catalog.listColumns("demo_table"):
    print(col.name, col.dataType)

🧼 Step 5: Drop Temporary Views

spark.catalog.dropTempView("demo_view")

🧠 Step 6: Manage Cache

# Check and release the cache for a table that still exists (demo_view was dropped in Step 5)
print("Is Cached:", spark.catalog.isCached("demo_table"))
spark.catalog.uncacheTable("demo_table")
spark.catalog.clearCache()

How to Create Notebook to Join Two CSV Files and Write to Lake Database in Synapse Pipelines | Azure Synapse Analytics Tutorial

📘 Overview

In this tutorial, we’ll show you how to create a PySpark notebook in Azure Synapse Analytics that reads two CSV files, performs a join operation, and writes the result to a Lake Database table. This notebook can then be integrated into a Synapse pipeline for automated data workflows.

🧱 Prerequisites

  • Azure Synapse workspace with Spark Pool
  • Lake Database created (or Spark database)
  • CSV files stored in ADLS Gen2

🛠️ Step-by-Step Instructions

✅ Step 1: Read CSV Files

%%pyspark
df_customers = spark.read.option("header", True).csv("abfss://data@yourstorage.dfs.core.windows.net/customers.csv")
df_orders = spark.read.option("header", True).csv("abfss://data@yourstorage.dfs.core.windows.net/orders.csv")

✅ Step 2: Join the Two DataFrames

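# Joining on the column name keeps a single customer_id column in the result,
# which avoids duplicate-column errors when the result is saved as a table
df_joined = df_customers.join(df_orders, on="customer_id", how="inner")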

✅ Step 3: Write the Result to a Lake Database Table

df_joined.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("LakeDB.joined_customer_orders")

📂 Result

The output Delta table will be created inside your Lake Database and will be queryable via Spark and Serverless SQL Pools.

Integrate Notebook in Synapse Pipeline

  1. Go to Synapse Studio → Integrate → Pipeline
  2. Add “Notebook” activity
  3. Select the notebook you just created
  4. Configure Spark Pool and parameters (if any)
  5. Publish and trigger the pipeline

📌 Tips

  • Validate file schema before joining
  • Use display() for previewing data inside the notebook
  • Leverage overwrite mode for testing; use append for incremental writes

🎯 Use Cases

  • Merge transactional and customer data
  • Create curated data layers in Lakehouse
  • Automate data ingestion with Synapse Pipelines

PySpark Tutorial: How to Build User-Defined Table Functions (UDTFs) in PySpark | Split Rows

Need to return multiple rows per input in Spark? Learn how to define and use UDTFs (User Defined Table Functions) in PySpark using the @udtf decorator with real-world logic.

📘 Step 1: Define a Class to Split Words

from pyspark.sql.functions import udtf

class SplitWords:
    def eval(self, text: str):
        for word in text.split(" "):
            yield (word,)

split_words_udtf = udtf(SplitWords, returnType="word: string")

Use Case: This takes a sentence and emits each word as a row.
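Because eval() is an ordinary generator method, one way to unit-test the row logic is to call it directly before wrapping the class with udtf(). This sketch repeats the undecorated SplitWords class so it runs without PySpark installed; it checks only the Python logic, not Spark's row conversion.

```python
class SplitWords:
    """Same row logic as the UDTF class: eval() yields one tuple per output row."""

    def eval(self, text: str):
        for word in text.split(" "):
            yield (word,)


# Call eval() directly in plain Python (no Spark session needed) to check the rows
rows = list(SplitWords().eval("pyspark is powerful"))
print(rows)  # [('pyspark',), ('is',), ('powerful',)]
```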

⚡ Step 2: Use UDTF Directly in Select

from pyspark.sql.functions import lit

split_words_udtf(lit("pyspark is powerful")).show()

Output:

+--------+
|    word|
+--------+
| pyspark|
|      is|
|powerful|
+--------+

📊 Step 3: Another UDTF Returning Two Columns

@udtf(returnType="num: int, plus_one: int")
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1

PlusOne(lit(5)).show()

Output:

+---+--------+
|num|plus_one|
+---+--------+
|  5|       6|
+---+--------+

Monitor Performance & Query Execution in Fabric Warehouse-Microsoft Fabric Tutorial #fabrictutorial

Monitoring performance and query execution in Microsoft Fabric Warehouse is essential for optimizing resource usage, identifying bottlenecks, and tuning SQL workloads. In this tutorial, we’ll walk through powerful system views and dynamic management queries you can use for real-time and historical insights.

🔍 Query Insights Views in Fabric

Microsoft Fabric provides specialized views to track query and session behavior:

  • queryinsights.exec_requests_history – Tracks historical query executions
  • queryinsights.exec_sessions_history – Tracks session metadata
  • queryinsights.frequently_run_queries – Identifies most commonly executed queries
  • queryinsights.long_running_queries – Lists slowest queries for optimization

SELECT * FROM queryinsights.long_running_queries;

🛠️ Using SQL DMVs for Live Monitoring

Dynamic Management Views (DMVs) offer real-time insight into query activity:

  • sys.dm_exec_requests – Tracks currently running queries
  • sys.dm_exec_sessions – Lists active sessions
  • sys.dm_exec_connections – Shows active client connections

SELECT
    r.session_id,
    r.status,
    r.start_time,
    r.command,
    r.cpu_time,
    r.total_elapsed_time AS elapsed_ms,
    r.reads,
    r.writes,
    t.text AS query_text,
    s.program_name,
    s.login_name,
    s.host_name
FROM sys.dm_exec_requests r
JOIN sys.dm_exec_sessions s ON r.session_id = s.session_id
CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
WHERE r.status = 'running'
ORDER BY r.total_elapsed_time DESC;

Optionally, you can use the KILL command to stop long-running or blocked sessions (admin permission required):

KILL 56;

✅ Best Practices

  • Use queryinsights views for historical trends and usage stats
  • Use DMVs for real-time analysis of live workloads
  • Investigate long-running queries and optimize their query logic and statistics
  • Periodically review session and resource usage patterns

Create Custom Column Logic in PySpark Using udf() | Easy Guide with Real Examples | PySpark Tutorial

Sometimes built-in PySpark functions just aren't enough. That’s when you use udf() to apply your own Python logic directly to Spark DataFrames.

📘 Sample Data

data = [("apple", 2), ("banana", 3), ("kiwi", 1)]
df = spark.createDataFrame(data, ["fruit", "quantity"])
df.show()

Output:

+------+--------+
| fruit|quantity|
+------+--------+
| apple|       2|
|banana|       3|
|  kiwi|       1|
+------+--------+

🧠 Step 1: Define a Python Function to Tag Price

def tag_price(qty: int) -> str:
    return "expensive" if qty >= 3 else "cheap"

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

tag_price_udf = udf(tag_price, StringType())

⚡ Step 2: Apply UDF to Create a New Column

df = df.withColumn("price_tag", tag_price_udf(df["quantity"]))
df.select("fruit", "quantity", "price_tag").show()

Output:

+------+--------+---------+
| fruit|quantity|price_tag|
+------+--------+---------+
| apple|       2|    cheap|
|banana|       3|expensive|
|  kiwi|       1|    cheap|
+------+--------+---------+

🔤 Step 3: Another UDF to Transform Fruit Name

def shout_name(name: str) -> str:
    return name.upper() + "!"

shout_udf = udf(shout_name, StringType())
df = df.withColumn("fruit_shout", shout_udf(df["fruit"]))
df.select("fruit", "fruit_shout").show()

Output:

+------+-----------+
| fruit|fruit_shout|
+------+-----------+
| apple|     APPLE!|
|banana|    BANANA!|
|  kiwi|      KIWI!|
+------+-----------+
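Python UDFs receive None for null column values, so shout_name as written would raise AttributeError on a null fruit, and tag_price would raise TypeError on a null quantity. One option is to make the plain functions null-safe and test them in plain Python before wrapping them with udf(..., StringType()) as in the steps above. This is a sketch of that approach, not the tutorial's original code.

```python
from typing import Optional


def shout_name(name: Optional[str]) -> Optional[str]:
    """Null-safe version: pass None through instead of failing on name.upper()."""
    if name is None:
        return None
    return name.upper() + "!"


def tag_price(qty: Optional[int]) -> Optional[str]:
    """Same threshold as the tutorial (3 or more is 'expensive'), but null-safe."""
    if qty is None:
        return None
    return "expensive" if qty >= 3 else "cheap"


print([shout_name(n) for n in ["apple", "banana", None]])  # ['APPLE!', 'BANANA!', None]
print([tag_price(q) for q in [2, 3, 1, None]])              # ['cheap', 'expensive', 'cheap', None]
```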

What Are Statistics in Fabric Data Warehouse & How to Manage Statistics | Microsoft Fabric Tutorial

In this Microsoft Fabric tutorial, we explain what statistics are in a Fabric Data Warehouse, why they are critical for performance, and how to manage them manually or automatically using T-SQL.

📊 What Are Statistics?

  • Statistics describe the distribution of values in a column
  • Used by the query optimizer to estimate row counts and selectivity
  • Critical to choosing an efficient query execution plan

🔧 Manual Statistics Management

Create, update, and inspect statistics on frequently filtered or joined columns:

-- Create statistics manually
CREATE STATISTICS DimCustomer_CustomerKey_FullScan
ON dbo.DimCustomer (CustomerKey) WITH FULLSCAN;

-- Update statistics after data changes
UPDATE STATISTICS dbo.DimCustomer (DimCustomer_CustomerKey_FullScan) WITH FULLSCAN;

-- View histogram of statistics
DBCC SHOW_STATISTICS ('dbo.DimCustomer', 'DimCustomer_CustomerKey_FullScan') WITH HISTOGRAM;

⚙️ Automatic Statistics

  • Automatically created when queries involve JOIN, GROUP BY, WHERE, or ORDER BY
  • Generated under names like _WA_Sys_...
  • Automatically refreshed when underlying data changes significantly

-- Query that triggers auto-stat creation
SELECT CustomerKey FROM dbo.DimCustomer GROUP BY CustomerKey;

-- View system-generated statistics
SELECT
    object_name(s.object_id) AS object_name,
    c.name AS column_name,
    s.name AS stats_name,
    STATS_DATE(s.object_id, s.stats_id) AS stats_update_date,
    s.auto_created
FROM sys.stats s
JOIN sys.stats_columns sc ON s.stats_id = sc.stats_id AND s.object_id = sc.object_id
JOIN sys.columns c ON sc.object_id = c.object_id AND sc.column_id = c.column_id
WHERE object_name(s.object_id) = 'DimCustomer';

💡 Best Practices

  • Create stats manually for columns heavily used in WHERE and JOIN clauses
  • Use WITH FULLSCAN for higher accuracy
  • Inspect STATS_DATE to verify freshness
  • Allow auto stats to handle low-frequency changes
