Boost PySpark Performance with Broadcast Variables & Accumulators | PySpark Tutorial #pyspark

Boost PySpark Performance with Broadcast Variables & Accumulators

🚀 Introduction

In distributed computing with PySpark, sharing large lookup datasets efficiently and tracking global counters can greatly improve performance. This tutorial demonstrates how to use broadcast variables and accumulators to reduce shuffling, avoid sending redundant copies of data with every task, and track metrics across parallel computations.

📘 What Are Broadcast Variables?

Broadcast variables allow you to cache a large read-only dataset on all worker nodes, preventing it from being re-sent with every task.

# Broadcast example
lookup_set = set(["apple", "banana", "orange", "grape", "kiwi"])
broadcast_var = sc.broadcast(lookup_set)

# Sample RDD of words to check against the broadcast lookup
rdd = sc.parallelize(["apple", "mango", "kiwi", "pear", "banana"])

# Use the broadcast value inside a transformation
matches = rdd.filter(lambda word: word in broadcast_var.value)
print(matches.collect())  # ['apple', 'kiwi', 'banana']
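
When the lookup data is no longer needed, the broadcast copy can be released from the executors; a minimal sketch using the variable above:

# Release the cached copies on the executors (it will be re-sent if used again)
broadcast_var.unpersist()

# Or permanently free it once the job is completely done with it
broadcast_var.destroy()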

🧮 What Are Accumulators?

Accumulators are shared variables that worker tasks can only add to, while the driver reads the final value. They are typically used for counters or metrics in parallel computations.

# Accumulator example
clicks = sc.accumulator(0)

# Sample RDD of visited pages
rdd = sc.parallelize(["home", "about", "home", "contact", "home"])

def count_home_clicks(page):
    if page == "home":
        clicks.add(1)
    return page

# map() is lazy, so trigger an action to run the tasks and update the accumulator
click_rdd = rdd.map(count_home_clicks)
click_rdd.count()

print("Accumulator Value for 'home' clicks:", clicks.value)  # 3

🎥 Watch the Tutorial

Some of the contents in this website were created with assistance from ChatGPT and Gemini

Optimize Spark Shuffles in PySpark: groupByKey vs reduceByKey Explained with Real Examples | PySpark Tutorial

Optimize Spark Shuffles: groupByKey vs reduceByKey | PySpark Internals Explained

Optimize Spark Shuffles: groupByKey vs reduceByKey

Learn how Spark handles shuffles behind the scenes and why reduceByKey is more efficient than groupByKey.

🧠 What You'll Learn

  • What is a Spark shuffle?
  • Why reduceByKey is more performant than groupByKey
  • Real code examples with data output
  • Tips to improve Spark job efficiency

⚙️ Code Example: groupByKey

data = [("Aamir", 100), ("Ali", 200), ("Aamir", 300), ("Raza", 150), ("Ali", 50)]
rdd = spark.sparkContext.parallelize(data)
grouped = rdd.groupByKey()
print("GroupByKey Result:")
for k, v in grouped.collect():
    print(k, list(v))

⚙️ Code Example: reduceByKey

reduced = rdd.reduceByKey(lambda a, b: a + b)
print("reduceByKey Result:")
print(reduced.collect())
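
If the reduce logic gets more involved, aggregateByKey() offers the same map-side combining with separate per-partition and cross-partition merge functions; a quick sketch on the same rdd:

aggregated = rdd.aggregateByKey(
    0,                                 # initial value per key
    lambda acc, value: acc + value,    # merge a value into the per-partition accumulator
    lambda acc1, acc2: acc1 + acc2     # merge accumulators from different partitions
)
print("aggregateByKey Result:")
print(aggregated.collect())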

📊 Output Comparison

  • groupByKey: Results in more data shuffled across partitions.
  • reduceByKey: Performs local aggregation before the shuffle, minimizing data transfer.

📺 Watch the Full Tutorial

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

RDD Persistence in PySpark Explained | MEMORY_ONLY vs MEMORY_AND_DISK with Examples #pyspark

RDD Persistence in PySpark | MEMORY_ONLY vs MEMORY_AND_DISK Explained

RDD Persistence in PySpark

Understand how RDD caching and persistence work in PySpark, especially the differences between MEMORY_ONLY and MEMORY_AND_DISK.

📌 Why Use Persistence?

When working with large datasets and performing iterative operations (like in loops or ML algorithms), recalculating the RDD can be time-consuming. Caching or persisting saves the RDD in memory or disk for reuse.

🧠 Storage Levels

  • MEMORY_ONLY: Stores partitions in RAM only; partitions that don't fit are not cached and are recomputed when needed.
  • MEMORY_AND_DISK: Stores partitions in RAM and spills the ones that don't fit to disk.

⚙️ Example Code

from pyspark import SparkContext, StorageLevel

sc = SparkContext("local", "RDD Persistence Demo")
data = ["Ali", "Aamir", "Fatima", "Aamir", "Ali"]
rdd = sc.parallelize(data)

# Count each word
mapped_rdd = rdd.map(lambda name: (name, 1))
print(mapped_rdd.collect())

# Persist the RDD in memory and disk
mapped_rdd.persist(StorageLevel.MEMORY_AND_DISK)

# Count by key (1)
print("Count by Key (1):")
print(mapped_rdd.reduceByKey(lambda a, b: a + b).collect())

# Group by key and display values
print("Group by Key:")
print(mapped_rdd.groupByKey().mapValues(list).collect())
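
# Optional check: confirm whether and how the RDD is currently persisted
print("Is cached:", mapped_rdd.is_cached)
print("Storage level:", mapped_rdd.getStorageLevel())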

# Unpersist when done
mapped_rdd.unpersist()

🎥 Watch Video Tutorial

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

RDD Actions in PySpark: collect(), count(), reduce() Explained with Real Examples | PySpark Tutorial

Understanding RDD Actions in PySpark | collect(), count(), reduce() Explained

Understanding RDD Actions in PySpark

Learn the difference between collect(), count(), and reduce() in PySpark through examples and output.

๐Ÿ“˜ Introduction

In PySpark, RDD actions are used to trigger the execution of transformations and return results. Unlike transformations (which are lazy), actions cause Spark to actually process the data.

๐Ÿงช Sample RDD

data = [10, 20, 30, 40, 50, 60]
rdd = spark.sparkContext.parallelize(data)

๐Ÿ”น collect()

Returns all elements to the driver:

rdd.collect()
# Output: [10, 20, 30, 40, 50, 60]

๐Ÿ”น count()

Returns the number of elements in the RDD:

rdd.count()
# Output: 6

๐Ÿ”น reduce()

Aggregates the RDD using a function:

rdd.reduce(lambda a, b: a + b)
# Output: 210
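
A few other commonly used actions follow the same pattern; a quick sketch on the same rdd, with expected results shown as comments:

rdd.take(3)     # [10, 20, 30]  (first 3 elements, without scanning the whole RDD)
rdd.first()     # 10
rdd.max()       # 60
rdd.sum()       # 210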

๐Ÿ’ก Summary

  • collect() → Brings all data to the driver (use with caution on large datasets)
  • count() → Returns number of elements
  • reduce() → Returns aggregated result

๐Ÿ“บ Watch the Video Tutorial

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

PySpark PairRDD Transformations : groupByKey, reduceByKey, sortByKey Explained with Real Data | PySpark Tutorial

Mastering PySpark PairRDD Transformations | groupByKey, reduceByKey, sortByKey

Mastering PySpark PairRDD Transformations

groupByKey, reduceByKey, sortByKey Explained with Real Data

In this tutorial, we explore three essential PairRDD transformations in PySpark: groupByKey(), reduceByKey(), and sortByKey(). These functions enable you to group, reduce, and sort data using key-value pairs — powerful operations for distributed data processing.

๐Ÿ”น Step 1: Sample Data

data = [
    ("Alice", 300),
    ("Bob", 150),
    ("Alice", 200),
    ("Raza", 450),
    ("Bob", 100),
    ("Raza", 50)
]
rdd = spark.sparkContext.parallelize(data)

๐Ÿ”น Step 2: groupByKey()

grouped_rdd = rdd.groupByKey()
for k, v in grouped_rdd.collect():
    print(k, list(v))

๐Ÿ”น Step 3: reduceByKey()

reduced_rdd = rdd.reduceByKey(lambda a, b: a + b)
print(reduced_rdd.collect())

๐Ÿ”น Step 4: sortByKey()

sorted_rdd = reduced_rdd.sortByKey()
print(sorted_rdd.collect())
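
sortByKey() can also sort in descending order, and sortBy() lets you sort by the value instead of the key; a small sketch:

# Descending by key
print(reduced_rdd.sortByKey(ascending=False).collect())

# Descending by total amount (the value)
print(reduced_rdd.sortBy(lambda kv: kv[1], ascending=False).collect())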

๐Ÿ“บ Watch the Full Video Tutorial

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

PySpark RDD Transformations | map, flatMap, filter, distinct, Word Count | PySpark Tutorial

PySpark RDD Transformations Explained | map, flatMap, filter, distinct, Word Count

PySpark RDD Transformations Explained

This post covers how to use the most essential RDD transformations in PySpark. We’ll walk through map, flatMap, filter, distinct, and apply them in a Word Count example — all fundamental for big data processing.

1. Create a Spark Session

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("rdd_transformations_demo").getOrCreate()

2. Load Lines into RDD

lines = spark.sparkContext.parallelize([
    "Aamir loves Spark",
    "Ali and Raza love PySpark",
    "Spark is fast",
    "Ali loves big data"
])

3. Tokenize Text into Words

words_rdd = lines.flatMap(lambda line: line.split(" "))
words_rdd.collect()

4. Convert Words to (word, 1)

pair_rdd = words_rdd.map(lambda word: (word, 1))
pair_rdd.collect()

5. Filter Out Short Words

filtered_rdd = words_rdd.filter(lambda word: word != "is")
filtered_rdd.collect()

6. Distinct Words

distinct_rdd = words_rdd.distinct()
distinct_rdd.collect()

7. Count Word Occurrences

word_count_rdd = pair_rdd.reduceByKey(lambda a, b: a + b)
word_count_rdd.collect()

✅ Output Example:

[('Aamir', 1), ('loves', 2), ('Spark', 2), ('Ali', 2), ('and', 1), ('Raza', 1), ('love', 1), ('PySpark', 1), ('is', 1), ('fast', 1), ('big', 1), ('data', 1)]
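
To list the most frequent words first, the counted pairs can be sorted by value; a brief sketch:

top_words = word_count_rdd.sortBy(lambda kv: kv[1], ascending=False)
top_words.collect()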

๐Ÿ“บ Watch the Full Tutorial

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

What Are RDD Partitions in PySpark? | How Spark Partitioning works | PySpark Tutorial #pyspark

What Are RDD Partitions in PySpark? | Full Guide with Examples

What Are RDD Partitions in PySpark?

Understanding how RDD partitions work is crucial for optimizing your PySpark applications. This guide walks you through the concept of partitioning, partition count, and how to tune performance using `repartition()` and `coalesce()` functions.

๐Ÿ” What is an RDD Partition?

Partitions are logical chunks of data in a distributed system. Spark uses partitions to divide the data for parallel processing across worker nodes. More partitions mean more parallelism and better scalability.

๐Ÿ“Œ Step 1: Create RDD with Default Partitions

data = ["Aamir", "Ali", "Raza", "Bob", "Lisa"]
rdd = spark.sparkContext.parallelize(data)
print("Original Partitions:", rdd.getNumPartitions())

๐Ÿ“Œ Step 2: Repartition RDD to 5

rdd_repart = rdd.repartition(5)
print("After Repartition to 5:", rdd_repart.getNumPartitions())

๐Ÿ“Œ Step 3: Coalesce Back to 2

rdd_coalesce = rdd_repart.coalesce(2)
print("After Coalesce to 2:", rdd_coalesce.getNumPartitions())

๐Ÿ“Œ Step 4: Show How Data is Distributed

print("Data in Each Partition (Final RDD):")
print(rdd_coalesce.glom().collect())
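
Another way to inspect the distribution is mapPartitionsWithIndex(), which tags every record with the partition it lives in; a minimal sketch (tag_partition is just an illustrative helper):

def tag_partition(index, records):
    for record in records:
        yield (index, record)

print(rdd_coalesce.mapPartitionsWithIndex(tag_partition).collect())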

๐ŸŽฅ Watch the Full Tutorial

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

What is Azure Data Explorer in Synapse? Overview and Use Cases Explained | Azure Synapse Analytics Tutorial

What is Azure Data Explorer in Synapse? Overview and Use Cases

What is Azure Data Explorer in Synapse? Overview and Use Cases Explained

๐Ÿ“˜ Overview

Azure Data Explorer (ADX) is a high-performance, fully managed analytics service optimized for analyzing large volumes of telemetry, log, and time-series data. Within Azure Synapse Analytics, ADX is available as an integrated workspace experience, making it easier to ingest and query massive datasets using Kusto Query Language (KQL).

⚡ Why Use Azure Data Explorer in Synapse?

  • Blazing fast ingestion of structured, semi-structured, and unstructured data
  • Support for ad-hoc and near real-time analytics
  • Native support for time-series analysis
  • Works seamlessly with Spark, Serverless SQL, and Pipelines in Synapse

๐Ÿ” What is KQL?

Kusto Query Language (KQL) is the query language used by ADX. It's designed for fast read-only queries on large datasets, especially logs and telemetry.

Example KQL Query

StormEvents
| where State == "TEXAS"
| summarize Count = count() by EventType

๐Ÿง  Key Features

  • Highly optimized for analytical workloads (vs. transactional)
  • Data visualization integration with Power BI
  • Data retention policies and cache control
  • Time-series operations like make-series, summarize, render

๐ŸŽฏ Common Use Cases

  • IoT telemetry data exploration
  • Application performance monitoring
  • Clickstream and user behavior analytics
  • Real-time log investigation and alerting

๐Ÿ”ง How to Get Started in Synapse

  1. Create an ADX pool in your Synapse workspace
  2. Ingest data using pipelines, notebooks, or KQL
  3. Use KQL to explore and analyze data

๐Ÿ” Integration Highlights

  • Synapse Pipelines: for scheduled and automated ingestion
  • Notebooks: combine Spark & KQL in one environment
  • Power BI: for real-time dashboards

๐Ÿ“บ Watch the Full Video Tutorial

๐Ÿ“š Credit: Content created with the help of ChatGPT and Gemini.

PySpark Tutorial : Easy Ways to Create RDD in PySpark | Beginner Guide with Real Examples

Different Ways to Create RDD in PySpark | Step-by-Step Examples

Different Ways to Create RDD in PySpark

This tutorial walks you through multiple practical ways to create RDDs (Resilient Distributed Datasets) in PySpark, a fundamental concept in Apache Spark. Whether you're building data pipelines or preparing for Spark interviews, these examples will help you get started confidently.

1. Using parallelize()

This is the simplest way to create an RDD from an in-memory Python list, ideal for testing or small datasets:

numbers = [1, 2, 3, 4, 5]
rdd_parallel = spark.sparkContext.parallelize(numbers)
print(rdd_parallel.collect())

2. Using textFile()

Loads a text file into an RDD, where each line becomes a single record:

rdd_text = spark.sparkContext.textFile("path/to/textfile.txt")
print(rdd_text.take(5))

3. Using wholeTextFiles()

Reads an entire directory of small text files, each returning a tuple of (filename, content):

rdd_whole = spark.sparkContext.wholeTextFiles("path/to/folder")
print(rdd_whole.take(1))
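
Two more ways you will run into in practice: generating an RDD from a numeric range, and reusing the rows of an existing DataFrame (shown here as a short sketch with made-up sample data):

# From a numeric range
rdd_range = spark.sparkContext.range(1, 11)
print(rdd_range.collect())

# From an existing DataFrame
df = spark.createDataFrame([("Aamir", 1), ("Ali", 2)], ["name", "id"])
rdd_from_df = df.rdd
print(rdd_from_df.take(2))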

๐Ÿ“บ Watch the Video Tutorial

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

Load Lakehouse File to Warehouse Table via Copy Assistant | Microsoft Fabric Tutorial #fabrictutorial

Load Lakehouse File to Warehouse Table via Copy Assistant | Microsoft Fabric Tutorial

Load Lakehouse File to Warehouse Table via Copy Assistant

In this Microsoft Fabric tutorial, you’ll learn how to use the Copy Assistant feature in Data Pipelines to load data from a Lakehouse file into a Warehouse table. This no-code interface simplifies data movement for analysts and engineers alike.

✅ What is Copy Assistant in Fabric Pipelines?

  • A guided interface in Microsoft Fabric for building Copy activities
  • Helps you configure source-to-destination mappings without writing code
  • Ideal for simple Lakehouse to Warehouse transfers

๐Ÿ”— Connecting to the Lakehouse & Warehouse

  1. Start a new Data Pipeline in Microsoft Fabric
  2. Click Copy Assistant from the toolbar
  3. Select the source as Lakehouse and choose your file (CSV, Parquet, etc.)
  4. Select the destination as your Warehouse table

๐Ÿ“Š Mapping Columns

Copy Assistant provides an intuitive column-mapping screen where:

  • Column names from your file are matched to the Warehouse schema
  • You can manually correct any mismatches or transformations
  • Data types are validated to ensure consistency

๐Ÿš€ Executing and Validating

  1. Once configuration is done, click Run Pipeline
  2. Monitor the execution via the Activity Monitor
  3. Check row count and preview records in your target Warehouse table

๐Ÿ’ก Best Practices

  • Ensure your Lakehouse file format matches expected schema (CSV with headers, correct delimiters, etc.)
  • Use Preview before final execution to validate field mappings
  • Check Warehouse table for primary keys or constraints that could block inserts
  • Use Dataflow Gen2 for more advanced transformations, if needed

๐Ÿž Common Troubleshooting Tips

  • Mapping errors: Mismatch in data types or missing columns
  • Permission issues: Ensure Fabric roles allow write access to destination
  • Blank outputs: Check that source file actually contains rows

๐ŸŽฌ Watch the Full Tutorial

Blog post written with the help of ChatGPT.

What is RDD in PySpark? | A Beginner’s Guide to Apache Spark’s Core Data Structure | PySpark Tutorial

What is RDD in PySpark? | Learn Spark’s Core Data Structure

๐Ÿ”ฅ What is RDD in PySpark?

RDD (Resilient Distributed Dataset) is the fundamental data structure in Apache Spark, representing an immutable, distributed collection of objects. This tutorial helps you understand what RDDs are, how they work, and when to use them.

๐Ÿ“˜ Definition of RDD

RDD is a low-level abstraction in PySpark. It provides:

  • Fault-tolerance: Can recover from node failures
  • Immutability: Once created, it cannot be changed
  • Partitioning: Internally split across nodes

๐Ÿงช Create RDD Example

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("rdd_intro").getOrCreate()
sc = spark.sparkContext

data = [("Alice", 28), ("Bob", 35), ("Charlie", 40), ("Diana", 23)]
rdd = sc.parallelize(data, 2)
print("Partition Count:", rdd.getNumPartitions())

Output:

Partition Count: 2

๐Ÿ”„ Basic Transformations

# map() transformation to format data
mapped = rdd.map(lambda x: f"{x[0]} is {x[1]} years old")
for item in mapped.collect():
    print(item)

Output:

Alice is 28 years old
Bob is 35 years old
Charlie is 40 years old
Diana is 23 years old
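
Transformations can be chained on the same RDD; for example, filtering before formatting (a small sketch, with the expected result for the sample data above):

adults = rdd.filter(lambda x: x[1] >= 30)
print(adults.collect())   # [('Bob', 35), ('Charlie', 40)]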

⚖️ RDD vs DataFrame

  • RDD: More control, lower-level, suitable for complex operations
  • DataFrame: Optimized, higher-level API, supports SQL queries

๐Ÿ“Œ When to Use RDD

  • When you need fine-grained control over transformations
  • When you’re working with unstructured or complex data
  • When DataFrame/Dataset APIs do not support your logic

๐ŸŽฅ Watch Full Tutorial

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

Build Sales Analytics Model – Star Schema, Semantic Model & Power BI | Microsoft Fabric Tutorial

Build Sales Analytics Model – Star Schema, Semantic Model & Power BI

In this Microsoft Fabric tutorial, you’ll learn how to build an end-to-end Sales Analytics model using a star schema, semantic model, and Power BI integration with Direct Lake for blazing-fast performance.

๐Ÿง  What is a Semantic Model in Microsoft Fabric?

A Semantic Model is a business-friendly layer on top of your raw data that enables easier analysis and reporting.

  • Helps users understand and explore the data model intuitively
  • Supports business measures using DAX (e.g., Total Sales = SUM(FactSales[TotalAmount]))
  • Defines relationships, KPIs, hierarchies, and security layers
  • Optimized for tools like Power BI and Excel

๐ŸŒŸ Star Schema Design for Sales

In this model, we structure the data using a star schema:

  • Fact Table: FactSales
  • Dimension Tables: DimCustomer, DimProduct, DimDate, DimRegion

All dimension tables are linked to the central FactSales table to enable fast and meaningful aggregations in reports.

⚡ Direct Lake Mode in Fabric

  • Reads directly from Delta tables in OneLake
  • Combines import-speed with real-time freshness
  • No need for scheduled refreshes
  • Fully integrated with Power BI

๐Ÿ“Š Sample DAX Measures

Total Sales = SUM(FactSales[TotalAmount])
Total Orders = COUNT(FactSales[OrderID])
Sales Per Customer = DIVIDE([Total Sales], DISTINCTCOUNT(DimCustomer[CustomerID]))

๐Ÿ“ˆ Visualizing in Power BI

  • Connect to the semantic model using Direct Lake
  • Create visuals: Sales Trend, Sales by Product, Customer Retention
  • Apply filters, bookmarks, and slicers for interactivity
  • Publish and share via Power BI Service

๐ŸŽฌ Watch the Full Tutorial

Blog post written with the help of ChatGPT.

How to Read Data from Dedicated SQL Pool and Create CSV in ADLS Gen2 Using Azure Synapse | Azure Synapse Analytics Tutorial

Read from Dedicated SQL Pool and Create CSV in ADLS Gen2 | Azure Synapse

How to Read Data from Dedicated SQL Pool and Create CSV in ADLS Gen2 Using Azure Synapse

๐Ÿ“˜ Overview

In this tutorial, you'll learn how to extract data from a Dedicated SQL Pool using PySpark and write it to a single CSV file in Azure Data Lake Storage Gen2 (ADLS Gen2). This is a common task for exporting curated data for sharing, archiving, or downstream analytics.

๐Ÿงฑ Prerequisites

  • Azure Synapse workspace with Spark and Dedicated SQL Pool
  • Configured Linked Services and credentials
  • ADLS Gen2 container path

๐Ÿ› ️ Step-by-Step Guide

✅ Step 1: Read from Dedicated SQL Pool Table

%%pyspark
# Read the table over JDBC (generic Spark JDBC source; replace server, database, and credentials with your own)
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:sqlserver://yourserver.database.windows.net:1433;database=yourDB") \
    .option("dbtable", "dbo.sales_data") \
    .option("user", "youruser") \
    .option("password", "yourpassword") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

✅ Step 2: Repartition to Single File

df_single = df.repartition(1)

✅ Step 3: Write to ADLS Gen2 as CSV

df_single.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("abfss://export@yourstorageaccount.dfs.core.windows.net/reports/sales_output")

๐Ÿ“Œ Notes

  • Use repartition(1) to generate a single CSV file
  • You may rename the file from part-00000.csv using Azure Storage Explorer
  • Ensure Spark pool has permission to access the ADLS path

๐ŸŽฏ Use Cases

  • Exporting business reports from SQL warehouse
  • Sharing datasets with external users
  • Backup/archive structured data in CSV format

๐Ÿ“Š Output Example

The CSV file will be saved in your specified ADLS path, containing all the rows from the selected Dedicated SQL Pool table.

๐Ÿ“บ Watch the Full Video Tutorial

๐Ÿ“š Credit: Content created with the help of ChatGPT and Gemini.

Explore Databases and Tables in PySpark with spark.catalog | A Beginner’s Guide to Metadata Management | PySpark Tutorial

Explore Databases and Tables in PySpark using spark.catalog

๐Ÿ” How to Explore Tables and Databases in PySpark using spark.catalog

Learn how to interact with databases, tables, views, cached data, and registered functions using PySpark's powerful spark.catalog interface. Ideal for metadata exploration and data pipeline control.
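
The snippets below reference a demo database, table, and temp view; if you want to follow along, here is a minimal setup sketch (the sample data is made up, the object names match the ones used in the steps):

spark.sql("CREATE DATABASE IF NOT EXISTS demo_db")

df = spark.createDataFrame([(1, "Aamir"), (2, "Ali")], ["id", "name"])
df.write.mode("overwrite").saveAsTable("demo_table")

df.createOrReplaceTempView("demo_view")
spark.catalog.cacheTable("demo_view")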

๐Ÿ“˜ Step 1: Inspect Current Catalog State

print("Current Database:", spark.catalog.currentDatabase())
print("Current Catalog:", spark.catalog.currentCatalog())

๐Ÿ“‚ Step 2: List Databases and Tables

print("List of Databases:")
for db in spark.catalog.listDatabases():
    print(db.name)

print("List of Tables:")
for tbl in spark.catalog.listTables():
    print(tbl.name, tbl.tableType)

๐Ÿงช Step 3: Check Existence of Tables & Databases

print("Table Exists:", spark.catalog.tableExists("demo_table"))
print("Database Exists:", spark.catalog.databaseExists("demo_db"))

๐Ÿ” Step 4: Explore Table Columns

for col in spark.catalog.listColumns("demo_table"):
    print(col.name, col.dataType)

๐Ÿงผ Step 5: Drop Temporary Views

spark.catalog.dropTempView("demo_view")

๐Ÿง  Step 6: Manage Cache

print("Is Cached:", spark.catalog.isCached("demo_view"))
spark.catalog.uncacheTable("demo_view")
spark.catalog.clearCache()

๐ŸŽฅ Watch Full Video Tutorial

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

How to Create Notebook to Join Two CSV Files and Write to Lake Database in Synapse Pipelines | Azure Synapse Analytics Tutorial

Join Two CSV Files and Write to Lake Database Using Synapse Notebook

How to Create Notebook to Join Two CSV Files and Write to Lake Database in Synapse Pipelines

๐Ÿ“˜ Overview

In this tutorial, we’ll show you how to create a PySpark notebook in Azure Synapse Analytics that reads two CSV files, performs a join operation, and writes the result to a Lake Database table. This notebook can then be integrated into a Synapse pipeline for automated data workflows.

๐Ÿงฑ Prerequisites

  • Azure Synapse workspace with Spark Pool
  • Lake Database created (or Spark database)
  • CSV files stored in ADLS Gen2

๐Ÿ› ️ Step-by-Step Instructions

✅ Step 1: Read CSV Files

%%pyspark
df_customers = spark.read.option("header", True).csv("abfss://data@yourstorage.dfs.core.windows.net/customers.csv")
df_orders = spark.read.option("header", True).csv("abfss://data@yourstorage.dfs.core.windows.net/orders.csv")

✅ Step 2: Join the Two DataFrames

df_joined = df_customers.join(df_orders, on="customer_id", how="inner")  # joining on the column name keeps a single customer_id column in the output

✅ Step 3: Write the Result to a Lake Database Table

df_joined.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("LakeDB.joined_customer_orders")

๐Ÿ“‚ Result

The output Delta table will be created inside your Lake Database and will be queryable via Spark and Serverless SQL Pools.

๐Ÿ” Integrate Notebook in Synapse Pipeline

  1. Go to Synapse Studio → Integrate → Pipeline
  2. Add “Notebook” activity
  3. Select the notebook you just created
  4. Configure Spark Pool and parameters (if any)
  5. Publish and trigger the pipeline

๐Ÿ“Œ Tips

  • Validate file schema before joining
  • Use display() for previewing data inside the notebook
  • Leverage overwrite mode for testing; use append for incremental writes

๐ŸŽฏ Use Cases

  • Merge transactional and customer data
  • Create curated data layers in Lakehouse
  • Automate data ingestion with Synapse Pipelines

๐Ÿ“บ Watch the Full Video Tutorial

๐Ÿ“š Credit: Content created with the help of ChatGPT and Gemini.

PySpark Tutorial: How to Build User-Defined Table Functions (UDTFs) in PySpark | Split Rows

Create UDTF in PySpark | Generate Multiple Rows per Input

How to Create a UDTF (User Defined Table Function) in PySpark

Need to return multiple rows per input in Spark? Learn how to define and use UDTFs (User Defined Table Functions) in PySpark using the @udtf decorator with real-world logic.

๐Ÿ“˜ Step 1: Define a Class to Split Words

from pyspark.sql.functions import udtf

class SplitWords:
    def eval(self, text: str):
        for word in text.split(" "):
            yield (word,)

split_words_udtf = udtf(SplitWords, returnType="word: string")

Use Case: This takes a sentence and emits each word as a row.

⚡ Step 2: Use UDTF Directly in Select

from pyspark.sql.functions import lit

split_words_udtf(lit("pyspark is powerful")).show()

Output:

+---------+
|  word   |
+---------+
| pyspark |
| is      |
| powerful|
+---------+

๐Ÿ“Š Step 3: Another UDTF Returning Two Columns

@udtf(returnType="num: int, plus_one: int")
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1

PlusOne(lit(5)).show()

Output:

+-----+---------+
| num | plus_one|
+-----+---------+
|  5  |    6    |
+-----+---------+
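
A UDTF can also be registered and called from Spark SQL, which is convenient in notebooks; a brief sketch (assumes a PySpark version that ships the @udtf API, such as 3.5+):

# Register the UDTF from Step 1 under a SQL name
spark.udtf.register("split_words", split_words_udtf)

# Table functions are used in the FROM clause
spark.sql("SELECT * FROM split_words('spark sql loves udtfs')").show()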

๐ŸŽฅ Full Tutorial Video

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

Monitor Performance & Query Execution in Fabric Warehouse-Microsoft Fabric Tutorial #fabrictutorial

Monitor Performance & Query Execution in Fabric Warehouse | Microsoft Fabric Tutorial

Monitor Performance & Query Execution in Fabric Warehouse

Monitoring performance and query execution in Microsoft Fabric Warehouse is essential for optimizing resource usage, identifying bottlenecks, and tuning SQL workloads. In this tutorial, we’ll walk through powerful system views and dynamic management queries you can use for real-time and historical insights.

๐Ÿ” Query Insights Views in Fabric

Microsoft Fabric provides specialized views to track query and session behavior:

  • queryinsights.exec_requests_history – Tracks historical query executions
  • queryinsights.exec_sessions_history – Tracks session metadata
  • queryinsights.frequently_run_queries – Identifies most commonly executed queries
  • queryinsights.long_running_queries – Lists slowest queries for optimization

SELECT * FROM queryinsights.long_running_queries;

๐Ÿ› ️ Using SQL DMVs for Live Monitoring

Dynamic Management Views (DMVs) offer real-time insight into query activity:

  • sys.dm_exec_requests – Tracks currently running queries
  • sys.dm_exec_sessions – Lists active sessions
  • sys.dm_exec_connections – Shows active client connections

SELECT
    r.session_id,
    r.status,
    r.start_time,
    r.command,
    r.cpu_time,
    r.total_elapsed_time AS elapsed_ms,
    r.reads,
    r.writes,
    t.text AS query_text,
    s.program_name,
    s.login_name,
    s.host_name
FROM sys.dm_exec_requests r
JOIN sys.dm_exec_sessions s ON r.session_id = s.session_id
CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
WHERE r.status = 'running'
ORDER BY r.total_elapsed_time DESC;

Optionally, you can use the KILL command to stop long-running or blocked sessions (admin permission required):

KILL 56;

✅ Best Practices

  • Use queryinsights views for historical trends and usage stats
  • Use DMVs for real-time analysis of live workloads
  • Investigate long-running queries and optimize indexes or logic
  • Periodically review session and resource usage patterns

๐ŸŽฌ Watch the Full Tutorial

Blog post written with the help of ChatGPT.

Create Custom Column Logic in PySpark Using udf() | Easy Guide with Real Examples | PySpark Tutorial

Create Custom Column Logic with udf() in PySpark

Sometimes built-in PySpark functions just aren't enough. That’s when you use udf() to apply your own Python logic directly to Spark DataFrames.

๐Ÿ“˜ Sample Data

data = [("apple", 2), ("banana", 3), ("kiwi", 1)]
df = spark.createDataFrame(data, ["fruit", "quantity"])
df.show()

Output:

+-------+--------+
| fruit |quantity|
+-------+--------+
| apple |   2    |
| banana|   3    |
| kiwi  |   1    |
+-------+--------+

๐Ÿง  Step 1: Define a Python Function to Tag Price

def tag_price(qty: int) -> str:
    return "expensive" if qty >= 3 else "cheap"

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

tag_price_udf = udf(tag_price, StringType())

⚡ Step 2: Apply UDF to Create a New Column

df = df.withColumn("price_tag", tag_price_udf(df["quantity"]))
df.select("fruit", "quantity", "price_tag").show()

Output:

+--------+--------+-----------+
| fruit  |quantity| price_tag |
+--------+--------+-----------+
| apple  |   2    |   cheap   |
| banana |   3    | expensive |
| kiwi   |   1    |   cheap   |
+--------+--------+-----------+

๐Ÿ”ค Step 3: Another UDF to Transform Fruit Name

def shout_name(name: str) -> str:
    return name.upper() + "!"

shout_udf = udf(shout_name, StringType())
df = df.withColumn("fruit_shout", shout_udf(df["fruit"]))
df.select("fruit", "fruit_shout").show()

Output:

+--------+-------------+
| fruit  |fruit_shout  |
+--------+-------------+
| apple  |  APPLE!     |
| banana |  BANANA!    |
| kiwi   |  KIWI!      |
+--------+-------------+
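
The same logic can be written more compactly with the decorator form of udf(); a small sketch equivalent to Step 1 (the new column name is just illustrative):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def tag_price_short(qty: int) -> str:
    return "expensive" if qty >= 3 else "cheap"

df.withColumn("price_tag_v2", tag_price_short(df["quantity"])).show()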

๐ŸŽฅ Watch the Full Video Tutorial

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

What Are Statistics in Fabric Data Warehouse & How to Manage Statistics | Microsoft Fabric Tutorial

What Are Statistics in Fabric Data Warehouse & How to Manage Statistics

In this Microsoft Fabric tutorial, we explain what statistics are in a Fabric Data Warehouse, why they are critical for performance, and how to manage them manually or automatically using T-SQL.

๐Ÿ“Š What Are Statistics?

  • Statistics describe the distribution of values in a column
  • Used by the query optimizer to estimate row counts and selectivity
  • Critical to choosing an efficient query execution plan

๐Ÿ”ง Manual Statistics Management

Create, update, and inspect statistics on frequently filtered or joined columns:

-- Create statistics manually
CREATE STATISTICS DimCustomer_CustomerKey_FullScan
ON dbo.DimCustomer (CustomerKey) WITH FULLSCAN;

-- Update statistics after data changes
UPDATE STATISTICS dbo.DimCustomer (DimCustomer_CustomerKey_FullScan) WITH FULLSCAN;

-- View histogram of statistics
DBCC SHOW_STATISTICS ('dbo.DimCustomer', 'DimCustomer_CustomerKey_FullScan') WITH HISTOGRAM;

⚙️ Automatic Statistics

  • Automatically created when queries involve JOIN, GROUP BY, WHERE, or ORDER BY
  • Generated under names like _WA_Sys_...
  • Automatically refreshed when underlying data changes significantly

-- Query that triggers auto-stat creation
SELECT CustomerKey FROM dbo.DimCustomer GROUP BY CustomerKey;

-- View system-generated statistics
SELECT
    object_name(s.object_id) AS object_name,
    c.name AS column_name,
    s.name AS stats_name,
    STATS_DATE(s.object_id, s.stats_id) AS stats_update_date,
    s.auto_created
FROM sys.stats s
JOIN sys.stats_columns sc ON s.stats_id = sc.stats_id AND s.object_id = sc.object_id
JOIN sys.columns c ON sc.object_id = c.object_id AND sc.column_id = c.column_id
WHERE object_name(s.object_id) = 'DimCustomer';

๐Ÿ’ก Best Practices

  • Create stats manually for columns heavily used in WHERE and JOIN clauses
  • Use WITH FULLSCAN for higher accuracy
  • Inspect STATS_DATE to verify freshness
  • Allow auto stats to handle low-frequency changes

๐ŸŽฌ Watch the Full Tutorial

Blog post written with the help of ChatGPT.

DataFlow vs Copy Activity | How to Join Two Files & Create Output Using Data Flow Activity in Azure Synapse Pipelines | Azure Synapse Analytics Tutorial

DataFlow vs Copy Activity in Azure Synapse | Join Two Files & Output Using Data Flow

DataFlow vs Copy Activity | How to Join Two Files & Create Output Using Data Flow in Azure Synapse

๐Ÿ“˜ Overview

Azure Synapse Analytics provides two powerful ways to move and transform data: Copy Activity and Data Flow Activity. While Copy Activity is ideal for simple file movement, Data Flows allow for complex transformations such as joining, filtering, and aggregating data before writing it to the destination.

๐Ÿ”„ Copy Activity vs Data Flow: Key Differences

Feature              | Copy Activity                     | Data Flow Activity
---------------------|-----------------------------------|------------------------------------------------------
Use Case             | Data movement (copy files/tables) | Data transformation (joins, filters, derived columns)
UI-based Logic       | No                                | Yes
Performance Tuning   | Limited                           | More control (partitioning, caching)
Join Support         | No                                | Yes

๐Ÿ› ️ Use Case: Join Two Files & Write Output

We demonstrate how to use Data Flow Activity to join two CSV files stored in ADLS Gen2 and output the joined result to a new file.

✅ Step 1: Create Linked Services

  • Linked Service to your Azure Data Lake Gen2

✅ Step 2: Create Source Datasets

  • Dataset 1: customers.csv
  • Dataset 2: orders.csv

✅ Step 3: Design the Data Flow

  1. In Synapse Studio, go to the Data Flows tab
  2. Add two source transformations for customers and orders
  3. Add a Join transformation: Join on customer_id
  4. Use a Select transformation to choose columns
  5. Add Sink transformation to write output to ADLS Gen2 in CSV format

✅ Step 4: Create and Trigger Pipeline

1. Go to Integrate tab → New pipeline
2. Drag in a Data Flow activity and select the one you created
3. Debug and publish
4. Trigger manually or schedule

๐Ÿ“‚ Output Example

The resulting file contains joined customer and order information, and is saved as a new CSV file in your configured output path in ADLS.

๐Ÿ“Œ Best Practices

  • Use Data Flow for any transformation-heavy pipelines
  • Parameterize paths and filters for reusability
  • Use debugging to test logic before publishing

๐ŸŽฏ When to Use Each

  • Use Copy Activity for fast, simple data transfers
  • Use Data Flow Activity when joining, filtering, or reshaping data

๐Ÿ“บ Watch the Full Video Tutorial

๐Ÿ“š Credit: Content created with the help of ChatGPT and Gemini.

Boost PySpark Performance with pandas_udf | Beginner-Friendly Tutorial with Real Examples | PySpark Tutorial

Speed Up PySpark with pandas_udf | Beginner-Friendly Tutorial

๐Ÿš€ Speed Up PySpark with pandas_udf() – Easy Tutorial

Want faster performance in your PySpark jobs? This tutorial covers how to use pandas_udf() to process data in batches using Pandas under the hood—providing serious speed boosts over regular UDFs.

๐Ÿ“˜ Sample DataFrame

data = [("apple",), ("banana",), ("kiwi",)]
df = spark.createDataFrame(data, ["fruit"])
df.show()

Output:

+--------+
|  fruit |
+--------+
|  apple |
| banana |
|   kiwi |
+--------+

⚡ Step 1: Define a pandas_udf to Get Length of Fruit Name

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
import pandas as pd

@pandas_udf(IntegerType())
def fruit_length(series: pd.Series) -> pd.Series:
    return series.str.len()

df = df.withColumn("length", fruit_length(df["fruit"]))
df.select("fruit", "length").show()

Output:

+--------+------+
|  fruit |length|
+--------+------+
|  apple |     5|
| banana |     6|
|   kiwi |     4|
+--------+------+

๐ŸŽฏ Step 2: Classify Fruit Based on Length

from pyspark.sql.types import StringType

@pandas_udf(StringType())
def classify_fruit(series: pd.Series) -> pd.Series:
    return series.apply(lambda name: "long name" if len(name) > 5 else "short name")

df = df.withColumn("length_category", classify_fruit(df["fruit"]))
df.select("fruit", "length_category").show()

Output:

+--------+---------------+
|  fruit |length_category|
+--------+---------------+
|  apple |     short name|
| banana |      long name|
|   kiwi |     short name|
+--------+---------------+
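
A pandas_udf() can also accept several columns at once, still operating on whole batches; a minimal sketch that combines the two derived columns above (describe_fruit is an illustrative name):

@pandas_udf(StringType())
def describe_fruit(name: pd.Series, length: pd.Series) -> pd.Series:
    return name.str.upper() + " (" + length.astype(str) + " letters)"

df.select("fruit", describe_fruit(df["fruit"], df["length"]).alias("description")).show(truncate=False)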

๐ŸŽฅ Watch Full Tutorial on YouTube

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

Read & Write to Fabric Warehouse from Notebooks | Microsoft Fabric Tutorial #fabrictutorial

Read & Write to Fabric Warehouse from Notebooks | Microsoft Fabric Tutorial

Read & Write to Fabric Warehouse from Notebooks

In this tutorial, you'll learn how to read data from a Microsoft Fabric Warehouse and write it to a Lakehouse — and vice versa — all within a Fabric Notebook using PySpark. This allows for seamless integration between the structured warehouse and unstructured or semi-structured file storage.

๐Ÿ“ฅ 1. Read Table from Fabric Warehouse & Write to Lakehouse Files

from datetime import datetime
import com.microsoft.spark.fabric

# Read from Warehouse
df = spark.read.synapsesql("TechDWH.dbo.Employees")
display(df)

# Generate timestamped filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = f"Files/Employees_{timestamp}.parquet"

# Write to Lakehouse
df.write.mode("overwrite").parquet(output_path)

print(f"Data written to: {output_path}")

๐Ÿ“ค 2. Read from Lakehouse & Write to Warehouse Table

from datetime import date
import com.microsoft.spark.fabric

# Read CSV file from Lakehouse
order_df = spark.read.option("header", True).csv("Files/order.csv")
display(order_df)

# Generate table name with today’s date
today = date.today().strftime("%Y%m%d")
table_name = f"TechDWH.dbo.Order_{today}"

# Write to Fabric Warehouse table
order_df.write.mode("overwrite").synapsesql(table_name)

print(f"Data written to table: {table_name}")

✅ Benefits of Using Fabric Notebooks

  • Dynamic integration between Warehouse and Lakehouse
  • Use of PySpark APIs for flexible data manipulation
  • Fast prototyping and data engineering workflows
  • Great for automation and scheduled execution

๐ŸŽฌ Watch the Full Tutorial

Blog post written with the help of ChatGPT.

How to Create Synapse Pipeline to Copy Data from CSV to Dedicated SQL Pool from Scratch | Azure Synapse Analytics Tutorial

Create Synapse Pipeline to Copy CSV Data to Dedicated SQL Pool

How to Create Synapse Pipeline to Copy Data from CSV to Dedicated SQL Pool

๐Ÿ“˜ Overview

In this tutorial, you’ll learn how to build a Synapse pipeline from scratch to copy data from a CSV file in Azure Data Lake Storage Gen2 to a table in a Dedicated SQL Pool. This is a common ETL scenario in modern data warehousing using Azure Synapse Analytics.

๐Ÿงฑ Prerequisites

  • A Synapse workspace with a dedicated SQL pool created
  • CSV file stored in Azure Data Lake Gen2
  • Proper permissions for Linked Services and data access

๐Ÿ› ️ Step-by-Step Instructions

✅ Step 1: Open Synapse Studio

Navigate to your Synapse workspace → Open Synapse Studio

✅ Step 2: Create Linked Services

  • One for your Azure Data Lake Gen2 (source)
  • One for your Dedicated SQL Pool (sink)

✅ Step 3: Create Source Dataset

Choose DelimitedText → CSV
Point to the container and folder where your CSV file is stored
Enable header row, set column delimiter, and schema if needed

✅ Step 4: Create Sink Dataset

Choose Azure Synapse Analytics
Select or define the table in your dedicated SQL pool

✅ Step 5: Create the Pipeline

1. Go to the Integrate tab
2. Click + > Pipeline
3. Add a Copy Data activity
4. Configure source and sink datasets
5. Map columns (if needed)
6. Debug or trigger manually

✅ Step 6: Publish and Monitor

  • Click “Publish All” to deploy the pipeline
  • Run the pipeline manually or create a trigger
  • Monitor progress under the “Monitor” tab

๐Ÿ“Œ Tips

  • Make sure your CSV schema matches the destination table
  • Use staging if file size is large for better performance
  • Enable logging for troubleshooting

๐ŸŽฏ Use Cases

  • Daily ingestion of batch files into SQL Pool
  • Building historical data from external systems
  • Loading external partner data for analytics

๐Ÿ“บ Watch the Full Video Tutorial

๐Ÿ“š Credit: Content created with the help of ChatGPT and Gemini.

How to Use call_udf() in PySpark | Dynamically Apply UDFs in Real-Time Data Pipelines | PySpark Tutorial

How to Use call_udf() in PySpark | Dynamically Apply Registered UDFs

How to Use call_udf() in PySpark

This tutorial teaches you how to dynamically apply registered user-defined functions (UDFs) in PySpark using call_udf(). It’s perfect for use cases where the logic needs to be reusable and flexible for different conditions or inputs.

๐Ÿ“˜ Sample Data

data = [
    (1, "apple"),
    (2, "banana"),
    (3, "carrot")
]
df = spark.createDataFrame(data, ["id", "fruit"])
df.show()

Output:

+---+------+
| id| fruit|
+---+------+
|  1| apple|
|  2|banana|
|  3|carrot|
+---+------+

๐Ÿง  Register and Use UDF with call_udf()

from pyspark.sql.functions import col, call_udf
from pyspark.sql.types import IntegerType, StringType

# Register a UDF to double an ID value
spark.udf.register("double_id", lambda x: x * 2, IntegerType())

# Apply UDF using call_udf by name
df = df.withColumn("id_doubled", call_udf("double_id", col("id")))
df.select("id", "fruit", "id_doubled").show()

Output:

+---+------+----------+
| id| fruit|id_doubled|
+---+------+----------+
|  1| apple|        2 |
|  2|banana|        4 |
|  3|carrot|        6 |
+---+------+----------+

๐Ÿ”ค Apply String-Based UDF

# Register another UDF that adds a prefix to fruit name
spark.udf.register("udf_add_prefix", lambda s: "Tasty " + s, StringType())

# Apply using call_udf
df = df.withColumn("fruit_tagged", call_udf("udf_add_prefix", col("fruit")))
df.select("fruit", "fruit_tagged").show()

Output:

+-------+-------------+
| fruit |fruit_tagged |
+-------+-------------+
| apple |Tasty apple  |
| banana|Tasty banana |
| carrot|Tasty carrot |
+-------+-------------+
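
Because both functions were registered by name, they are also available in Spark SQL once the DataFrame is exposed as a view; a quick sketch (the view name is just an example):

df.createOrReplaceTempView("fruits")
spark.sql("SELECT id, fruit, double_id(id) AS id_doubled_sql FROM fruits").show()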

๐ŸŽฅ Watch the Full Tutorial

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

Get Row Counts of All Tables in Fabric Warehouse | Microsoft Fabric Tutorial #fabrictutorial

Get Row Counts of All Tables in Fabric Warehouse | Microsoft Fabric Tutorial

Get Row Counts of All Tables in Fabric Warehouse

In Microsoft Fabric Warehouse, it's often helpful to get the row counts of all tables in your database — whether for validation, monitoring, or reporting purposes. This tutorial shows you a dynamic and efficient way to accomplish that using T-SQL.

๐ŸŽฏ Goal

Dynamically generate and execute a script that counts rows from every BASE TABLE in the current Fabric Warehouse.

๐Ÿงพ T-SQL Script

DECLARE @QueryString NVARCHAR(MAX);
DECLARE @NewLine CHAR(2) = CHAR(13) + CHAR(10); -- Define newline once

SELECT @QueryString = COALESCE(@QueryString + @NewLine + 'UNION ALL' + @NewLine, '')
                      + 'SELECT '
                      + '''' + t.TABLE_SCHEMA + '.' + t.TABLE_NAME + '''' + ' AS [TableName]' + @NewLine
                      + ', COUNT(*) AS [RowCount] FROM '
                      + QUOTENAME(t.TABLE_SCHEMA) + '.' + QUOTENAME(t.TABLE_NAME) + @NewLine
FROM INFORMATION_SCHEMA.TABLES AS t
WHERE t.TABLE_TYPE = 'BASE TABLE'
ORDER BY t.TABLE_SCHEMA, t.TABLE_NAME;

-- Optional: print to verify the dynamic SQL
PRINT @QueryString;

-- Execute the dynamic SQL
EXEC sp_executesql @QueryString;

๐Ÿ› ️ How It Works

  • Pulls all BASE TABLE names from INFORMATION_SCHEMA.TABLES
  • Builds a SELECT COUNT(*) query for each table
  • Combines them using UNION ALL into one dynamic script
  • Prints and executes the final T-SQL query

๐Ÿ” Benefits

  • No manual table listing needed
  • Query adapts to future schema changes
  • Can be used for automated audit reporting

๐ŸŽฌ Watch the Full Tutorial

Blog post written with the help of ChatGPT.

Linked Services, Datasets & Copy Data Tool in Azure Synapse Analytics Explained | Azure Synapse Pipelines

Linked Services, Datasets & Copy Data Tool in Azure Synapse Analytics Explained

๐Ÿ“˜ Overview

Azure Synapse Analytics offers a visual and low-code environment to integrate data from various sources using Linked Services, Datasets, and the Copy Data Tool. These components are essential for building scalable data ingestion and ETL pipelines inside Synapse Studio.

๐Ÿ”— What Is a Linked Service?

A Linked Service acts like a connection string. It defines the connection information required for Synapse to connect to a data source or compute environment.

๐Ÿ“ฅ Examples:

  • Azure Blob Storage
  • Azure Data Lake Storage Gen2
  • Azure SQL Database or Synapse SQL Pools
  • Amazon S3 or external APIs

๐Ÿ“„ What Is a Dataset?

A Dataset defines the structure of the data (like schema or file type) used as input or output in a data activity (e.g., Copy Data).

๐Ÿ“ Dataset Examples:

  • CSV file in Blob Storage
  • Table in Synapse SQL Pool
  • JSON document in ADLS Gen2

๐Ÿš€ Using the Copy Data Tool

The Copy Data Tool is a wizard-driven interface that lets you quickly copy data between sources and destinations.

✅ Step-by-Step

1. Go to Synapse Studio > Integrate > + > Copy Data Tool
2. Choose your source linked service (e.g., Azure Blob)
3. Choose your dataset (e.g., CSV file)
4. Choose the destination (e.g., Synapse SQL Table)
5. Map columns and set copy behavior
6. Trigger immediately or schedule

๐Ÿงฉ How They Work Together

Linked Services define where to get or store the data. Datasets define what data is being moved. The Copy Data Tool defines how the data is moved.

๐Ÿ“Œ Best Practices

  • Use parameterized linked services for reuse across pipelines
  • Validate datasets for schema consistency
  • Use copy activity logging for troubleshooting

๐ŸŽฏ Common Use Cases

  • Ingesting data from on-premises to cloud
  • Loading staging tables from flat files
  • Copying reference data between sources

๐Ÿ“บ Watch the Full Video Tutorial

๐Ÿ“š Credit: Content created with the help of ChatGPT and Gemini.

How to Use call_function() in PySpark – Dynamically Apply SQL Functions in Your Code #pyspark | PySpark Tutorial

How to Use call_function() in PySpark | Dynamically Apply SQL Functions

How to Use call_function() in PySpark

Want to call Spark SQL functions dynamically without hardcoding them? In this tutorial, we explore the call_function() utility in PySpark, which allows you to apply SQL functions dynamically using variables—perfect for reusable pipelines and configurable logic.

๐Ÿ“˜ Sample DataFrame

data = [
    ("Aamir", 3000.456),
    ("Bob", 4999.899),
    ("Charlie", 2500.5)
]
df = spark.createDataFrame(data, ["name", "salary"])
df.show()

Output:

+-------+--------+
|   name|  salary|
+-------+--------+
|  Aamir|3000.456|
|    Bob|4999.899|
|Charlie|  2500.5|
+-------+--------+

๐Ÿ”„ Use call_function to Dynamically Call SQL Functions

from pyspark.sql.functions import col, call_function, lit

# Uppercase name
df = df.withColumn("name_upper", call_function("upper", col("name")))
df.select("name", "name_upper").show()

Output:

+-------+----------+
|   name|name_upper|
+-------+----------+
|  Aamir|     AAMIR|
|    Bob|       BOB|
|Charlie|   CHARLIE|
+-------+----------+

๐Ÿ’ฒ Format Number with call_function

df = df.withColumn("salary_formatted", call_function("format_number", col("salary"), lit(2)))
df.select("salary", "salary_formatted").show()

Output:

+--------+----------------+
| salary | salary_formatted|
+--------+----------------+
|3000.456|        3,000.46 |
|4999.899|        4,999.90 |
|2500.5  |        2,500.50 |
+--------+----------------+

๐Ÿ“ Dynamically Apply Any SQL Function

func_name = "length"
df = df.withColumn("name_length", call_function(func_name, col("name")))
df.select("name", "name_length").show()

Output:

+--------+------------+
| name   | name_length|
+--------+------------+
| Aamir  | 5          |
| Bob    | 3          |
| Charlie| 7          |
+--------+------------+
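
call_function() also works with aggregate functions, which is useful when the aggregation itself is configurable; a minimal sketch:

agg_name = "avg"   # could come from a config file or job parameter
df.agg(call_function(agg_name, col("salary")).alias("avg_salary")).show()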

๐ŸŽฅ Watch the Full Tutorial

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

T-SQL in Fabric Warehouse – What’s Supported and What’s Not | Microsoft Fabric Tutorial

T-SQL in Fabric Warehouse – What’s Supported and What’s Not

Microsoft Fabric introduces powerful T-SQL support within the Fabric Warehouse. However, there are some important differences between what you can do in a Warehouse vs a SQL Analytics Endpoint.

✅ Supported T-SQL Features in Fabric Warehouse

  • CREATE, ALTER, DROP for tables, views, functions, and procedures
  • Full support for INSERT, UPDATE, DELETE, and TRUNCATE
  • sp_rename to rename columns
  • Common Table Expressions (CTEs), including nested CTEs (Preview)
  • Session-scoped #temp tables
  • ADD/DROP COLUMN (nullable only)
  • ADD/DROP CONSTRAINT (only with NOT ENFORCED)

❌ Not Supported T-SQL Features

  • MERGE, PREDICT, TRIGGERS
  • SET ROWCOUNT, SET TRANSACTION ISOLATION LEVEL
  • IDENTITY columns
  • FOR JSON and FOR XML in subqueries
  • ALTER TABLE (only limited cases supported)
  • sp_showspaceused, CREATE USER
  • Recursive queries
  • Materialized views
  • Manual statistics (auto only)

⚠️ SQL Analytics Endpoint – Key Limitations

  • Read-only access to Delta tables
  • Can create views and functions referencing Delta Lake
  • No INSERT, UPDATE, DELETE, or CREATE TABLE support
  • Designed for analytical querying, not DML operations
✅ Use Fabric Warehouse for complete DML + DDL capabilities.
๐Ÿ” Use SQL Analytics Endpoint for querying Lakehouse Delta tables only.

๐ŸŽฌ Watch the Full Tutorial

Blog post written with the help of ChatGPT.

Introduction to Azure Synapse Pipelines | Data Integration & ETL in Synapse Analytics | Azure Synapse Analytics Tutorial

Introduction to Azure Synapse Pipelines | Data Integration & ETL

Introduction to Azure Synapse Pipelines | Data Integration & ETL in Synapse Analytics

๐Ÿ“˜ Overview

Azure Synapse Pipelines provide a unified platform for building and orchestrating data integration workflows in the Azure Synapse workspace. With Synapse Pipelines, you can move, transform, and process data at scale, similar to Azure Data Factory, but tightly integrated with Synapse Analytics.

๐Ÿš€ Key Features

  • Low-code drag-and-drop pipeline designer
  • Support for multiple data sources including Azure Data Lake, SQL, Blob, Cosmos DB, etc.
  • Activities for data movement, transformation (Data Flows), notebooks, and stored procedures
  • Trigger-based orchestration (manual, schedule, tumbling window, event)

๐Ÿ› ️ Components of a Synapse Pipeline

  • Pipeline: A logical container of activities
  • Activity: A task like copy, execute notebook, data flow, or web call
  • Dataset: Defines the structure and location of the data
  • Linked Service: Connection information to data stores or compute
  • Trigger: Defines how and when a pipeline should run

๐Ÿ“ฅ Sample Pipeline Flow

Here's a basic flow of a pipeline in Synapse:

  1. Trigger fires a pipeline on schedule
  2. Copy activity moves data from Blob Storage to SQL Pool
  3. Notebook activity cleans or transforms the data
  4. Stored procedure updates metadata or logs

๐Ÿ“Š Example: Create a Pipeline with Copy Activity

1. Go to Synapse Studio → Integrate tab
2. Click "+ New pipeline"
3. Add "Copy Data" activity
4. Configure source (e.g., CSV file in ADLS Gen2)
5. Configure sink (e.g., table in SQL pool)
6. Publish and trigger manually or via schedule

๐Ÿ“Œ Best Practices

  • Use parameterization to create reusable pipelines
  • Monitor with Synapse Monitoring or Log Analytics
  • Break complex flows into modular pipelines

๐ŸŽฏ Use Cases

  • Batch ingestion and transformation of files
  • Orchestration of Spark notebooks with SQL pipelines
  • Building data warehouses and lakes

๐Ÿ“บ Watch the Full Video Tutorial

๐Ÿ“š Credit: Content created with the help of ChatGPT and Gemini.

Extract Substrings Easily in PySpark with regexp_substr() | Real-World Regex Examples | PySpark Tutorial

How to Use regexp_substr() in PySpark | Extract with Regex

Extract Substrings with regexp_substr() in PySpark

In this tutorial, you'll learn how to use the regexp_substr() function in PySpark to extract specific patterns or substrings using regular expressions. This function is especially helpful for extracting dates, prices, or identifiers from messy text data.

๐Ÿ“˜ Sample Data

data = [
    ("[INFO] Task completed at 2024-04-10 14:33:22", "Price: $199.99"),
    ("[ERROR] Failed on 2022-12-25 08:15:00", "Price: $49.50 + tax"),
    ("[WARN] Updated 2022-01-01 10:00:00", "10")
]
cols = ["log_msg", "price"]
df = spark.createDataFrame(data, cols)
df.show(truncate=False)

Output:

+--------------------------------------------+-------------------+
|log_msg                                     |price              |
+--------------------------------------------+-------------------+
|[INFO] Task completed at 2024-04-10 14:33:22|Price: $199.99     |
|[ERROR] Failed on 2022-12-25 08:15:00       |Price: $49.50 + tax|
|[WARN] Updated 2022-01-01 10:00:00          |10                 |
+--------------------------------------------+-------------------+

๐Ÿ“… Extract Date from Log Message

from pyspark.sql.functions import regexp_substr, col

df = df.withColumn("log_date", regexp_substr(col("log_msg"), "\\d{4}-\\d{2}-\\d{2}", 0))
df.show(truncate=False)

Output: Extracts the date in format YYYY-MM-DD from the log message.

๐Ÿ’ฒ Extract Price from Price String

df = df.withColumn("extracted_price", regexp_substr(col("price"), "\\d+\\.\\d+", 0))
df.show(truncate=False)

Output: Extracts the numeric price value from the text string.
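
The same approach extracts any other pattern, for example the time portion of the log message; a short sketch:

df = df.withColumn("log_time", regexp_substr(col("log_msg"), "\\d{2}:\\d{2}:\\d{2}"))
df.select("log_msg", "log_time").show(truncate=False)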

๐ŸŽฅ Watch the Full Tutorial

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

Create Warehouse Snapshots in Fabric – Point-in-Time Data Copy | Microsoft Fabric Tutorial

Create Warehouse Snapshots in Fabric – Point-in-Time Data Copy

In this Microsoft Fabric tutorial, you’ll learn how to use Warehouse Snapshots — a powerful feature that allows you to capture point-in-time read-only copies of your data warehouse for reporting, auditing, and analytics.

๐Ÿง  What is a Warehouse Snapshot?

  • A read-only, point-in-time view of a Microsoft Fabric Warehouse
  • Not automatically created — must be initiated by users
  • Remains consistent even as base tables are updated
  • Valid for querying historical data up to 30 days in the past
  • Can be refreshed manually to reflect the latest data

๐Ÿ“Œ Use Cases

  • ๐Ÿ“Š Historical analytics and period comparisons
  • ๐Ÿ“ฆ Stable inputs for machine learning and BI pipelines
  • ๐Ÿ” Regulatory compliance and audits
  • ✅ Backup for critical datasets before running ETL transformations

๐Ÿงช Sample Query on a Snapshot

SELECT TOP 5 * 
FROM [TechDWH_Snapshot].[dbo].[Sales];

๐Ÿ” Updating a Snapshot

  • Only available to Admin, Member, or Contributor roles
  • Automatically refreshes all snapshot tables to current state
  • Ongoing queries complete on the previous snapshot version

๐Ÿ” Security & Governance

  • Follows all permission rules from the parent warehouse
  • Strictly read-only – users cannot modify data
  • Security policies like GRANT/REVOKE instantly apply
  • ⚠️ Snapshot schema is not versioned — dropped tables disappear

⚠️ Preview Limitations

  • Works only on warehouses created after March 2025
  • No support via SSMS or Direct Lake integration
  • Does not support creating new views or DML operations
  • Unavailable for Lakehouse SQL Analytics Endpoints

๐ŸŽฌ Watch the Full Tutorial

Blog post written with the help of ChatGPT.