PySpark unpersist() Explained – How to Free Memory in Spark

In this tutorial, you'll learn how to use the unpersist() function in PySpark to release memory or disk used by persisted or cached DataFrames and RDDs. It is essential for optimizing memory and performance when working with large datasets in Spark.

Step 1: Create a Sample DataFrame

data = [("Aamir Shahzad",), ("Ali Raza",), ("Bob",), ("Lisa",)]
columns = ["name"]
df = spark.createDataFrame(data, columns)

print("๐Ÿ“Œ Original DataFrame:")
df.show()

Step 2: Persist the DataFrame

df.persist()
print("✅ DataFrame is now persisted (cached in memory).")
print(df.is_cached)  # Output: True

Step 3: Unpersist the DataFrame

df_unpersisted = df.unpersist()
print("✅ DataFrame is now unpersisted.")
df_unpersisted.show()
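
You can confirm that the cached data was released by checking is_cached again:

print(df.is_cached)  # Output: False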

Step 4: Unpersist with blocking=True

df.persist()  # re-persist so the blocking unpersist below has something to clean up
df_unpersisted_blocking = df.unpersist(blocking=True)
print("✅ DataFrame is now unpersisted with blocking=True (waits for cleanup).")

Summary

  • persist() stores a DataFrame in memory or disk to speed up performance.
  • unpersist() is used to clear cached/persisted data and release resources.
  • Always use unpersist() after you're done with cached data to avoid memory issues.
  • Use blocking=True if you want to wait for full cleanup before continuing.

๐Ÿ“บ Watch the Full Tutorial

What is Schema in Azure Synapse Analytics & How to Create It | Serverless SQL Tutorial

In Azure Synapse Analytics, a schema is a logical container within a database used to group related database objects such as tables, views, stored procedures, and functions. Schemas help organize and manage access to data in large data systems.

๐Ÿง  Why Use Schemas?

  • Improves organization and readability of your database
  • Supports role-based access control at the schema level
  • Allows multiple teams to work independently in the same database

๐Ÿ“˜ Example: Create Schema in Serverless SQL Pool

You can create a schema using the CREATE SCHEMA statement:


-- Switch to your database context first
USE myserverlessdb;
GO

-- Create a schema named 'reporting'
CREATE SCHEMA reporting;

๐Ÿ“ฆ Creating a Table in a Custom Schema


CREATE EXTERNAL TABLE reporting.CustomerData (
    CustomerID INT,
    Name NVARCHAR(100),
    Country NVARCHAR(50)
)
WITH (
    LOCATION = 'customer/',
    DATA_SOURCE = MyDataLake,
    FILE_FORMAT = CsvFormat
);

๐Ÿ›ก️ Permissions

To create a schema, the user must have CREATE SCHEMA permissions in the database or be a database owner.
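
For example (a minimal sketch, assuming a database user named DataAnalyst already exists):

-- Hypothetical user name; grant the right to create schemas in the current database
GRANT CREATE SCHEMA TO [DataAnalyst];

-- Optionally hand ownership of an existing schema to that user
ALTER AUTHORIZATION ON SCHEMA::reporting TO [DataAnalyst];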

๐Ÿ“Œ Best Practices

  • Use schemas to group business domains (e.g., sales, finance, reporting)
  • Assign permissions at the schema level for better access management
  • Prefix your objects with schema names (e.g., reporting.CustomerData) for clarity

๐Ÿ“บ Watch the Tutorial

Credit: This blog was created with the help of ChatGPT and Gemini.

How to Perform Cross Database Queries Using Serverless SQL Pool - Azure Synapse Analytics Tutorial

In Azure Synapse Analytics, Serverless SQL Pool supports querying across multiple databases in the same Synapse workspace. This is useful for combining data from different departments, functional areas, or staging/production environments without physically moving the data.

๐Ÿง  What Are Cross-Database Queries?

A cross-database query lets you join or query data between two or more databases by fully qualifying the object names using: [DatabaseName].[SchemaName].[TableName]

✅ Use Cases

  • Join reporting data from separate databases
  • Reference lookup tables maintained in a shared DB
  • Access historical logs stored in a different workspace DB

๐Ÿงช Example: Join Tables from Two Databases


SELECT a.CustomerID, a.Name, b.TotalSpent
FROM SalesDB.dbo.Customers a
JOIN FinanceDB.dbo.SpendSummary b
    ON a.CustomerID = b.CustomerID;

๐Ÿ“Œ Things to Remember

  • Both databases must reside in the same Synapse workspace
  • Use the correct database and schema names
  • Cross-database queries are supported only in Serverless SQL Pool (not in Dedicated SQL Pool)

๐Ÿ“บ Watch the Full Tutorial

Credit: This blog was created with the help of ChatGPT and Gemini.

How to Use toDF() in PySpark – Rename All DataFrame Columns Fast

PySpark’s toDF() function allows you to rename all columns in a DataFrame in one go. This is especially helpful when dealing with raw datasets or when you want cleaner, more readable column names for downstream processing.

Step 1: Create a Sample DataFrame

data = [
  (1, "Aamir Shahzad"),
  (2, "Ali Raza"),
  (3, "Bob"),
  (4, "Lisa")
]
original_df = spark.createDataFrame(data, ["id", "name"])
print("๐Ÿ“Œ Original DataFrame with Default Column Names:")
original_df.show()

Step 2: Rename All Columns Using toDF()

renamed_df = original_df.toDF("user_id", "FirstName")
print("๐Ÿ“Œ DataFrame After Renaming Columns Using toDF():")
renamed_df.show()
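
If the new names are already in a Python list, they can be unpacked into toDF() with the * operator:

new_columns = ["user_id", "FirstName"]
renamed_df2 = original_df.toDF(*new_columns)
renamed_df2.show()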

Step 3: Compare Schema Before and After

print("๐Ÿ“Œ Schema Before:")
original_df.printSchema()

print("๐Ÿ“Œ Schema After:")
renamed_df.printSchema()

Step 4: Example with Mismatched Column Count (Error Case)

try:
    error_df = original_df.toDF("only_one_column")
except Exception as e:
    print("❌ Error: ", e)

Summary

  • toDF() is a quick way to rename all columns at once.
  • The number of new column names must exactly match the number of columns.
  • Helpful for schema clean-up and transforming raw ingestion data.

๐Ÿ“บ Watch the Full Tutorial

How to Join Parquet File with CSV Using OPENROWSET in Serverless SQL Pool - Azure Synapse

Azure Synapse Analytics Serverless SQL Pool allows you to query data stored in various file formats such as CSV and Parquet directly from Azure Data Lake using the OPENROWSET function. This blog post shows how you can join data between a Parquet file and a CSV file without ingesting them into a database.

๐Ÿง  File Format Overview

  • CSV (Comma Separated Values): A flat file format used to store tabular data, often with headers and delimited values.
  • Parquet: A columnar storage file format optimized for analytical workloads — fast to read specific columns and space-efficient.

๐Ÿ“˜ JOINing CSV and Parquet Files using OPENROWSET

Both files are read directly from Azure Data Lake using their respective OPENROWSET definitions, then joined using T-SQL.

๐Ÿงช Example Query


SELECT 
    csv.CustomerID,
    csv.Name,
    prq.Country,
    prq.TotalSpent
FROM 
    OPENROWSET(
        BULK 'https://yourlakehouse.dfs.core.windows.net/sales/customer.csv',
        FORMAT = 'CSV',
        PARSER_VERSION = '2.0',
        HEADER_ROW = TRUE
    ) AS csv
JOIN 
    OPENROWSET(
        BULK 'https://yourlakehouse.dfs.core.windows.net/sales/customer_spending.parquet',
        FORMAT = 'PARQUET'
    ) AS prq
ON csv.CustomerID = prq.CustomerID;

✅ Benefits of This Approach

  • No need to load files into tables
  • Low-cost, pay-per-query model
  • Supports ad hoc exploration of raw data

⚠️ Best Practices

  • Ensure that both files have compatible data types for join keys
  • Use SELECT only on required columns to reduce data scanned
  • Preview file schemas with TOP 10 or sp_describe_first_result_set before joining

๐Ÿ“บ Watch the Full Tutorial

Credit: This blog post was created with the help of ChatGPT and Gemini.

PySpark DataFrame sampleBy() Function - Group-Wise Sampling

In this PySpark tutorial, learn how to use the sampleBy() function to perform group-wise sampling from a DataFrame. It's ideal for stratified sampling, testing models, or creating balanced subsets of data grouped by a specific column.

Step 1: Create a Sample DataFrame

data = [
  ("Aamir Shahzad", "Engineering"),
  ("Ali Raza", "HR"),
  ("Bob", "Engineering"),
  ("Lisa", "Marketing"),
  ("Ali Raza", "HR"),
  ("Aamir Shahzad", "Engineering"),
  ("Lisa", "Marketing"),
  ("Bob", "Engineering"),
  ("Aamir Shahzad", "Engineering"),
  ("Ali Raza", "HR")
]
columns = ["name", "department"]
df = spark.createDataFrame(data, columns)

print("๐Ÿ“Œ Original DataFrame:")
df.show()

Step 2: Count per Group

print("๐Ÿ“Š Count per name before sampling:")
df.groupBy("name").count().show()

Step 3: Apply sampleBy() with Different Sampling Fractions

fractions = {
  "Lisa": 1.0,
  "Ali Raza": 0.7,
  "Aamir Shahzad": 1.0
}
sampled_df = df.sampleBy("name", fractions=fractions, seed=42)

print("๐Ÿ“Š Sampled DataFrame:")
sampled_df.show()

Step 4: Count per Group After Sampling

print("๐Ÿ“Š Count per name after sampling:")
sampled_df.groupBy("name").count().show()

Step 5: Use Column Object Instead of String

from pyspark.sql.functions import col
bob_sample = df.sampleBy(col("name"), fractions={"Bob": 1.0}, seed=99)

print("๐Ÿ“Š Sample only 'Bob' using Column object:")
bob_sample.show()

Summary

  • sampleBy() is used for group-wise (stratified) sampling.
  • You define a sampling fraction per group key (e.g., name or category).
  • Keys missing from the fractions dict default to 0.0, so those groups are dropped from the sample.
  • Great for testing, ML training balance, and subset analysis.

๐Ÿ“บ Watch the Full Tutorial

How to Connect to Serverless SQL Pool Using SQL Server Management Studio (SSMS)

Microsoft Azure Synapse Analytics offers a built-in serverless SQL pool that lets you run T-SQL queries on files stored in Azure Data Lake. You can conveniently connect to this pool using SQL Server Management Studio (SSMS) for executing queries, browsing metadata, and managing your scripts.

✅ Prerequisites

  • Installed SQL Server Management Studio (SSMS) – version 18.5 or later is recommended
  • Access to an Azure Synapse workspace
  • Azure Active Directory credentials (SSMS connects via AAD authentication)

๐Ÿ”‘ Connection Information

To connect from SSMS, you'll need the following:

  • Server name: <synapse-workspace-name>.sql.azuresynapse.net
  • Authentication: Azure Active Directory - Universal with MFA

๐Ÿ› ️ Steps to Connect

  1. Open SSMS
  2. Click Connect → Database Engine
  3. Enter the Server name (e.g., myworkspace.sql.azuresynapse.net)
  4. Select Authentication: Azure Active Directory - Universal with MFA
  5. Click Connect and complete the AAD login
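
Once connected, a quick sanity check is to list the databases visible on the serverless endpoint:

-- Run in a new query window after connecting
SELECT name
FROM sys.databases
ORDER BY name;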

๐Ÿ“Œ Tips

  • Use master or <your-database> to run queries
  • Execute views and external table queries as you would in Synapse Studio
  • You can't run DDL that creates dedicated SQL pools or Spark objects from this connection

๐Ÿ“บ Watch the Tutorial

Credit: This blog post was prepared with the help of ChatGPT and Gemini.

Optimize Your Data with repartitionByRange() in PySpark

The repartitionByRange() function in PySpark is used for range-based repartitioning of data. It groups records based on specified column values and ensures even distribution for better performance.

Step 1: Create Sample DataFrame

data = [
  (1, "Aamir Shahzad"),
  (2, "Ali Raza"),
  (3, "Bob"),
  (4, "Lisa"),
  (5, "Ali Raza"),
  (6, "Aamir Shahzad"),
  (7, "Lisa"),
  (8, "Bob"),
  (9, "Aamir Shahzad"),
  (10, "Ali Raza")
]

df = spark.createDataFrame(data, ["id", "name"])

print("๐Ÿ“Œ Original DataFrame:")
df.show()

Step 2: Check Original Number of Partitions

original_partitions = df.rdd.getNumPartitions()
print(f"๐Ÿ“Š Original Number of Partitions: {original_partitions}")

Step 3: Repartition by Range on 'id'

df_repartitioned = df.repartitionByRange(3, "id")

Step 4: Check Number of Partitions After Repartitioning

new_partitions = df_repartitioned.rdd.getNumPartitions()
print(f"๐Ÿ“Š Number of Partitions after repartitionByRange: {new_partitions}")

Step 5: Add Partition Index Column to Inspect Distribution

from pyspark.sql.functions import spark_partition_id

df_with_partition_info = df_repartitioned.withColumn("partition_id", spark_partition_id())

print("๐Ÿ“Œ Partitioned Data Preview (Range Partitioned on id):")
df_with_partition_info.orderBy("id").show(truncate=False)
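
To see how many rows landed in each range partition, aggregate on the partition id:

print("📊 Row count per partition:")
df_with_partition_info.groupBy("partition_id").count().orderBy("partition_id").show()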

Summary

  • repartitionByRange() is ideal for range-based partitioning
  • Helps optimize performance for sorting, joins, and writes
  • Use sampling to estimate partition boundaries when needed
  • Useful in scenarios where hash repartitioning isn't efficient

๐Ÿ“บ Watch the Full Tutorial

How to Create Stored Procedure in Serverless SQL Pool | Azure Synapse Analytics

A Stored Procedure is a saved collection of T-SQL statements that you can execute as a unit. In Azure Synapse Analytics Serverless SQL Pool, you can use stored procedures to encapsulate business logic, complex queries, or reusable operations over data stored in your Data Lake.

✅ Why Use Stored Procedures?

  • Encapsulate logic and reuse easily
  • Improve manageability and organization of queries
  • Reduce query complexity for end users
  • Enable parameterized query execution

๐Ÿ“˜ Syntax Overview


CREATE PROCEDURE [schema].[ProcedureName]
AS
BEGIN
    -- your SQL logic here
END;

๐Ÿงช Example: Create and Execute a Stored Procedure

-- ==========================================
-- ๐Ÿ“š What is a Stored Procedure?
-- ==========================================
-- A Stored Procedure is a saved collection of SQL statements 
-- that can be executed as a program. 
-- Stored Procedures can accept parameters, perform queries, and return results.
-- In Serverless SQL Pool, Stored Procedures are used mainly for SELECT queries 
-- and metadata operations (no INSERT, UPDATE, DELETE).
-- You cannot create stored procedures in the master database, and they are not visible in the Synapse Studio GUI.

-- ==========================================
-- ✅ How to Create a Stored Procedure in Serverless SQL Pool
-- ==========================================

CREATE OR ALTER PROCEDURE dbo.GetCustomerOrdersById
    @customer_id INT
AS
BEGIN
    SELECT 
        c.customer_id,
        c.fname,
        c.lastname
    FROM dbo.Customer c
    WHERE c.customer_id = @customer_id;
END;

-- ==========================================
-- ✅ How to Call (Execute) the Stored Procedure
-- ==========================================

-- Method 1: Using EXEC keyword
EXEC dbo.GetCustomerOrdersById @customer_id = 2;

-- Method 2: Short form (only valid as the first statement in a batch)
dbo.GetCustomerOrdersById 1;

-- ==========================================
-- ✅ How to Check if a Stored Procedure Exists in Serverless SQL Pool
-- ==========================================

SELECT *
FROM INFORMATION_SCHEMA.ROUTINES
WHERE ROUTINE_TYPE = 'PROCEDURE'
  AND ROUTINE_NAME = 'GetCustomerOrdersById';

-- ==========================================
-- ๐Ÿ”” Notes:
-- - Always use CREATE OR ALTER to safely update stored procedures.
-- - In Serverless SQL Pool, only SELECT and metadata operations are allowed inside procedures.
-- - Stored Procedures are metadata objects; no physical storage is used.
-- - Stored Procedures are not visible in Synapse Studio GUI — use queries to manage them.

๐Ÿ“Œ Notes

  • Stored procedures in Serverless SQL Pool are metadata-only and cannot modify physical data
  • You can execute them using EXEC [procedure_name]
  • They’re great for organizing logic for reports or dashboards

๐Ÿ“บ Watch the Full Tutorial

Credit: This post was created with help from ChatGPT and Gemini.

How to Perform JOINs Between Files in Azure Synapse Serverless SQL Pool

In Azure Synapse Analytics Serverless SQL Pool, you can join multiple CSV, Parquet, or JSON files from a Data Lake without loading them into traditional tables. This is a powerful feature for data exploration and building reports on top of raw data.

๐Ÿ” What is a JOIN?

A JOIN clause is used to combine rows from two or more sources based on a related column between them. Common types include:

  • INNER JOIN – returns only matching rows
  • LEFT JOIN – returns all rows from the left source and matched rows from the right
  • RIGHT JOIN – returns all rows from the right source and matched rows from the left
  • FULL JOIN – returns all matched and unmatched rows from both sides

๐Ÿ“‚ Joining External Files in Synapse

You can use OPENROWSET to read external files stored in Azure Data Lake and perform joins just like in regular T-SQL.

๐Ÿงช Example: INNER JOIN Between Two CSV Files

-- ==========================================
-- ๐Ÿ“š What is JOIN in SQL?
-- ==========================================
-- A JOIN clause is used to combine rows from two or more tables, 
-- based on a related column between them.

-- ==========================================
-- ๐Ÿ”ฅ Types of Joins Supported in Serverless SQL Pool (with Definitions)
-- ==========================================
-- ✅ INNER JOIN: Returns only the matching rows from both tables based on the join condition.

-- ✅ LEFT JOIN (LEFT OUTER JOIN): Returns all rows from the left table and matching rows from the right table; fills NULLs if no match.

-- ✅ RIGHT JOIN (RIGHT OUTER JOIN): Returns all rows from the right table and matching rows from the left table; fills NULLs if no match.

-- ✅ FULL OUTER JOIN: Returns all rows when there is a match in either the left or right table; fills NULLs where there is no match.

-- ❗ CROSS JOIN (supported but rarely used in files): Returns the Cartesian product of the two tables (every row from first table combined with every row from second table).

-- ==========================================
-- ✅ How to JOIN using External Tables
-- ==========================================
-- Assume we have two external tables:
-- dbo.Customer (customer_id, fname, lastname)
-- dbo.Orders (order_id, customer_id, order_date)

SELECT 
    c.customer_id,
    c.fname,
    c.lastname,
    o.order_id,
    o.order_date
FROM dbo.Customer c
INNER JOIN dbo.Orders o
ON c.customer_id = o.customer_id;

-- ==========================================
-- ✅ How to JOIN using OPENROWSET Directly
-- ==========================================
SELECT
    c.customer_id,
    c.fname,
    c.lastname,
    o.order_id,
     o.order_date
FROM
    OPENROWSET(
        BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/input/customer.csv',
        FORMAT = 'CSV',
        PARSER_VERSION = '2.0',
        HEADER_ROW=true
    ) AS c
LEFT JOIN
    OPENROWSET(
        BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/input/orders.csv',
        FORMAT = 'CSV',
        PARSER_VERSION = '2.0',
        HEADER_ROW=true
    ) AS o
ON
    c.customer_id = o.customer_id;

-- ==========================================
-- ⚡ Notes:
-- - For OPENROWSET, everything is read as VARCHAR, so CAST if needed.
-- - HEADER_ROW = TRUE (or FIRSTROW = 2) skips the CSV header row.
-- - Ensure customer_id formats match when joining.
-- - Use LEFT JOIN or FULL OUTER JOIN depending on your data requirement.

⚙️ Best Practices

  • Use filters to minimize scanned data
  • Select only necessary columns
  • Preview schemas before joining using TOP 10 or sp_describe_first_result_set

๐Ÿ“บ Watch the Tutorial

Credit: This post was assisted by ChatGPT and Gemini.

How to Access RDD from DataFrame Using .rdd in PySpark

The .rdd property lets you access the underlying RDD (Resilient Distributed Dataset) of a DataFrame. This is useful when you need low-level RDD operations or more control than the DataFrame API provides.

Step 1: Create a Sample DataFrame

data = [
    ("Aamir Shahzad", "Engineering", 100000),
    ("Ali Raza", "HR", 70000),
    ("Bob", "Engineering", 80000),
    ("Lisa", "Marketing", 65000)
]

columns = ["name", "department", "salary"]
df = spark.createDataFrame(data, columns)

print("๐Ÿ“Œ Original DataFrame:")
df.show()

Step 2: Convert DataFrame to RDD

rdd_from_df = df.rdd

print("๐Ÿ“Œ Type of rdd_from_df:")
print(type(rdd_from_df))  # Output: <class 'pyspark.rdd.RDD'>

Step 3: Print RDD Contents

print("๐Ÿ“Œ RDD Contents:")
for row in rdd_from_df.collect():
    print(row)

Step 4: Use RDD Transformations

# Extract just names from RDD
name_rdd = df.rdd.map(lambda row: row['name'])

print("๐Ÿ“Œ Names from RDD:")
print(name_rdd.collect())
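
Going the other way is just as easy; an RDD of Row objects can be turned back into a DataFrame:

df_back = rdd_from_df.toDF()

print("📌 DataFrame rebuilt from the RDD:")
df_back.show()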

Summary

✅ Use .rdd to convert a DataFrame to an RDD of Row objects.
✅ You can then apply RDD operations like map, filter, and reduce.
✅ Useful when you need more flexibility than the DataFrame API allows.

๐Ÿ“บ Watch the Full Tutorial

Export Data to CSV, JSON or XML & Save Scripts in Folders | Azure Synapse Analytics Tutorial

Azure Synapse Analytics provides an easy way to export query results to file formats such as CSV, JSON, or XML using the built-in Export functionality in the Synapse Studio UI. This is especially helpful when using Serverless SQL Pools to analyze external files and then saving the results for reporting or sharing.

๐Ÿง  What Is Export in Synapse Studio?

After running a query in the Synapse SQL script editor using Serverless SQL Pools, you can directly export the results by clicking the Export button and choosing the desired format.

Supported Formats:

  • CSV – Standard format for tabular data
  • JSON – Structured data, often used in web and API systems
  • XML – Hierarchical markup format used for data exchange

๐Ÿงช Example


SELECT Country, COUNT(*) AS TotalCustomers
FROM OPENROWSET(
    BULK 'https://yourstorage.dfs.core.windows.net/container/customers/*.csv',
    FORMAT = 'CSV',
    PARSER_VERSION = '2.0',
    HEADER_ROW = TRUE
) AS customers
GROUP BY Country;

Once this query runs, click the Export button above the results pane and choose CSV, JSON, or XML to download the output.

๐Ÿ“ Saving Scripts to Folders

Azure Synapse Studio also allows you to save your SQL scripts and notebooks in custom folders. This helps organize your workspace and makes it easier to collaborate or revisit queries later.

Steps to Save a Script:

  1. After writing or running a script, click the Save As icon
  2. Select the destination folder or create a new one
  3. Provide a name and confirm to save the script

๐Ÿ“Œ Benefits

  • Download query results with a single click
  • No manual data copying or transformation required
  • Scripts are versionable and organized in workspaces

๐Ÿ“บ Watch the Full Tutorial

Credit: This article was created with the assistance of ChatGPT and Gemini.

How to Use persist() Function in PySpark – Cache vs Persist Explained

The persist() function in PySpark is used to cache or store a DataFrame's intermediate results. It's especially useful when the same DataFrame is reused multiple times and you want to avoid recomputation for performance gains. Learn how it differs from cache() and how to use it effectively.

Step 1: Create a Sample DataFrame

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
  ("Aamir Shahzad", "Engineering", 100000),
  ("Ali Raza", "HR", 70000),
  ("Bob", "Engineering", 80000),
  ("Lisa", "Marketing", 65000)
]
columns = ["name", "department", "salary"]
df = spark.createDataFrame(data, columns)

print("๐Ÿ“Œ Sample DataFrame:")
df.show()

Step 2: Use persist() to Store Intermediate Result

from pyspark.storagelevel import StorageLevel

df_cached = df.filter(df.salary > 70000).persist(StorageLevel.MEMORY_AND_DISK)
print("๐Ÿ“Œ Count of high earners (cached):", df_cached.count())
print("๐Ÿ“Œ Average salary (cached):", df_cached.groupBy().avg("salary").collect())

Step 3: Optionally Unpersist to Free Memory

df_cached.unpersist()

Summary

  • persist() allows finer control than cache() by letting you specify the storage level (memory, disk, or both).
  • Useful for long-running or reused DataFrames to reduce recomputation.
  • Always call unpersist() after you're done to free up resources.

๐Ÿ“บ Watch the Full Tutorial

SQL GROUP BY with External Table and CSV File | Azure Synapse Analytics Tutorial

In Azure Synapse Analytics, especially when using Serverless SQL Pools, the GROUP BY clause is essential for summarizing data. It allows you to perform aggregations such as COUNT, SUM, AVG, and more — grouped by one or more columns.

✅ What is GROUP BY?

The GROUP BY clause groups rows that have the same values into summary rows, like "total sales by region". It's often used with aggregate functions.

๐Ÿ“‚ Using GROUP BY with External CSV Files

In Synapse Serverless SQL Pools, you can use OPENROWSET to query external files stored in Azure Data Lake without needing to load them into a database.

๐Ÿงช Example 1: Total Customers per Country from CSV File


SELECT Country, COUNT(*) AS TotalCustomers
FROM OPENROWSET(
    BULK 'https://yourstorage.dfs.core.windows.net/container/customers/*.csv',
    FORMAT = 'CSV',
    PARSER_VERSION = '2.0',
    HEADER_ROW = TRUE
) AS [result]
GROUP BY Country;

๐Ÿงช Example 2: Total Revenue per Product from External Table


SELECT ProductID, SUM(Revenue) AS TotalRevenue
FROM dbo.ExternalSalesTable
GROUP BY ProductID;

๐Ÿ” Tips

  • Always include only columns in GROUP BY or aggregate functions in the SELECT clause
  • Use aliases for aggregate expressions for better readability
  • Test query performance — file format (CSV, Parquet) impacts it significantly

๐Ÿ“บ Watch the Tutorial

Credit: This blog was created with the help of ChatGPT and Gemini.

PySpark offset() Function: Skip Rows in Spark DataFrame

The offset() function (available in Spark 3.4 and later) is used to skip a number of rows before returning results from a DataFrame. It is commonly used with orderBy() and limit() for pagination or partial data fetching.

Step 1: Create Sample Data

data = [
  ("Aamir Shahzad", "Engineering", 100000),
  ("Ali Raza", "HR", 70000),
  ("Bob", "Engineering", 80000),
  ("Lisa", "Marketing", 65000),
  ("Aamir Shahzad", "Engineering", 95000),
  ("Ali Raza", "HR", 72000),
  ("Bob", "Engineering", 85000),
  ("Lisa", "Marketing", 66000)
]

columns = ["name", "department", "salary"]
df = spark.createDataFrame(data, columns)
df.show()

Step 2: Use offset() with orderBy() and limit()

# Example: Skip first 2 rows after sorting by salary
paginated_df = df.orderBy("salary", ascending=True).offset(2).limit(3)

print("๐Ÿ“Š Paginated Result (Skip 2, Take 3):")
paginated_df.show()
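
For classic page-number pagination, the offset is usually derived from the page size; a small sketch:

page, page_size = 2, 3  # hypothetical page settings
page_df = df.orderBy("salary", ascending=True).offset((page - 1) * page_size).limit(page_size)

print(f"📊 Page {page} (skip {(page - 1) * page_size}, take {page_size}):")
page_df.show()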

Use Cases of offset()

  • Paginate large DataFrames
  • Explore partial results
  • Implement front-end-style pagination in reports

๐Ÿ“บ Watch the Full Tutorial

Filter and Sort Data in Azure Synapse SQL | WHERE and ORDER BY Explained

In Azure Synapse SQL, the WHERE clause is used to filter rows based on specific conditions, and the ORDER BY clause is used to sort the result set based on one or more columns. These clauses are fundamental for querying structured datasets efficiently.

๐Ÿ“˜ Key Concepts

  • WHERE: Filters rows based on a condition
  • ORDER BY: Sorts the result by one or more columns in ascending (ASC) or descending (DESC) order

๐Ÿงช Example SQL Code


-- Filtering Data with WHERE Clause in Azure Synapse Analytics ( Serverless SQL Pool)
-- ==================================================================================

-- ๐Ÿ”น Definition:
-- The WHERE clause is used to filter records and retrieve only the rows that satisfy a specified condition.

-- ✅ Example: Filtering data from External Table
SELECT
    id,
    fname,
    lname,
    salary
FROM dbo.users
WHERE fname='Aamir' OR  salary>5000

-- ✅ Example: Filtering data from CSV File using OPENROWSET
SELECT
    id,
    fname,
    lname,salary
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/input/customer_20250310_090501_US.csv',
    FORMAT = 'CSV',
    PARSER_VERSION = '2.0',
    HEADER_ROW = TRUE
) WITH (
    id NVARCHAR(4000),
    fname NVARCHAR(4000),
    lname NVARCHAR(4000),
    salary FLOAT
) AS customer_data
WHERE Salary > 4000 and id>1


-- ======================================================
-- Sorting and Ordering Data in Azure Synapse Analytics
-- ======================================================

-- ๐Ÿ”น Definition:
-- The ORDER BY clause is used to sort the result-set in ascending (ASC) or descending (DESC) order based on one or more columns.

-- ✅ Example: Sorting data from External Table
SELECT
    id,
    fname,
    lname,
    salary
FROM dbo.users
ORDER BY salary;

-- ✅ Example: Sorting data from CSV File using OPENROWSET
SELECT
    id,
    fname,
    lname,
    salary
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/input/customer_20250310_090501_US.csv',
    FORMAT = 'CSV',
    PARSER_VERSION = '2.0',
    HEADER_ROW = TRUE
) WITH (
    id NVARCHAR(4000),
    fname NVARCHAR(4000),
    lname NVARCHAR(4000),
    salary NVARCHAR(4000)
) AS customer_data
ORDER BY lname, CAST(salary AS FLOAT) ASC

๐Ÿ“บ Watch the Tutorial

Credit: This blog post was assisted by ChatGPT and Gemini.

What is explain() in PySpark? | Logical vs Physical Plan

The explain() function in PySpark is used to understand how Spark plans to execute a query. It shows:

  • Logical Plan – what you want to do
  • Physical Plan – how Spark will do it

This is useful for debugging and performance tuning.

Step 1: Create Sample DataFrame

data = [
    ("Aamir Shahzad", "Engineering", 100000),
    ("Ali Raza", "HR", 70000),
    ("Bob", "Engineering", 80000),
    ("Lisa", "Marketing", 65000),
    ("Aamir Shahzad", "Engineering", 100000)
]

columns = ["name", "department", "salary"]
df = spark.createDataFrame(data, columns)
df.show()

Step 2: Run explain() on a Filter Operation

filtered_df = df.filter(df.salary > 75000)
print("๐Ÿ“Œ Physical Plan (default explain):")
filtered_df.explain()

Step 3: Use explain('extended') to view all plan stages

print("๐Ÿ“Œ Full Plan (explain with 'extended'):")
filtered_df.explain("extended")

Summary

  • Use explain() to debug and optimize your PySpark queries.
  • It reveals how Spark interprets and plans your DataFrame code.

๐Ÿ“บ Watch the Full Tutorial

Creating Views in Azure Synapse Analytics for Serverless SQL Pool

Views are virtual tables that provide a logical abstraction over data. In Azure Synapse Analytics Serverless SQL Pool, creating views allows you to encapsulate complex queries, reuse logic, and build semantic layers on top of data stored in your Data Lake.

✅ Why Use Views in Serverless SQL Pool?

  • Encapsulate logic over flat files like CSV, Parquet, or JSON
  • Improve maintainability and reusability of T-SQL
  • Power BI and other tools can query views directly
  • Views can abstract complex joins and transformations

๐Ÿ“˜ Syntax to Create a View


CREATE VIEW dbo.MyView AS
SELECT *
FROM OPENROWSET(
    BULK 'https://<storageaccount>.dfs.core.windows.net/<container>/folder/*.csv',
    FORMAT = 'CSV',
    PARSER_VERSION = '2.0',
    HEADER_ROW = TRUE
) AS [result];

๐Ÿงช Example 1: Create View on CSV Files


CREATE VIEW dbo.CustomerView AS
SELECT *
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/customers/*.csv',
    FORMAT = 'CSV',
    PARSER_VERSION = '2.0',
    HEADER_ROW = TRUE
) AS [result];

๐Ÿงช Example 2: Create View with Projection


CREATE VIEW dbo.SalesSummary AS
SELECT 
    CustomerID,
    SUM(SalesAmount) AS TotalSales
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/sales/*.parquet',
    FORMAT = 'PARQUET'
) AS sales
GROUP BY CustomerID;

๐Ÿง  Tips

  • Views are metadata only — data is read on-demand
  • You can query views like any regular table
  • Use sys.views to list all views
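
For example, a quick metadata check on your views (a minimal sketch):

-- List views in the current database
SELECT s.name AS schema_name, v.name AS view_name, v.create_date
FROM sys.views v
JOIN sys.schemas s ON v.schema_id = s.schema_id
ORDER BY s.name, v.name;

-- Remove a view when it is no longer needed
DROP VIEW dbo.CustomerView;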

๐Ÿ“บ Watch the Full Tutorial

Credit: This blog was assisted by ChatGPT and Gemini for generating structured explanations and examples.

PySpark localCheckpoint Tutorial | Improve Performance in Spark

localCheckpoint() in PySpark allows you to truncate the logical plan lineage of a DataFrame and save it in memory for performance improvements. Unlike checkpoint(), it does not require setting up a checkpoint directory on disk, making it faster but less fault-tolerant.

๐Ÿ”น Step 1: Set Up SparkSession


from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LocalCheckpointExample").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")  # optional here: only checkpoint() needs a directory, localCheckpoint() does not

๐Ÿ”น Step 2: Create a Sample DataFrame


data = [("Aamir Shahzad", 35), ("Ali Raza", 40), ("Bob", 29), ("Lisa", 31)]
columns = ["name", "age"]

df = spark.createDataFrame(data, columns)
df.show()

Output:


+-------------+---+
| name        |age|
+-------------+---+
|Aamir Shahzad| 35|
|Ali Raza     | 40|
|Bob          | 29|
|Lisa         | 31|
+-------------+---+

๐Ÿ”น Step 3: Add Transformation


from pyspark.sql.functions import col

df_transformed = df.withColumn("age_plus_10", col("age") + 10)

๐Ÿ”น Step 4: Apply localCheckpoint()


df_checkpointed = df_transformed.localCheckpoint()
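
To confirm that the lineage was truncated, compare the query plans; the checkpointed plan typically collapses to a scan of the already-materialized partitions:

df_transformed.explain()   # full lineage: projection over the original scan
df_checkpointed.explain()  # truncated lineage after localCheckpoint()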

๐Ÿ”น Step 5: View Results


print("Transformed and Checkpointed DataFrame:")
df_checkpointed.show()

Output:


+-------------+---+-----------+
| name        |age|age_plus_10|
+-------------+---+-----------+
|Aamir Shahzad| 35|         45|
|Ali Raza     | 40|         50|
|Bob          | 29|         39|
|Lisa         | 31|         41|
+-------------+---+-----------+

๐Ÿ“บ Watch the Full Video Tutorial

Read Files from Folder and Subfolders Using Wildcards + Add FileName & FilePath | Azure Synapse

In Azure Synapse Analytics, you can use wildcards like * and ** inside OPENROWSET to read multiple CSV files from a folder or even nested subfolders. This feature simplifies data ingestion and reduces manual work in scenarios with partitioned or log-based folder structures.

✅ Example 1: Read All CSV Files from a Folder


SELECT *
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/input/*.csv',
    FORMAT = 'CSV',
    PARSER_VERSION = '2.0',
    HEADER_ROW = TRUE
) AS [result];
  

๐Ÿ“ Example 2: Read from Subfolders

Use the wildcard pattern in subdirectories to read files recursively:


SELECT *
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/Region/**/*.csv',
    FORMAT = 'CSV',
    PARSER_VERSION = '2.0',
    HEADER_ROW = TRUE
) AS [result];
  

🧠 Adding File Name and File Path to Results

You can also include the full path of the source file by calling the filepath() function on the OPENROWSET alias:


SELECT
    [result].filepath() AS [FilePath],
    *
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/Region/**/*.csv',
    FORMAT = 'CSV',
    PARSER_VERSION = '2.0',
    HEADER_ROW = TRUE
) AS [result];
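
The filename() function works the same way and returns just the file name, which is handy when rows from many files are combined:

SELECT
    [result].filepath() AS [FilePath],
    [result].filename() AS [FileName],
    *
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/Region/**/*.csv',
    FORMAT = 'CSV',
    PARSER_VERSION = '2.0',
    HEADER_ROW = TRUE
) AS [result];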
  

๐Ÿ’ก Benefits

  • Zero-code pattern to ingest massive folder structures
  • Perfect for event-driven and incremental datasets
  • Supports Power BI, Dataflows, and external table scenarios

๐Ÿ“บ Watch the Full Tutorial

Credit: This article was assisted by ChatGPT and Gemini for technical drafting and formatting.

How to Use Joins in PySpark: Inner, Left, Right & Outer Explained

In this tutorial, we’ll explore how different types of joins work in PySpark, including Inner Join, Left Join, Right Join, and Outer Join.

๐Ÿ”ธ Step 1: Create Sample DataFrames


from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("PySparkJoinExample").getOrCreate()

data_people = [
    ("1", "Aamir Shahzad"),
    ("2", "Ali Raza"),
    ("3", "Bob"),
    ("4", "Lisa"),
    ("5", "John"),
    ("6", None)
]

schema_people = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True)
])

df_people = spark.createDataFrame(data_people, schema_people)
df_people.show()

Output:


+---+-------------+
| id|         name|
+---+-------------+
|  1| Aamir Shahzad|
|  2|      Ali Raza|
|  3|           Bob|
|  4|          Lisa|
|  5|          John|
|  6|          null|
+---+-------------+

๐Ÿ”ธ Step 2: Create Department DataFrame


data_dept = [
    ("1", "Engineering"),
    ("2", "Marketing"),
    ("4", "HR"),
    ("6", "Sales")
]

schema_dept = StructType([
    StructField("id", StringType(), True),
    StructField("department", StringType(), True)
])

df_dept = spark.createDataFrame(data_dept, schema_dept)
df_dept.show()

Output:


+---+-----------+
| id| department|
+---+-----------+
|  1|Engineering|
|  2| Marketing |
|  4|        HR |
|  6|     Sales |
+---+-----------+

๐Ÿ”ธ Example 1: INNER JOIN


df_inner = df_people.join(df_dept, on="id", how="inner")
df_inner.show()

Output: Only matching records


+---+-------------+-----------+
| id|         name| department|
+---+-------------+-----------+
|  1| Aamir Shahzad|Engineering|
|  2|      Ali Raza| Marketing |
|  4|          Lisa|        HR |
+---+-------------+-----------+

๐Ÿ”ธ Example 2: LEFT JOIN


df_left = df_people.join(df_dept, df_people.id == df_dept.id, how="left")
df_left.show()

Output: All people with departments if available

๐Ÿ”ธ Example 3: RIGHT JOIN


df_right = df_people.join(df_dept, on="id", how="right")
df_right.show()

Output: All departments with people if available

๐Ÿ”ธ Example 4: FULL OUTER JOIN


df_outer = df_people.join(df_dept, on="id", how="outer")
df_outer.show()

Output: All records from both DataFrames


+---+-------------+-----------+
| id|         name| department|
+---+-------------+-----------+
|  1| Aamir Shahzad|Engineering|
|  2|      Ali Raza| Marketing |
|  3|           Bob|       null|
|  4|          Lisa|        HR |
|  5|          John|       null|
|  6|          null|     Sales |
+---+-------------+-----------+

๐Ÿ“บ Watch Full Tutorial

Cost Management for Serverless SQL Pool in Azure Synapse Analytics | Optimize Your Budget

Serverless SQL Pool in Azure Synapse Analytics offers flexibility and ease for querying big data directly from Azure Data Lake. But to make the most of its pay-per-query model, it's crucial to understand how costs accumulate and how to manage them effectively.


📊 How Serverless SQL Pool Pricing Works

  • 💵 Billed at $5 per TB of data processed (as of writing — subject to change)
  • ✅ No cost for metadata-only queries, failed queries, or cached query replays
  • 🧠 You only pay for the amount of data scanned — not storage or compute provisioning

๐Ÿ› ️ Track Usage with DMV

Use the built-in dynamic management view sys.dm_external_data_processed to monitor processed data per session or query.

SELECT
    session_id,
    user_name,
    start_time,
    end_time,
    external_data_source_name,
    total_bytes_processed / 1024.0 / 1024.0 AS MB_Processed,
    total_bytes_processed / 1024.0 / 1024.0 / 1024.0 AS GB_Processed
FROM sys.dm_external_data_processed
ORDER BY start_time DESC;

๐Ÿ’ก Tip: Monitor usage trends over time and identify high-cost queries for tuning.

🧠 Best Practices to Optimize Serverless SQL Costs

  • 🎯 Use SELECT with specific columns instead of *
  • 🔍 Apply WHERE filters early to minimize scanned rows
  • Partition files in folders for more efficient scanning
  • Cache results in Power BI or downstream tools instead of rerunning queries
  • 📂 Prefer Parquet format over CSV — it’s columnar and compresses well
  • 🔄 Reuse successful query results to avoid reprocessing

๐Ÿ“‰ Monitor Query Patterns

Use Synapse Studio’s built-in Monitor Hub to track executions and durations. You can also connect Synapse logs to Azure Monitor or Log Analytics for more detailed analysis.

๐Ÿ“Œ Quick Formula

Cost = (Bytes Processed ÷ 1 TB) × $5

For example, processing 200 GB:

  • 200 ÷ 1024 = 0.195 TB
  • 0.195 × $5 = $0.98

๐Ÿ“บ Watch the Full Tutorial

Learn how to control costs in Serverless SQL Pool step-by-step:

What is Serverless SQL Pool? Create a Serverless SQL Pool Database and External Table in Azure Synapse

Serverless SQL pool in Azure Synapse is a pay-per-query distributed query engine that enables users to run T-SQL queries directly on data stored in Azure Data Lake Storage (ADLS) without the need to provision or manage dedicated compute resources.

✅ Key Features:

  • No infrastructure management
  • Query data in CSV, Parquet, or JSON formats
  • Cost-efficient: only pay for the data you query
  • Supports OPENROWSET and external tables

๐Ÿ“† How to Create a Serverless SQL Pool Database

Serverless SQL Pool is available by default in each Synapse workspace under the name Built-in or ServerlessSQLPool.

Steps:

  1. Open Azure Synapse Studio.
  2. Navigate to the Data tab.
  3. Choose the built-in pool.
  4. Run the following T-SQL to create a new database:
CREATE DATABASE demo_serverless_db;

You can now query data directly using T-SQL from this database.

๐Ÿ’ฐ Pricing Details

Serverless SQL pool uses a pay-per-query pricing model.

๐Ÿ“† Charges:

  • $5 per TB of data processed (latest pricing may vary)
  • No charges for:
    • Metadata-only queries
    • Failed queries
    • Repeated queries (cached)
  • Use sys.dm_external_data_processed to track usage.

๐Ÿ’ธ Cost Formula:

Processed Data (in TB) × $5 = Total Cost
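
As a rough sketch, the usage DMV can be plugged into this formula (assuming the type and data_processed_mb columns exposed by sys.dm_external_data_processed):

SELECT
    type,                                              -- daily, weekly or monthly aggregate
    data_processed_mb,
    (data_processed_mb / 1024.0 / 1024.0) * 5.0 AS estimated_cost_usd
FROM sys.dm_external_data_processed;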

๐Ÿ› ️ Creating External Tables in Serverless SQL Pool

External tables provide schema-on-read capability over data in Azure Data Lake.

✅ Prerequisites:

  • Data must reside in Azure Data Lake Storage Gen2
  • Create an External Data Source and a File Format

Example Setup:

-- Step 1: External Data Source
CREATE EXTERNAL DATA SOURCE MyDataLake
WITH (
    LOCATION = 'https://<yourstorage>.dfs.core.windows.net/<container>'
);

-- Step 2: File Format
CREATE EXTERNAL FILE FORMAT CsvFormat
WITH (
    FORMAT_TYPE = DELIMITEDTEXT,
    FORMAT_OPTIONS (
        FIELD_TERMINATOR = ',',
        STRING_DELIMITER = '"',
        FIRST_ROW = 2
    )
);

-- Step 3: External Table
CREATE EXTERNAL TABLE dbo.CustomerData (
    customer_id INT,
    fname       VARCHAR(50),
    lname       VARCHAR(50),
    salary      FLOAT
)
WITH (
    LOCATION = 'customer/',
    DATA_SOURCE = MyDataLake,
    FILE_FORMAT = CsvFormat
);

-- Query the Table
SELECT TOP 10 * FROM dbo.CustomerData;

๐Ÿ’ก Tips and Extras

Using OPENROWSET for Quick Queries:

SELECT * FROM OPENROWSET(
    BULK 'customer/file.csv',
    DATA_SOURCE = 'MyDataLake',
    FORMAT = 'CSV',
    PARSER_VERSION = '2.0',
    FIRSTROW = 2
) AS data;
  • No need to ingest data
  • Always reads the most recent file content

๐Ÿš€ Summary

Serverless SQL pools offer a flexible, cost-effective way to query files directly in your Data Lake without complex infrastructure. Ideal for exploratory data analysis, quick reports, and hybrid data architecture use cases.

๐Ÿ“บ Watch the Full Tutorial

Learn how to set up and use Serverless SQL Pool in the video below:



This blog post was created with assistance from ChatGPT and Gemini AI to ensure technical accuracy and clarity.

PySpark isLocal() Function Tutorial | Check If DataFrame Operations Run Locally

The isLocal() function in PySpark returns True when the DataFrame's collect() and take() methods can run locally on the driver, without any Spark executors. This can be helpful for debugging or for validating whether your code is running against a distributed cluster or purely local data.

Step 1: Import SparkSession


from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("isLocal Example").getOrCreate()

Step 2: Create Sample Data


data = [
    ("Aamir Shahzad", "Pakistan", 30),
    ("Ali Raza", "USA", 28),
    ("Bob", "UK", 45),
    ("Lisa", "Canada", 33)
]
columns = ["Name", "Country", "Age"]
df = spark.createDataFrame(data, schema=columns)

Step 3: Show the DataFrame


df.show()

Output:


+--------------+--------+---+
|         Name | Country|Age|
+--------------+--------+---+
|Aamir Shahzad |Pakistan| 30|
|Ali Raza      |USA     | 28|
|Bob           |UK      | 45|
|Lisa          |Canada  | 33|
+--------------+--------+---+

Step 4: Use isLocal() to Check Execution Mode


try:
    print("Is this DataFrame local? =", df.isLocal())
except Exception as e:
    print("⚠️ df.isLocal() is not supported in this environment:", str(e))

Output (example):


Is this DataFrame local? = False

๐Ÿ“บ Watch the Full Tutorial on YouTube

Azure Synapse Analytics Demo: OPENROWSET with Explicit Schema

Author: Aamir Shahzad

Description: Demonstrating how to explicitly define schema using the WITH clause in OPENROWSET for CSV, Parquet, and JSON files using Serverless SQL Pool in Azure Synapse.


๐Ÿ”น Example 1: Read Specific Columns from a CSV File

When reading CSV files, you can define schema using column positions. This avoids loading unnecessary columns and improves query performance.

SELECT TOP 10 *
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/Europe/customer_20250313_194510_EMEA.csv',
    FORMAT = 'CSV',
    PARSER_VERSION = '2.0',
    FIRSTROW = 2
) WITH (
    customer_id INT 1,           -- Customer ID
    FName       VARCHAR(50) 2 ,  -- First Name
    EmployeeSalary VARCHAR(50) 4 -- Salary
) AS tbl;

Note: If ordinal positions are not provided, Synapse assumes left-to-right order starting at column 1.

๐Ÿ”น Example 2: Read Parquet File with Explicit Schema

Parquet files support named fields. You don’t need to specify positions — but your column names in the WITH clause must match the actual schema inside the Parquet file.

SELECT *
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/input/titanic.parquet',
    FORMAT = 'PARQUET'
) WITH (
    Sex  VARCHAR(10),      -- Gender
    Age  FLOAT,            -- Age
    Name VARCHAR(200)      -- Full Name
) AS rows;

Tip: If the column names do not match exactly, the result will show NULLs.

๐Ÿ”น Example 3: Parse and Read Values from a JSON File

This method lets you extract values from NDJSON (newline-delimited JSON) using OPENROWSET combined with OPENJSON.

SELECT
    jsonContent,
    jsonData.customer_id,
    jsonData.name,
    jsonData.email
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/input/sample_customers.json',
    FORMAT = 'CSV',
    FIELDQUOTE = '0x0b',
    FIELDTERMINATOR = '0x0b',
    ROWTERMINATOR = '0x0a'
) WITH (
    jsonContent VARCHAR(MAX)  -- Read JSON as text blob
) AS raw
CROSS APPLY OPENJSON(jsonContent)
WITH (
    customer_id INT,
    name        VARCHAR(100),
    email       VARCHAR(100)
) AS jsonData;

Explanation: This approach tricks Synapse into treating the whole line as a single string using 0x0b delimiters, allowing JSON parsing with OPENJSON().


๐Ÿ“บ Watch the Full Video Tutorial

Here’s a complete walkthrough demo on YouTube:



This blog post was created with assistance from ChatGPT and Gemini AI to ensure technical clarity and accuracy.

PySpark hint() Function Tutorial – Optimize Joins with Broadcast and Merge

Introduction

The hint() function in PySpark allows developers to influence the query optimizer by suggesting specific join strategies. This tutorial demonstrates how to use hints like broadcast, merge, and shuffle to improve join performance.

1. Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("HintFunctionDemo").getOrCreate()

2. Sample DataFrames

Create two small DataFrames for employees and departments.

data_employees = [
    (1, "Aamir Shahzad", "Sales"),
    (2, "Ali Raza", "Marketing"),
    (3, "Bob", "Sales"),
    (4, "Lisa", "Engineering"),
    (5, "Charlie", "Marketing"),
    (6, "David", "Sales")
]

columns_employees = ["employee_id", "employee_name", "department"]

employees_df = spark.createDataFrame(data_employees, columns_employees)

data_departments = [
    ("Sales", "New York"),
    ("Marketing", "San Francisco"),
    ("Engineering", "Seattle"),
    ("Finance", "Chicago")
]

columns_departments = ["dept_name", "location"]

departments_df = spark.createDataFrame(data_departments, columns_departments)

employees_df.show()

Output:

+-----------+--------------+-----------+
|employee_id|employee_name |department |
+-----------+--------------+-----------+
|1          |Aamir Shahzad |Sales      |
|2          |Ali Raza      |Marketing  |
|3          |Bob           |Sales      |
|4          |Lisa          |Engineering|
|5          |Charlie       |Marketing  |
|6          |David         |Sales      |
+-----------+--------------+-----------+

3. Example 1: Using the broadcast hint

from pyspark.sql.functions import col

joined_df_broadcast = employees_df.join(
    departments_df.hint("broadcast"),
    employees_df["department"] == departments_df["dept_name"]
)

joined_df_broadcast.explain()
joined_df_broadcast.show()

4. Example 2: Using the merge hint (for sort-merge join)

joined_df_merge = employees_df.join(
    departments_df.hint("merge"),
    employees_df["department"] == departments_df["dept_name"]
)

joined_df_merge.explain()
joined_df_merge.show()

5. Example 3: Using the shuffle hint

joined_df_shuffle = employees_df.join(
    departments_df.hint("shuffle"),
    employees_df["department"] == departments_df["dept_name"]
)

joined_df_shuffle.explain()
joined_df_shuffle.show()

6. Conclusion

Use hint() in PySpark wisely to control join strategies and optimize query performance. Always validate your hint with explain() to ensure Spark chooses the desired execution plan. These hints are especially useful for large datasets where default join strategies may not be optimal.

๐ŸŽฅ Watch the Full Video Tutorial

© 2025 Aamir Shahzad. All rights reserved.

How to Check File Schema Using Serverless SQL Pool in Azure Synapse

๐ŸŽฏ Objective:

Learn how to inspect the schema (columns and data types) of CSV, Parquet, and JSON files stored in Azure Data Lake Gen2 using Serverless SQL Pool — without importing the data.

๐Ÿงช Option 1: Use TOP 1 for a Quick Preview

✅ CSV File

SELECT TOP 1 *
FROM OPENROWSET(
    BULK 'https://<account>.dfs.core.windows.net/<container>/yourfile.csv',
    FORMAT = 'CSV',
    HEADER_ROW = TRUE,
    PARSER_VERSION = '2.0'
) AS rows;

✅ Parquet File

SELECT TOP 1 *
FROM OPENROWSET(
    BULK 'https://<account>.dfs.core.windows.net/<container>/yourfile.parquet',
    FORMAT = 'PARQUET'
) AS rows;

✅ JSON File (NDJSON)

To preview the schema of newline-delimited JSON (NDJSON), use the WITH clause and parse with OPENJSON:

SELECT TOP 1
    jsonData.customer_id,
    jsonData.name,
    jsonData.email
FROM OPENROWSET(
    BULK 'https://<account>.dfs.core.windows.net/<container>/yourfile.json',
    FORMAT = 'CSV',
    FIELDQUOTE = '0x0b',
    FIELDTERMINATOR = '0x0b',
    ROWTERMINATOR = '0x0a'
) WITH (
    jsonContent VARCHAR(MAX)
) AS raw
CROSS APPLY OPENJSON(jsonContent)
WITH (
    customer_id INT,
    name VARCHAR(100),
    email VARCHAR(100)
) AS jsonData;

๐Ÿงช Option 2: Use sp_describe_first_result_set (Schema Only)

This method returns only metadata without scanning actual data — great for cost-free inspection.

✅ CSV File Schema

EXEC sp_describe_first_result_set N'
SELECT *
FROM OPENROWSET(
    BULK ''https://<account>.dfs.core.windows.net/<container>/yourfile.csv'',
    FORMAT = ''CSV'',
    HEADER_ROW = TRUE,
    PARSER_VERSION = ''2.0''
) AS rows;';

✅ Parquet File Schema

EXEC sp_describe_first_result_set N'
SELECT *
FROM OPENROWSET(
    BULK ''https://<account>.dfs.core.windows.net/<container>/yourfile.parquet'',
    FORMAT = ''PARQUET''
) AS rows;';

✅ JSON File Schema (NDJSON)

EXEC sp_describe_first_result_set N'
SELECT
    jsonData.customer_id,
    jsonData.name,
    jsonData.email
FROM OPENROWSET(
    BULK ''https://<account>.dfs.core.windows.net/<container>/yourfile.json'',
    FORMAT = ''CSV'',
    FIELDQUOTE = ''0x0b'',
    FIELDTERMINATOR = ''0x0b'',
    ROWTERMINATOR = ''0x0a''
) WITH (
    jsonContent VARCHAR(MAX)
) AS raw
CROSS APPLY OPENJSON(jsonContent)
WITH (
    customer_id INT,
    name VARCHAR(100),
    email VARCHAR(100)
) AS jsonData;';

๐Ÿ“บ Watch the Full Tutorial

Learn how to inspect file schemas visually in the video below:



This blog post was created with assistance from ChatGPT and Gemini AI to ensure technical accuracy and clarity.

PySpark foreachPartition() Explained – Process DataFrame Partitions Efficiently

Introduction

The foreachPartition() function in PySpark allows you to apply a custom function to each partition of a DataFrame. It's typically used for efficient bulk writes (e.g., to databases or files) per partition.

1. Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ForeachPartitionWithDataFrame") \
    .getOrCreate()

2. Create Sample DataFrame

data = [
    ("Aamir Shahzad", "Pakistan", 30),
    ("Ali Raza", "USA", 28),
    ("Bob", "UK", 45),
    ("Lisa", "Canada", 33)
]

columns = ["Name", "Country", "Age"]
df = spark.createDataFrame(data, columns)
df.show()

Output:

+--------------+--------+---+
|          Name| Country|Age|
+--------------+--------+---+
|Aamir Shahzad |Pakistan| 30|
|     Ali Raza|     USA| 28|
|          Bob|      UK| 45|
|         Lisa|  Canada| 33|
+--------------+--------+---+

3. Repartition the DataFrame

print(df.rdd.getNumPartitions())  # Check current number of partitions

df = df.repartition(2)  # Repartition into 2
print(df.rdd.getNumPartitions())

Output:

1
2

4. Define the Custom Function

def process_partition(rows):
    print("Processing a new partition:")
    for row in rows:
        print(f"Name: {row['Name']}, Country: {row['Country']}, Age: {row['Age']}")

5. Apply foreachPartition()

This will run on executors, and output may appear in logs (not notebook cells).

df.foreachPartition(process_partition)
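
A common real-world use is opening one connection per partition and writing its rows in bulk; below is a minimal sketch where get_connection and insert_rows are hypothetical placeholders, not real library calls:

def write_partition(rows):
    # One connection per partition instead of one per row
    conn = get_connection()               # hypothetical helper
    batch = [row.asDict() for row in rows]
    insert_rows(conn, batch)              # hypothetical bulk insert
    conn.close()

df.foreachPartition(write_partition)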

6. Simulate foreachPartition with collect() (for local testing)

for row in df.collect():
    process_partition([row])

Output (Simulated):

Processing a new partition:
Name: Aamir Shahzad, Country: Pakistan, Age: 30
Processing a new partition:
Name: Ali Raza, Country: USA, Age: 28
...

๐ŸŽฅ Watch the Full Video Tutorial

© 2025 Aamir Shahzad. All rights reserved.

Querying CSV, Parquet, and JSON Files in ADLS Gen2 with Serverless SQL

In Azure Synapse Analytics, you can query raw data files directly from Azure Data Lake Storage Gen2 using Serverless SQL Pools. Below are practical examples to query CSV, Parquet, and JSON formats efficiently.

๐Ÿ“ Querying CSV Files

This example reads a CSV file with headers directly from ADLS Gen2:

SELECT *
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/input/sample_users.csv',
    FORMAT = 'CSV',
    HEADER_ROW = TRUE,
    PARSER_VERSION = '2.0'
) AS rows;

๐Ÿ’ก Tip: Use SELECT with specific columns instead of * to minimize scanned data and cost.

๐Ÿ“ Querying Parquet Files

This query fetches data from a Parquet file — optimized for analytical workloads:

SELECT TOP 4 *
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/input/titanic.parquet',
    FORMAT = 'PARQUET'
) AS rows;

Reference: Sample Parquet file from Tablab

๐Ÿ“ Querying JSON Files (NDJSON - newline-delimited JSON)

This example demonstrates querying raw JSON documents by tricking Synapse into reading the entire line as a single column:

SELECT TOP 100
    jsonContent,
    JSON_VALUE(jsonContent, '$.customer_id') AS customer_id,
    JSON_VALUE(jsonContent, '$.name') AS customer_name
FROM
    OPENROWSET(
        BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/input/sample_customers.json',
        FORMAT = 'CSV',
        FIELDQUOTE = '0x0b',
        FIELDTERMINATOR = '0x0b',
        ROWTERMINATOR = '0x0b'
    )
    WITH (
        jsonContent VARCHAR(MAX)
    ) AS [result];

🛠 Explanation of Parameters:

  • FIELDQUOTE – normally the character used to wrap a field (e.g. "); set to 0x0b so nothing is treated as a quote
  • FIELDTERMINATOR – normally the character between fields (e.g. ,); set to 0x0b so the line is not split into fields
  • ROWTERMINATOR – normally the end-of-line character (e.g. \n); set to 0x0b to treat each line as one row

๐Ÿ“บ Watch the Full Tutorial

Visual walkthrough available in the video below:



This blog post was created with assistance from ChatGPT and Gemini AI to ensure technical accuracy and clarity.