How to Use PySpark hint() for Join Optimization – Broadcast, Shuffle, Merge | PySpark Tutorial

PySpark hint() Function Tutorial – Optimize Joins with Broadcast and Merge

Introduction

The hint() function in PySpark allows developers to influence the query optimizer by suggesting specific join strategies. This tutorial demonstrates how to use hints such as broadcast, merge, and shuffle_hash to improve join performance.

1. Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("HintFunctionDemo").getOrCreate()

2. Sample DataFrames

Create two small DataFrames for employees and departments.

data_employees = [
    (1, "Aamir Shahzad", "Sales"),
    (2, "Ali Raza", "Marketing"),
    (3, "Bob", "Sales"),
    (4, "Lisa", "Engineering"),
    (5, "Charlie", "Marketing"),
    (6, "David", "Sales")
]

columns_employees = ["employee_id", "employee_name", "department"]

employees_df = spark.createDataFrame(data_employees, columns_employees)

data_departments = [
    ("Sales", "New York"),
    ("Marketing", "San Francisco"),
    ("Engineering", "Seattle"),
    ("Finance", "Chicago")
]

columns_departments = ["dept_name", "location"]

departments_df = spark.createDataFrame(data_departments, columns_departments)

employees_df.show()

Output:

+-----------+--------------+-----------+
|employee_id|employee_name |department |
+-----------+--------------+-----------+
|1          |Aamir Shahzad |Sales      |
|2          |Ali Raza      |Marketing  |
|3          |Bob           |Sales      |
|4          |Lisa          |Engineering|
|5          |Charlie       |Marketing  |
|6          |David         |Sales      |
+-----------+--------------+-----------+

3. Example 1: Using the broadcast hint

from pyspark.sql.functions import col

joined_df_broadcast = employees_df.join(
    departments_df.hint("broadcast"),
    employees_df["department"] == departments_df["dept_name"]
)

joined_df_broadcast.explain()
joined_df_broadcast.show()
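
The hint("broadcast") call above asks Spark to broadcast departments_df. An equivalent option (shown here as a small addition to the original example) is the broadcast() helper from pyspark.sql.functions, which marks the DataFrame for broadcasting directly and should produce the same BroadcastHashJoin plan:

from pyspark.sql.functions import broadcast

joined_df_broadcast_fn = employees_df.join(
    broadcast(departments_df),  # explicitly mark the small side for broadcasting
    employees_df["department"] == departments_df["dept_name"]
)

joined_df_broadcast_fn.explain()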

4. Example 2: Using the merge hint (for sort-merge join)

joined_df_merge = employees_df.join(
    departments_df.hint("merge"),
    employees_df["department"] == departments_df["dept_name"]
)

joined_df_merge.explain()
joined_df_merge.show()

5. Example 3: Using the shuffle_hash hint

joined_df_shuffle_hash = employees_df.join(
    departments_df.hint("shuffle_hash"),
    employees_df["department"] == departments_df["dept_name"]
)

joined_df_shuffle_hash.explain()
joined_df_shuffle_hash.show()

6. Conclusion

Use hint() in PySpark wisely to control join strategies and optimize query performance. Always validate your hint with explain() to ensure Spark chooses the desired execution plan. These hints are especially useful for large datasets where default join strategies may not be optimal.

🎥 Watch the Full Video Tutorial

© 2025 Aamir Shahzad. All rights reserved.

How to Check File Schema Using Serverless SQL Pool in Azure Synapse | Azure Synapse Analytics Tutorial

How to Check File Schema Using Serverless SQL Pool in Azure Synapse

📘 How to Check File Schema Using Serverless SQL Pool in Azure Synapse

🎯 Objective:

Learn how to inspect the schema (columns and data types) of CSV, Parquet, and JSON files stored in Azure Data Lake Gen2 using Serverless SQL Pool — without importing the data.

🧪 Option 1: Use TOP 1 for a Quick Preview

✅ CSV File

SELECT TOP 1 *
FROM OPENROWSET(
    BULK 'https://<account>.dfs.core.windows.net/<container>/yourfile.csv',
    FORMAT = 'CSV',
    HEADER_ROW = TRUE,
    PARSER_VERSION = '2.0'
) AS rows;

✅ Parquet File

SELECT TOP 1 *
FROM OPENROWSET(
    BULK 'https://<account>.dfs.core.windows.net/<container>/yourfile.parquet',
    FORMAT = 'PARQUET'
) AS rows;

✅ JSON File (NDJSON)

To preview the schema of newline-delimited JSON (NDJSON), use the WITH clause and parse with OPENJSON:

SELECT TOP 1
    jsonData.customer_id,
    jsonData.name,
    jsonData.email
FROM OPENROWSET(
    BULK 'https://<account>.dfs.core.windows.net/<container>/yourfile.json',
    FORMAT = 'CSV',
    FIELDQUOTE = '0x0b',
    FIELDTERMINATOR = '0x0b',
    ROWTERMINATOR = '0x0a'
) WITH (
    jsonContent VARCHAR(MAX)
) AS raw
CROSS APPLY OPENJSON(jsonContent)
WITH (
    customer_id INT,
    name VARCHAR(100),
    email VARCHAR(100)
) AS jsonData;

🧪 Option 2: Use sp_describe_first_result_set (Schema Only)

This method returns only the result-set metadata (column names and data types) instead of data rows, making it a very cheap way to inspect a file's schema.

✅ CSV File Schema

EXEC sp_describe_first_result_set N'
SELECT *
FROM OPENROWSET(
    BULK ''https://<account>.dfs.core.windows.net/<container>/yourfile.csv'',
    FORMAT = ''CSV'',
    HEADER_ROW = TRUE,
    PARSER_VERSION = ''2.0''
) AS rows;';

✅ Parquet File Schema

EXEC sp_describe_first_result_set N'
SELECT *
FROM OPENROWSET(
    BULK ''https://<account>.dfs.core.windows.net/<container>/yourfile.parquet'',
    FORMAT = ''PARQUET''
) AS rows;';

✅ JSON File Schema (NDJSON)

EXEC sp_describe_first_result_set N'
SELECT
    jsonData.customer_id,
    jsonData.name,
    jsonData.email
FROM OPENROWSET(
    BULK ''https://<account>.dfs.core.windows.net/<container>/yourfile.json'',
    FORMAT = ''CSV'',
    FIELDQUOTE = ''0x0b'',
    FIELDTERMINATOR = ''0x0b'',
    ROWTERMINATOR = ''0x0a''
) WITH (
    jsonContent VARCHAR(MAX)
) AS raw
CROSS APPLY OPENJSON(jsonContent)
WITH (
    customer_id INT,
    name VARCHAR(100),
    email VARCHAR(100)
) AS jsonData;';

📺 Watch the Full Tutorial

Learn how to inspect file schemas visually in the video below:



This blog post was created with assistance from ChatGPT and Gemini AI to ensure technical accuracy and clarity.

PySpark Tutorial : PySpark foreachPartition Explained Process DataFrame Partitions Efficiently with Examples

PySpark foreachPartition() Explained – Process DataFrame Partitions Efficiently

Introduction

The foreachPartition() function in PySpark allows you to apply a custom function to each partition of a DataFrame. It's typically used for efficient bulk writes (e.g., to databases or files) per partition.

1. Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ForeachPartitionWithDataFrame") \
    .getOrCreate()

2. Create Sample DataFrame

data = [
    ("Aamir Shahzad", "Pakistan", 30),
    ("Ali Raza", "USA", 28),
    ("Bob", "UK", 45),
    ("Lisa", "Canada", 33)
]

columns = ["Name", "Country", "Age"]
df = spark.createDataFrame(data, columns)
df.show()

Output:

+--------------+--------+---+
|          Name| Country|Age|
+--------------+--------+---+
|Aamir Shahzad |Pakistan| 30|
|     Ali Raza|     USA| 28|
|          Bob|      UK| 45|
|         Lisa|  Canada| 33|
+--------------+--------+---+

3. Repartition the DataFrame

print(df.rdd.getNumPartitions())  # Check current number of partitions

df = df.repartition(2)  # Repartition into 2
print(df.rdd.getNumPartitions())

Output:

1
2

4. Define the Custom Function

def process_partition(rows):
    print("Processing a new partition:")
    for row in rows:
        print(f"Name: {row['Name']}, Country: {row['Country']}, Age: {row['Age']}")

5. Apply foreachPartition()

This will run on executors, and output may appear in logs (not notebook cells).

df.foreachPartition(process_partition)
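
A more realistic use of foreachPartition() is batching writes with one connection per partition. The sketch below is an illustration added to this post, using sqlite3 and a local /tmp path purely as stand-ins for a real database client and connection settings:

def write_partition_to_db(rows):
    import sqlite3  # placeholder for a real database driver
    conn = sqlite3.connect("/tmp/people.db")  # one connection per partition
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS people (name TEXT, country TEXT, age INT)")
    cursor.executemany(
        "INSERT INTO people VALUES (?, ?, ?)",
        [(row["Name"], row["Country"], row["Age"]) for row in rows]  # one batch per partition
    )
    conn.commit()
    conn.close()

df.foreachPartition(write_partition_to_db)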

6. Simulate foreachPartition with collect() (for local testing)

for row in df.collect():
    process_partition([row])

Output (Simulated):

Processing a new partition:
Name: Aamir Shahzad, Country: Pakistan, Age: 30
Processing a new partition:
Name: Ali Raza, Country: USA, Age: 28
...

🎥 Watch the Full Video Tutorial

© 2025 Aamir Shahzad. All rights reserved.

Querying CSV, Parquet, and JSON Files in ADLS Gen2 with Serverless SQL | Azure Synapse Analytics Tutorial

Querying CSV, Parquet, and JSON Files in ADLS Gen2 with Serverless SQL

05: Querying CSV, Parquet, and JSON Files in ADLS Gen2 with Serverless SQL

In Azure Synapse Analytics, you can query raw data files directly from Azure Data Lake Storage Gen2 using Serverless SQL Pools. Below are practical examples to query CSV, Parquet, and JSON formats efficiently.

๐Ÿ“ Querying CSV Files

This example reads a CSV file with headers directly from ADLS Gen2:

SELECT *
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/input/sample_users.csv',
    FORMAT = 'CSV',
    HEADER_ROW = TRUE,
    PARSER_VERSION = '2.0'
) AS rows;

💡 Tip: Use SELECT with specific columns instead of * to minimize scanned data and cost.

๐Ÿ“ Querying Parquet Files

This query fetches data from a Parquet file — optimized for analytical workloads:

SELECT TOP 4 *
FROM OPENROWSET(
    BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/input/titanic.parquet',
    FORMAT = 'PARQUET'
) AS rows;

Reference: Sample Parquet file from Tablab

๐Ÿ“ Querying JSON Files (NDJSON - newline-delimited JSON)

This example demonstrates querying raw JSON documents by tricking Synapse into reading the entire line as a single column:

SELECT TOP 100
    jsonContent,
    JSON_VALUE(jsonContent, '$.customer_id') AS customer_id,
    JSON_VALUE(jsonContent, '$.name') AS customer_name
FROM
    OPENROWSET(
        BULK 'https://techbrotherssynapsestg.dfs.core.windows.net/synpasecontainer/input/sample_customers.json',
        FORMAT = 'CSV',
        FIELDQUOTE = '0x0b',
        FIELDTERMINATOR = '0x0b',
        ROWTERMINATOR = '0x0a'
    )
    WITH (
        jsonContent VARCHAR(MAX)
    ) AS [result];

🛠 Explanation of Parameters:

Parameter | Normal Meaning | Usage Here
FIELDQUOTE | Character used to wrap a field (e.g. ") | Set to 0x0b so nothing is treated as a quote
FIELDTERMINATOR | Character between fields (e.g. ,) | Set to 0x0b so the line is not split into fields
ROWTERMINATOR | End-of-line character (e.g. \n) | Set to 0x0a so each line (one JSON document) becomes a row

📺 Watch the Full Tutorial

Visual walkthrough available in the video below:



This blog post was created with assistance from ChatGPT and Gemini AI to ensure technical accuracy and clarity.

PySpark Tutorial : PySpark foreach Function Tutorial Apply Custom Logic to Each Row in a DataFrame

PySpark foreach() Function Tutorial – Apply Custom Logic to Each Row

Introduction

The foreach() function in PySpark allows you to apply a custom Python function to each row of a DataFrame. This is commonly used when you want to perform actions like sending records to external databases or writing logs—operations that have side effects.

1. Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ForeachExample").getOrCreate()

2. Create Sample DataFrame

data = [
    ("Aamir Shahzad", "Pakistan", 30),
    ("Ali Raza", "USA", 28),
    ("Bob", "UK", 45),
    ("Lisa", "Canada", 33)
]

columns = ["Name", "Country", "Age"]

df = spark.createDataFrame(data, columns)
df.show()

Output:

+--------------+--------+---+
|          Name| Country|Age|
+--------------+--------+---+
|Aamir Shahzad |Pakistan| 30|
|     Ali Raza|     USA| 28|
|          Bob|      UK| 45|
|         Lisa|  Canada| 33|
+--------------+--------+---+

3. Define a foreach Function

def process_row(row):
    print(f"Processing: {row.Name} from {row.Country}, Age: {row.Age}")

4. Apply foreach() on the DataFrame

df.foreach(process_row)

Note: Output won’t appear in the notebook UI—it’s executed on worker nodes.
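
Because the function runs on the executors, foreach() cannot return values to the driver. One common pattern (an illustrative addition, not part of the original example) is to collect simple feedback through an accumulator:

adult_count = spark.sparkContext.accumulator(0)

def count_adults(row):
    if row.Age > 30:
        adult_count.add(1)  # accumulators can be safely updated from executor tasks

df.foreach(count_adults)
print("Rows with Age > 30:", adult_count.value)

For the sample data above this prints 2, since only Bob and Lisa are older than 30.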

5. Optional: Simulate foreach using collect()

def print_row(row):
    print(f"Simulated: {row.Name} is {row.Age} years old from {row.Country}")

for row in df.collect():
    print_row(row)

Output:

Simulated: Aamir Shahzad is 30 years old from Pakistan
Simulated: Ali Raza is 28 years old from USA
Simulated: Bob is 45 years old from UK
Simulated: Lisa is 33 years old from Canada

🎥 Watch Full Video Tutorial

© 2025 Aamir Shahzad

PySpark Tutorial : PySpark stat Function Tutorial Perform Statistical Analysis on DataFrames Easily #pyspark

PySpark stat() Function Tutorial – Perform Statistical Analysis

Introduction

In PySpark, the DataFrame.stat property provides access to a range of statistical methods, including crosstab(), freqItems(), cov(), and corr(). This tutorial shows how to use these tools to perform basic statistical analysis directly on Spark DataFrames.

1. Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("StatFunctionDemo") \
    .getOrCreate()

2. Sample DataFrame

data = [
    ("Aamir", "Pakistan", 85),
    ("Ali", "USA", 78),
    ("Bob", "UK", 85),
    ("Lisa", "Canada", 92),
    ("Ali", "USA", 65)
]

columns = ["Name", "Country", "Score"]
df = spark.createDataFrame(data, columns)
df.show()

Output:

+------+---------+-----+
|  Name|  Country|Score|
+------+---------+-----+
|Aamir | Pakistan|   85|
| Ali  |     USA |   78|
| Bob  |     UK  |   85|
| Lisa |  Canada |   92|
| Ali  |     USA |   65|
+------+---------+-----+

3. Crosstab – Name vs Country

df.stat.crosstab("Name", "Country").show()

Output:

+------------+------+--------+---+---+
|Name_Country|Canada|Pakistan| UK|USA|
+------------+------+--------+---+---+
|        Lisa|     1|       0|  0|  0|
|         Ali|     0|       0|  0|  2|
|       Aamir|     0|       1|  0|  0|
|         Bob|     0|       0|  1|  0|
+------------+------+--------+---+---+

4. Frequent Items in Name and Country

df.stat.freqItems(["Name", "Country"], support=0.3).show(truncate=False)

Output:

+------------------+------------------+
|Name_freqItems    |Country_freqItems |
+------------------+------------------+
|[Ali]             |[USA]             |
+------------------+------------------+

5. Covariance – Score & Bonus

# Add bonus column for demonstration
df2 = df.withColumn("Bonus", (df["Score"] * 0.1))

# Covariance between Score and Bonus
cov_val = df2.stat.cov("Score", "Bonus")
print(f"Covariance between Score and Bonus: {cov_val}")

Output:

Covariance between Score and Bonus: 10.45

6. Correlation – Score & Bonus

corr_val = df2.stat.corr("Score", "Bonus")
print(f"Correlation between Score and Bonus: {corr_val}")

Output:

Correlation between Score and Bonus: 0.9999999999999998

🎥 Watch Full Video Tutorial

© 2025 Aamir Shahzad. All rights reserved.

Azure Synapse Analytics Billing Explained | Control Azure Synapse Analytics Cost | Azure Synapse Free Training

Azure Synapse Analytics Billing Explained | Control Azure Synapse Analytics Cost

Azure Synapse Analytics is a powerful analytics service that combines big data and data warehousing capabilities. However, if not managed properly, Synapse costs can quickly spiral out of control. In this post, we’ll break down how Azure Synapse billing works and how you can control your usage to save money effectively.

๐Ÿ” Key Billing Components in Azure Synapse

  • 1. Dedicated SQL Pools: Charged per DWU (Data Warehouse Unit) per hour while the pool is running; pausing the pool stops compute charges, though storage is still billed.
  • 2. Serverless SQL Pools: Charged per TB of data processed. Ideal for ad-hoc queries on data in your data lake.
  • 3. Apache Spark Pools: Charged based on vCore hours used by the Spark nodes.
  • 4. Data Movement & Storage: Additional costs apply for data storage, external tables, and staging operations.

💡 Tips to Control Costs in Synapse

  • Pause Dedicated SQL Pools when not in use to stop billing for compute.
  • Use Serverless Pools for exploratory analysis to avoid unnecessary provisioning costs.
  • Monitor queries to avoid scanning large datasets—filter data early using WHERE clauses.
  • Leverage Cost Management Tools in Azure Portal to set budgets and alerts.
  • Enable Auto-Pause and Auto-Resume settings on Dedicated SQL Pools.
  • Review Workload Usage via built-in Synapse monitoring and diagnostic logs.

📉 Real World Cost Saving Strategy

One common strategy is to:

  1. Run heavy workloads using Dedicated SQL Pools during off-peak hours.
  2. Use Serverless SQL Pools for reporting dashboards and minimal data exploration.
  3. Consolidate storage using ADLS Gen2 and query data using OPENROWSET or external tables.
  4. Automate pause/resume with Logic Apps or PowerShell scripts.

📺 Watch the Full Tutorial

For a detailed walkthrough and demo, check out the video below:



This blog post was created with assistance from ChatGPT and Gemini AI to ensure technical accuracy and clarity.

How to use withColumnsRenamed function in PySpark to Rename Multiple Columns in DataFrame

PySpark Tutorial: How to use withColumnsRenamed function in PySpark

This tutorial covers how to rename multiple columns in a PySpark DataFrame using the withColumnsRenamed() function. It’s a cleaner, reusable alternative to chaining multiple withColumnRenamed() calls.

What is withColumnsRenamed() in PySpark?

PySpark introduced the withColumnsRenamed() function in version 3.4.0 to allow renaming multiple columns at once. You provide a dictionary mapping old column names to new ones.

Step 1: Create Sample Data

data = [
    ("Aamir Shahzad", "Pakistan", 25),
    ("Ali Raza", "USA", 30),
    ("Bob", "UK", 45),
    ("Lisa", "Canada", 35)
]

df = spark.createDataFrame(data, ["FullName", "Country", "AgeYears"])

print("Original DataFrame:")
df.show()
Original DataFrame:
+-------------+--------+--------+
|     FullName| Country|AgeYears|
+-------------+--------+--------+
|Aamir Shahzad|Pakistan|      25|
|     Ali Raza|     USA|      30|
|          Bob|      UK|      45|
|         Lisa|  Canada|      35|
+-------------+--------+--------+

Step 2: Rename Multiple Columns using withColumnsRenamed()

renamed_df = df.withColumnsRenamed({
    "FullName": "Name",
    "Country": "Nationality",
    "AgeYears": "Age"
})

print("DataFrame with Renamed Columns:")
renamed_df.show()
DataFrame with Renamed Columns:
+-------------+-----------+---+
|         Name|Nationality|Age|
+-------------+-----------+---+
|Aamir Shahzad|   Pakistan| 25|
|     Ali Raza|        USA| 30|
|          Bob|         UK| 45|
|         Lisa|     Canada| 35|
+-------------+-----------+---+
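
If you are on a Spark version older than 3.4.0 (a small added sketch, not part of the original post), you can get the same result by looping a rename mapping over withColumnRenamed():

mapping = {"FullName": "Name", "Country": "Nationality", "AgeYears": "Age"}

renamed_df_legacy = df
for old_name, new_name in mapping.items():
    renamed_df_legacy = renamed_df_legacy.withColumnRenamed(old_name, new_name)

renamed_df_legacy.show()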

Why use withColumnsRenamed()?

  • Cleaner syntax for renaming multiple columns.
  • More readable and concise than chaining multiple withColumnRenamed() calls.
  • Reduces the chance of human error when renaming several columns.

📺 Watch the Full Tutorial Video

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2024 PySpark Tutorials. All rights reserved.

Intro to Serverless SQL Pool in Azure Synapse | Azure Synapse Analytics Tutorial

Intro to Serverless SQL Pool in Azure Synapse

🧠 What is Serverless SQL Pool?

Serverless SQL Pool is a pay-per-query, on-demand query service in Azure Synapse Analytics. It lets you query data stored in Azure Data Lake (CSV, Parquet, JSON, etc.) without moving or loading it into a traditional database.

Think of it as running SQL queries directly on files — no need to pre-load data.

🧭 Serverless SQL Pool Architecture (Detailed Overview)

  • Query is submitted via Synapse Studio or external tools (Power BI, Azure Data Studio)
  • Control Node receives the query, compiles and optimizes it
  • Polaris Engine breaks the query into parts
  • Compute Nodes are dynamically assigned to execute these parts in parallel
  • Query scans data directly from ADLS Gen2, Blob Storage, Cosmos DB, etc.
  • Results are assembled and returned to the user

✅ No cluster setup required — compute is allocated temporarily and only when needed

⚙️ How It Works:

  • No need to provision or manage compute resources
  • When you run a query, Synapse dynamically assigns compute power
  • You're billed only for the data scanned — per TB

Supported File Formats:

  • CSV
  • Parquet
  • JSON
  • Delta Lake (preview support)

💵 Pay-per-Query Model

Metric | Description
Billing unit | Per terabyte (TB) of data processed
Minimum billed | 10 MB per query
Optimization tip | Select only the columns you need and filter with WHERE to reduce scanned data

📦 Key Benefits:

  • ✅ No infrastructure to manage
  • ✅ Ideal for ad-hoc or exploratory analysis
  • ✅ Great for querying large files in data lakes
  • ✅ Works well with Synapse Studio and Power BI

๐Ÿ” When to Use:

  • You want to explore raw files in your data lake
  • You don't need preloaded data or full warehousing
  • You want cost-effective, on-demand querying

🎉 Summary:

Serverless SQL Pool in Synapse makes it easy to query large datasets stored in Azure without provisioning infrastructure. It’s fast, flexible, and billed only when you use it — perfect for quick exploration and modern data lakehouse scenarios.

📺 Watch the Full Tutorial

Learn visually with our full video walkthrough below:



This blog post was created with assistance from ChatGPT and Gemini AI to ensure accuracy and clarity.

How to Create an Azure Synapse Analytics Workspace | Azure Synapse Analytics Tutorial

How to Create an Azure Synapse Analytics Workspace

🚀 How to Create an Azure Synapse Analytics Workspace

A Synapse Workspace is your main control center — the place where you can:

  • 🔧 Build, manage, and monitor all your analytics tasks (SQL, Spark, pipelines, reports) in one unified experience.

Creating an Azure Synapse workspace is the first step toward building a modern analytics platform. Here's a simple step-by-step process anyone can follow:

✅ Step 1: Go to Azure Portal

  • Visit https://portal.azure.com
  • In the top search bar, type “Synapse Analytics”
  • Click “Azure Synapse Analytics” under Services

✅ Step 2: Click "+ Create"

You’ll now fill in some basic workspace details:

  • Subscription: Choose your Azure subscription
  • Resource group: Create a new one or use an existing one
  • Workspace name: e.g., my-synapse-demo
  • Region: Choose the closest Azure region for best performance

✅ Step 3: Configure Storage

  • Choose a Data Lake Storage Gen2 account (or create a new one)
  • Select or create a file system (container), e.g., synapsestorage
  • This is where your data, notebooks, logs, etc., will be stored

✅ Step 4: Security Settings (Optional for Now)

  • Leave default for Managed Identity and Network access
  • You can come back later to configure private endpoints, firewall, etc.

✅ Step 5: Review + Create

  • Click “Review + Create”
  • Wait for validation → then hit “Create”
  • Azure will deploy the Synapse workspace in a few minutes

✅ Step 6: Launch Synapse Studio

  • After deployment, click “Go to Resource”
  • Then click “Open Synapse Studio”
  • You’ll land in the Synapse development environment to:
    • Ingest data
    • Run SQL or Spark queries
    • Create pipelines
    • Build reports

▶️ Watch Tutorial Video

📺 Watch on YouTube

Written by: Aamir Shahzad

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

How to Use withColumnRenamed() to Rename DataFrame Columns | PySpark Tutorial #pysparktutorial

How to Use withColumnRenamed() in PySpark | Rename DataFrame Columns

How to Use withColumnRenamed() in PySpark

In this PySpark tutorial, you'll learn how to use the withColumnRenamed() function to rename columns in a DataFrame. It’s useful when you want to clean or standardize column names.

📌 Why Use withColumnRenamed()?

  • Rename existing column to a new column name
  • Helps make DataFrames cleaner and easier to understand
  • Useful for renaming multiple columns using chaining

🔧 Step 1: Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark Rename Column Example") \
    .getOrCreate()

🔧 Step 2: Create a Sample DataFrame

data = [
    ("Aamir Shahzad", "Pakistan", 25),
    ("Ali Raza", "USA", 30),
    ("Bob", "UK", 45),
    ("Lisa", "Canada", 35)
]

df = spark.createDataFrame(data, ["FullName", "Country", "Age"])
print("Original DataFrame:")
df.show()
+-------------+--------+---+
|     FullName| Country|Age|
+-------------+--------+---+
|Aamir Shahzad|Pakistan| 25|
|     Ali Raza|     USA| 30|
|          Bob|      UK| 45|
|         Lisa|  Canada| 35|
+-------------+--------+---+

🔧 Step 3: Rename Columns Using withColumnRenamed()

# Rename 'FullName' to 'Name' and 'Country' to 'Nationality'
renamed_df = df.withColumnRenamed("FullName", "Name") \
               .withColumnRenamed("Country", "Nationality")

print("DataFrame with Renamed Columns:")
renamed_df.show()
+-------------+-----------+---+
|         Name|Nationality|Age|
+-------------+-----------+---+
|Aamir Shahzad|   Pakistan| 25|
|     Ali Raza|        USA| 30|
|          Bob|         UK| 45|
|         Lisa|     Canada| 35|
+-------------+-----------+---+
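
A related option, added here for completeness, is toDF(), which replaces all column names at once when you can supply the full list in order:

new_names = ["Name", "Nationality", "Age"]
renamed_all_df = df.toDF(*new_names)  # positional rename of every column
renamed_all_df.show()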

🎥 Watch the Full Tutorial Video

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2024 PySpark Tutorials. All rights reserved.

Drop External Table, Data Source, File Format and List Metadata for Auditing | Azure Synapse Analytics Tutorial

Azure Synapse - Drop & Inspect External Objects Script

📘 Azure Synapse - Drop & Inspect External Objects Script

Author: Aamir Shahzad

Purpose: Drop external table, data source, file format and list metadata for auditing

Usage: Use in Synapse Serverless or Dedicated SQL pools for external object maintenance


🔻 Drop External Objects

Drop External Table (if exists)

IF OBJECT_ID('dbo.users', 'U') IS NOT NULL
    DROP EXTERNAL TABLE dbo.users;

Drop External File Format (if exists)

IF EXISTS (SELECT 1 FROM sys.external_file_formats WHERE name = 'SynapseDelimitedTextFormat')
    DROP EXTERNAL FILE FORMAT SynapseDelimitedTextFormat;

Drop External Data Source (if exists)

IF EXISTS (SELECT 1 FROM sys.external_data_sources WHERE name = 'synpasecontainer_techbrotherssynapsestg_dfs_core_windows_net')
    DROP EXTERNAL DATA SOURCE synpasecontainer_techbrotherssynapsestg_dfs_core_windows_net;

📋 Inspect External Object Metadata

๐Ÿ” List All External Tables

SELECT * 
FROM sys.external_tables;

๐Ÿ” List All External Data Sources

SELECT * 
FROM sys.external_data_sources;

๐Ÿ” List All External File Formats

SELECT * 
FROM sys.external_file_formats;

๐Ÿ” List Columns of a Specific External Table

SELECT 
    TABLE_NAME     AS TableName,
    COLUMN_NAME    AS ColumnName,
    ORDINAL_POSITION,
    DATA_TYPE,
    CHARACTER_MAXIMUM_LENGTH
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'users';

📺 Watch the Full Tutorial

Explore the full walkthrough in this YouTube video:



This blog post was created with assistance from ChatGPT and Gemini AI to ensure accuracy and clarity.

What is Azure Synapse Analytics ? Azure Synapse Analytics Tutorial

What is Azure Synapse Analytics? | Blog + Video

๐ŸŒ What is Azure Synapse Analytics?

One Platform. Multiple Engines. Unified Experience.

Azure Synapse Analytics is an enterprise-grade analytics platform that combines the best of Microsoft's big data and data warehouse technologies into a single unified experience. It empowers data professionals to ingest, explore, prepare, manage, and visualize data at any scale—all in one place.

⚙️ Core Compute Engines Inside Synapse

1. Industry-Leading SQL

  • Dedicated SQL Pools for high-performance, predictable workloads
  • Serverless SQL Pools for on-demand querying of files in the Data Lake
  • Supports T-SQL PREDICT for inline ML model scoring
  • Query structured, stream, and external data with ease

2. Apache Spark for Big Data & ML

  • Built-in Apache Spark 3.1 engine
  • Supports Python, Scala, SparkSQL, and .NET for Spark
  • Auto-scalable Spark pools with fast startup times
  • Train models using SparkML and integrate with Azure ML

3. Data Explorer Runtime (Preview)

  • Optimized for log and telemetry analytics
  • Full-text search and semi-structured data indexing
  • Great for IoT, logs, anomaly detection, and real-time insights

💾 Deep Integration with Data Lake

  • Stores structured, semi-structured, and unstructured data at scale
  • Supports formats like Parquet, CSV, JSON, and TSV in ADLS Gen2
  • Create tables on top of raw files, accessible by Spark and SQL
  • Seamless data exchange between Spark and SQL engines

🔗 Built-In Data Integration (ETL/ELT)

  • Powered by the same engine as Azure Data Factory
  • Code-free ETL with Mapping Data Flows
  • Ingest from 90+ sources (SQL, REST APIs, SaaS, on-prem/cloud)
  • Orchestrate Spark jobs, notebooks, pipelines, stored procedures

🧠 Unified Analytics Experience

  • Synapse Studio: a single interface for ingestion, prep, orchestration, and visualization
  • Native integration with:
    • Power BI for dashboards
    • Azure ML for predictive models
    • Azure Cosmos DB, Azure Purview, and more

✅ Final Thoughts

Azure Synapse Analytics is not just a data warehouse—it's a comprehensive, flexible analytics platform that allows you to:

  • Query files and databases using SQL, Spark, and KQL
  • Build a true lakehouse architecture
  • Unify structured and unstructured data processing
  • Create ML-powered dashboards and insights—all in one place

▶️ Watch the Tutorial Video

📺 Watch on YouTube

Published by: Aamir Shahzad | Azure Synapse Analytics Tutorial Series

Some of the contents in this website were created with assistance from ChatGPT and Gemini.

How to Use sample() to Randomly Select Data from DataFrames | PySpark Tutorial #pyspark

How to Use sample() in PySpark | Randomly Select Data from DataFrames

In this tutorial, you will learn how to use the sample() function in PySpark to retrieve a random subset of rows from a DataFrame. This is useful for testing, performance tuning, or dealing with large datasets.

Step 1: Create Sample Data

data = [
    ("Aamir Shahzad", "Pakistan", 25),
    ("Ali Raza", "USA", 30),
    ("Bob", "UK", 45),
    ("Lisa", "Canada", 35),
    ("Aamir Shahzad", "Pakistan", 50),
    ("Aamir Shahzad", "Pakistan", 50)
]

Step 2: Create a DataFrame

df = spark.createDataFrame(data, ["Name", "Country", "Age"])

print("Original DataFrame:")
df.show()
Original DataFrame:
+-------------+--------+---+
|         Name| Country|Age|
+-------------+--------+---+
|Aamir Shahzad|Pakistan| 25|
|     Ali Raza|     USA| 30|
|          Bob|      UK| 45|
|         Lisa|  Canada| 35|
|Aamir Shahzad|Pakistan| 50|
|Aamir Shahzad|Pakistan| 50|
+-------------+--------+---+

Step 3: Use sample() to Randomly Select Rows

# Sample roughly 50% of the rows, without replacement, using a fixed seed
sampled_df = df.sample(withReplacement=False, fraction=0.5, seed=70)

print("Sampled DataFrame (50% of data):")
sampled_df.show()
Sampled DataFrame (output will vary):
+----+-------+---+
|Name|Country|Age|
+----+-------+---+
| Bob|     UK| 45|
|Lisa| Canada| 35|
+----+-------+---+
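
If you want the sample stratified by a column instead of a uniform random fraction (an illustrative addition, not part of the original post), sampleBy() accepts a fraction per value:

# Keep ~50% of Pakistan rows and all USA rows; values not listed are dropped
stratified_df = df.sampleBy("Country", fractions={"Pakistan": 0.5, "USA": 1.0}, seed=70)
stratified_df.show()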

📺 Watch the Full Tutorial Video

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2024 PySpark Tutorials. All rights reserved.

How to Use unionByName() to Join DataFrames by Column Names | PySpark Tutorial Learn by Doing it

PySpark Tutorial: unionByName() Function for Joining DataFrames

This tutorial demonstrates how to use the unionByName() function in PySpark to combine two DataFrames by matching column names.

What is unionByName() in PySpark?

The unionByName() function combines two DataFrames by aligning columns with the same name, regardless of their order.

  • Column names must match
  • Useful when schemas are the same but column orders differ

Step 1: Create a Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySpark unionByName Example").getOrCreate()

Step 2: Create Sample DataFrames

# DataFrame 1
data1 = [("Aamir Shahzad", "Pakistan", 25),
         ("Ali Raza", "USA", 30)]
df1 = spark.createDataFrame(data1, ["Name", "Country", "Age"])

# DataFrame 2 (Different column order)
data2 = [("Bob", 45, "UK"),
         ("Lisa", 35, "Canada")]
df2 = spark.createDataFrame(data2, ["Name", "Age", "Country"])
DataFrame 1:
+-------------+--------+---+
|         Name| Country|Age|
+-------------+--------+---+
|Aamir Shahzad|Pakistan| 25|
|     Ali Raza|     USA| 30|
+-------------+--------+---+

DataFrame 2:
+----+---+-------+
|Name|Age|Country|
+----+---+-------+
| Bob| 45|     UK|
|Lisa| 35| Canada|
+----+---+-------+

Step 3: Use unionByName() to Combine DataFrames

union_df = df1.unionByName(df2)

Step 4: Show Result

print("Union Result:")
union_df.show()
Union Result:
+-------------+--------+---+
|         Name| Country|Age|
+-------------+--------+---+
|Aamir Shahzad|Pakistan| 25|
|     Ali Raza|     USA| 30|
|          Bob|      UK| 45|
|         Lisa|  Canada| 35|
+-------------+--------+---+
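
Since Spark 3.1, unionByName() also accepts allowMissingColumns=True, which fills columns present on only one side with nulls instead of raising an error. A short added sketch with a hypothetical third DataFrame:

# df3 is a hypothetical DataFrame without the Age column
data3 = [("Omar", "Germany")]
df3 = spark.createDataFrame(data3, ["Name", "Country"])

union_missing_df = df1.unionByName(df3, allowMissingColumns=True)
union_missing_df.show()  # Age is null for the row coming from df3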

Why Use unionByName()?

  • Safer alternative to union() when column orders might differ
  • Prevents data from being mismatched due to incorrect column alignment
  • Great for combining data from different sources with consistent column names

📺 Watch the Full Tutorial Video

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2024 PySpark Tutorials. All rights reserved.

How to Use crossJoin() Function for Cartesian Product | PySpark Tutorial #pysparktutorial

PySpark crossJoin() Function | Cartesian Product of DataFrames

PySpark crossJoin() Function Tutorial

Cartesian Product of DataFrames in PySpark

In this tutorial, you will learn how to use the crossJoin() function in PySpark to generate a Cartesian product between two DataFrames. This operation combines every row of the first DataFrame with every row of the second one.

Step 1: Import and Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark crossJoin Example") \
    .getOrCreate()

Step 2: Create Sample Data

# DataFrame 1: People
data_people = [
    ("Aamir Shahzad", "Pakistan"),
    ("Ali Raza", "USA"),
    ("Bob", "UK"),
    ("Lisa", "Canada")
]
df_people = spark.createDataFrame(data_people, ["Name", "Country"])

# DataFrame 2: Hobbies
data_hobbies = [
    ("Reading",),
    ("Traveling",),
    ("Cricket",)
]
df_hobbies = spark.createDataFrame(data_hobbies, ["Hobby"])

People DataFrame Output:

Name | Country
------------------------
Aamir Shahzad | Pakistan
Ali Raza | USA
Bob | UK
Lisa | Canada

Hobbies DataFrame Output:

Hobby
------
Reading
Traveling
Cricket

Step 3: Perform crossJoin()

# Perform Cartesian join
cross_join_result = df_people.crossJoin(df_hobbies)

# Show result
cross_join_result.show(truncate=False)

Output:

Name | Country | Hobby
------------------------------------
Aamir Shahzad | Pakistan | Reading
Aamir Shahzad | Pakistan | Traveling
Aamir Shahzad | Pakistan | Cricket
Ali Raza | USA | Reading
Ali Raza | USA | Traveling
Ali Raza | USA | Cricket
Bob | UK | Reading
Bob | UK | Traveling
Bob | UK | Cricket
Lisa | Canada | Reading
Lisa | Canada | Traveling
Lisa | Canada | Cricket
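
As a quick sanity check (a small addition to this example), the Cartesian product should contain the product of the two row counts, 4 * 3 = 12 rows here:

expected_rows = df_people.count() * df_hobbies.count()
print("Expected rows:", expected_rows, "Actual rows:", cross_join_result.count())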

📺 Watch the Full Tutorial Video

Author: Aamir Shahzad

© 2025 PySpark Tutorials. All rights reserved.

PySpark cov() Tutorial | Covariance Analysis for Numerical Columns | Pyspark Tutorial

PySpark Tutorial: How to Use cov() Function | Covariance Analysis

PySpark Tutorial: How to Use cov() Function

This tutorial explains how to use the cov() function in PySpark to calculate the covariance between two numeric columns of a DataFrame.

📌 What is cov() in PySpark?

The cov() function is used to compute the sample covariance between two numeric columns in a PySpark DataFrame.

  • Positive covariance: both variables increase together.
  • Negative covariance: one increases while the other decreases.

Step 1: Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark cov() Example") \
    .getOrCreate()

Step 2: Create Sample Data

data = [
    ("Aamir Shahzad", 25, 150000.0),
    ("Ali Raza", 30, 160000.0),
    ("Bob", 45, 120000.0),
    ("Lisa", 35, 75000.0),
    ("Aamir Shahzad", 50, 110000.0)
]

df = spark.createDataFrame(data, ["Name", "Age", "Salary"])

df.show()
+-------------+---+--------+
|         Name|Age|  Salary|
+-------------+---+--------+
|Aamir Shahzad| 25|150000.0|
|     Ali Raza| 30|160000.0|
|          Bob| 45|120000.0|
|         Lisa| 35| 75000.0|
|Aamir Shahzad| 50|110000.0|
+-------------+---+--------+

Step 3: Calculate Covariance

covariance_result = df.cov("Age", "Salary")
print("Covariance between Age and Salary:", covariance_result)
Covariance between Age and Salary: -170000.0
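
For reference, the same sample covariance can also be computed with the covar_samp aggregate from pyspark.sql.functions (an added sketch, not part of the original post):

from pyspark.sql.functions import covar_samp

df.select(covar_samp("Age", "Salary").alias("cov_age_salary")).show()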

✅ Why Use cov()?

  • Quickly identify the direction of the relationship between two variables.
  • Useful for exploratory data analysis and feature selection.

📺 Watch the Full Tutorial Video

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2025 PySpark Tutorials. All rights reserved.

How to Use corr() Function in PySpark : Finding Correlation Between Columns with corr() #pyspark | PySpark Tutorial

PySpark corr() Function Tutorial | Find Correlation Between Columns

PySpark corr() Function Tutorial

Finding Correlation Between Columns

In this tutorial, you'll learn how to use PySpark’s corr() function to find the Pearson correlation coefficient between two columns of a DataFrame.

📌 What is corr() in PySpark?

The corr() function computes the Pearson Correlation Coefficient between two numeric columns of a DataFrame. Currently, only the Pearson method is supported.

🧪 Step-by-Step Example

Step 1: Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark corr() Example") \
    .getOrCreate()

Step 2: Sample Data

data = [
    ("Aamir Shahzad", 25, 150000.0),
    ("Ali Raza", 30, 160000.0),
    ("Bob", 45, 120000.0),
    ("Lisa", 35, 75000.0),
    ("Aamir Shahzad", 50, 110000.0)
]

df = spark.createDataFrame(data, ["Name", "Age", "Salary"])
df.show()
+-------------+---+--------+
|         Name|Age|  Salary|
+-------------+---+--------+
|Aamir Shahzad| 25|150000.0|
|     Ali Raza| 30|160000.0|
|          Bob| 45|120000.0|
|         Lisa| 35| 75000.0|
|Aamir Shahzad| 50|110000.0|
+-------------+---+--------+

Step 3: Use corr() Function

result = df.corr("Age", "Salary")

Step 4: Print the Result

print("Correlation between Age and Salary:", result)
Correlation between Age and Salary: -0.4845537354461136

✅ Why Use corr()?

  • Quickly identify relationships between two numeric variables.
  • Helps in feature selection and exploratory data analysis (EDA).
  • Returns a single float value: the Pearson correlation coefficient.

📺 Watch the Full Tutorial Video

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2025 PySpark Tutorials. All rights reserved.

PySpark collect() Function Tutorial : Retrieve Entire DataFrame to Driver with Examples #pyspark

PySpark collect() Function Tutorial | Retrieve Entire DataFrame to Driver

PySpark collect() Function Tutorial

Retrieve Entire DataFrame to Driver with Examples

Learn how to use the collect() function in PySpark to retrieve all rows from a DataFrame into the driver node for local processing or debugging.

1. What is collect() in PySpark?

The collect() function gathers all rows from a PySpark DataFrame or RDD and returns them to the driver as a Python list.

  • ✅ Useful for small datasets or debugging
  • ⚠️ Avoid on large datasets to prevent memory overload

2. Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark collect() Example") \
    .getOrCreate()

3. Create a Sample DataFrame

data = [
    (1, "Aamir Shahzad", 5000),
    (2, "Ali Raza", 6000),
    (3, "Bob", 5500),
    (4, "Lisa", 7000)
]

columns = ["ID", "Name", "Salary"]

df = spark.createDataFrame(data, columns)
df.show()
+---+-------------+------+
| ID|         Name|Salary|
+---+-------------+------+
|  1|Aamir Shahzad|  5000|
|  2|     Ali Raza|  6000|
|  3|          Bob|  5500|
|  4|         Lisa|  7000|
+---+-------------+------+

4. Using collect() to Retrieve Data

collected_data = df.collect()

for row in collected_data:
    print(row)
Row(ID=1, Name='Aamir Shahzad', Salary=5000)
Row(ID=2, Name='Ali Raza', Salary=6000)
Row(ID=3, Name='Bob', Salary=5500)
Row(ID=4, Name='Lisa', Salary=7000)

5. Accessing Individual Column Values

for row in collected_data:
    print(f"ID: {row['ID']}, Name: {row['Name']}, Salary: {row['Salary']}")

6. When to Use collect()

  • To inspect data locally during development
  • For exporting or logging small result sets
  • Avoid in production for large datasets (a streaming alternative is sketched below)
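
When a DataFrame is too large to collect() safely (a minimal added sketch, not from the original post), toLocalIterator() streams rows to the driver one partition at a time instead of materializing the whole result at once:

for row in df.toLocalIterator():  # pulls one partition at a time to the driver
    print(row["Name"], row["Salary"])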

📺 Watch the Full Tutorial Video

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2025 PySpark Tutorials. All rights reserved.

PySpark repartition() Function Tutorial: Optimize Data Partitioning for Better Performance

The repartition() function in PySpark allows you to increase or decrease the number of partitions of a DataFrame. This helps in improving parallelism, managing data skew, and optimizing performance in distributed Spark jobs.

1. Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark repartition() Example") \
    .getOrCreate()

2. Create Sample DataFrame

data = [
    (1, "Aamir Shahzad", 35),
    (2, "Ali Raza", 30),
    (3, "Bob", 25),
    (4, "Lisa", 28)
]

columns = ["id", "name", "age"]

df = spark.createDataFrame(data, columns)

print("Original DataFrame:")
df.show()
+---+-------------+---+
| id|         name|age|
+---+-------------+---+
|  1|Aamir Shahzad| 35|
|  2|     Ali Raza| 30|
|  3|          Bob| 25|
|  4|         Lisa| 28|
+---+-------------+---+

3. Check Number of Partitions Before repartition()

print("Partitions before repartition():", df.rdd.getNumPartitions())
Partitions before repartition(): 1

4. Apply repartition()

# Example 1: Increase partitions to 4
df_repartitioned_4 = df.repartition(4)
print("Partitions after repartition(4):", df_repartitioned_4.rdd.getNumPartitions())
Partitions after repartition(4): 4
# Example 2: Decrease partitions to 2
df_repartitioned_2 = df.repartition(2)
print("Partitions after repartition(2):", df_repartitioned_2.rdd.getNumPartitions())
Partitions after repartition(2): 2

5. repartition() by Column (Optional)

# repartition by "age"
df_by_age = df.repartition(2, "age")
print("Partitions after repartition by age:", df_by_age.rdd.getNumPartitions())
Partitions after repartition by age: 2
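
To see which partition each row lands in after repartitioning by a column (an illustrative addition to this post), tag the rows with spark_partition_id():

from pyspark.sql.functions import spark_partition_id

df_by_age.withColumn("partition_id", spark_partition_id()).show()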

6. Show Data after Repartition

# Show content to verify repartition didn't alter data
df_repartitioned_2.show()
+---+-------------+---+
| id|         name|age|
+---+-------------+---+
|  1|Aamir Shahzad| 35|
|  2|     Ali Raza| 30|
|  3|          Bob| 25|
|  4|         Lisa| 28|
+---+-------------+---+

✅ When to Use repartition()

  • When you need balanced partitions for parallel processing
  • To increase partitions for better parallelism
  • To decrease partitions for optimized small output files

📊 Summary

  • repartition() can increase or decrease partitions
  • Helps with load balancing and performance tuning
  • Good for parallel writes and skewed data correction

📺 Watch the Full Tutorial Video

▶️ Watch on YouTube

PySpark coalesce() Function Tutorial - Optimize Partitioning for Faster Spark Jobs #pysparktutorial

PySpark coalesce() Function Tutorial: Optimize Partitioning for Faster Spark Jobs

This tutorial will help you understand how to use the coalesce() function in PySpark to reduce the number of partitions in your DataFrame and improve performance.

1. What is coalesce() in PySpark?

  • coalesce() reduces the number of partitions in a DataFrame.
  • It is preferred over repartition() when reducing partitions because it avoids full data shuffle.
  • Ideal for optimizing small files and preparing data for output operations.

2. Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark coalesce() Example") \
    .getOrCreate()

3. Create Sample DataFrame

data = [
    (1, "Aamir Shahzad", 35),
    (2, "Ali Raza", 30),
    (3, "Bob", 25),
    (4, "Lisa", 28)
]

columns = ["id", "name", "age"]

df = spark.createDataFrame(data, columns)
df.show()
+---+-------------+---+
| id|         name|age|
+---+-------------+---+
|  1|Aamir Shahzad| 35|
|  2|     Ali Raza| 30|
|  3|          Bob| 25|
|  4|         Lisa| 28|
+---+-------------+---+

4. Check Number of Partitions Before coalesce()

print("Partitions before coalesce:", df.rdd.getNumPartitions())
Partitions before coalesce: 4

5. Apply coalesce() to Reduce Partitions

df_coalesced = df.coalesce(1)

6. Check Number of Partitions After coalesce()

print("Partitions after coalesce:", df_coalesced.rdd.getNumPartitions())
Partitions after coalesce: 1

7. Show Transformed Data

df_coalesced.show()
+---+-------------+---+
| id|         name|age|
+---+-------------+---+
|  1|Aamir Shahzad| 35|
|  2|     Ali Raza| 30|
|  3|          Bob| 25|
|  4|         Lisa| 28|
+---+-------------+---+
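
A common use of coalesce(1), sketched here with a placeholder output path, is producing a single output file when writing results:

# Writes one CSV part file instead of one file per partition
df_coalesced.write.mode("overwrite").csv("/tmp/people_single_file", header=True)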

📺 Watch the Full Tutorial Video

▶️ Watch on YouTube

Author: Aamir Shahzad | TechBrothersIT

© 2025 PySpark Tutorials. All rights reserved.

PySpark checkpoint() : Improve Fault Tolerance & Speed in Spark Jobs -PySpark Tutorial for Beginners

PySpark checkpoint(): Improve Fault Tolerance & Speed in Spark Jobs

In this tutorial, you'll learn how to use checkpoint() in PySpark to improve the performance and reliability of your Spark jobs. This technique is especially useful when dealing with long lineage or iterative transformations in your data pipelines.

1. What is checkpoint() in PySpark?

  • checkpoint() saves a copy of a DataFrame (or RDD) to reliable storage like DBFS or HDFS.
  • This cuts off Spark’s lineage (history of transformations) leading to the current data.
  • Helps with:
    • Improving performance
    • Adding fault tolerance
    • Protecting against recomputation if Spark needs to restart a job

2. Create Spark Session and Set Checkpoint Directory

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark checkpoint() Example") \
    .getOrCreate()

# Set checkpoint directory (required before using checkpoint)
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

3. Create Sample DataFrame

data = [
    (1, "Aamir Shahzad", 35),
    (2, "Ali Raza", 30),
    (3, "Bob", 25),
    (4, "Lisa", 28)
]

columns = ["id", "name", "age"]

df = spark.createDataFrame(data, columns)
df.show()
+---+-------------+---+
| id|         name|age|
+---+-------------+---+
|  1|Aamir Shahzad| 35|
|  2|     Ali Raza| 30|
|  3|          Bob| 25|
|  4|         Lisa| 28|
+---+-------------+---+

4. Apply a Transformation

# Example transformation: Filter people under age 28
df_filtered = df.filter(df["age"] > 28)

5. Checkpoint the Filtered DataFrame

# Apply checkpoint to save current state and cut lineage
df_checkpointed = df_filtered.checkpoint(eager=True)
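
To confirm that the lineage was actually cut (an illustrative addition to this post), compare the query plans; the checkpointed DataFrame's plan starts from a scan of the saved data rather than from the original filter:

df_filtered.explain()      # plan still includes the scan and filter steps
df_checkpointed.explain()  # shorter plan rooted at the checkpointed data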

6. Perform Further Operations on Checkpointed Data

# Group by name and calculate average age
result = df_checkpointed.groupBy("name").avg("age")
result.show()
+-------------+--------+
|         name|avg(age)|
+-------------+--------+
|Aamir Shahzad|    35.0|
|     Ali Raza|    30.0|
+-------------+--------+

7. Why Use checkpoint()?

  • Breaks long lineage chains to avoid stack overflow or slow jobs.
  • Saves a known good state to reliable storage (disk).
  • Protects against recomputation if Spark needs to restart tasks.
  • Helps with complex or iterative jobs that involve looping.

📺 Watch the Full Tutorial Video

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2024 PySpark Tutorials

How to Use Cache in PySpark to Improve Spark Performance | PySpark Tutorial #pysparktutorial

PySpark Tutorial: How to Use cache() to Improve Spark Performance

In this tutorial, you'll learn how to use the cache() function in PySpark to optimize performance by storing intermediate results in memory.

📌 What is cache() in PySpark?

cache() is an optimization technique that marks a DataFrame (or RDD) for caching; the data is materialized in memory the first time an action runs, so later actions reuse it instead of recomputing the DataFrame, improving performance.

🔥 Why Use cache()? (Benefits)

  • Speeds up jobs that reuse the same DataFrame
  • Saves recomputation time in iterative algorithms
  • Useful for exploratory data analysis (EDA)
  • Optimizes performance in joins and repeated filters

Step 1: Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark cache() Example") \
    .getOrCreate()

Step 2: Create a Sample DataFrame

data = [
    (1, "Aamir Shahzad", 35),
    (2, "Ali Raza", 30),
    (3, "Bob", 25),
    (4, "Lisa", 28)
]

columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)
df.show()
+---+-------------+---+
| id|         name|age|
+---+-------------+---+
|  1|Aamir Shahzad| 35|
|  2|     Ali Raza| 30|
|  3|          Bob| 25|
|  4|         Lisa| 28|
+---+-------------+---+

Step 3: Cache the DataFrame

# Apply cache
df.cache()

Step 4: Trigger Action to Materialize Cache

# Action like count() triggers caching
df.count()
Output: 4

Step 5: Perform Actions on Cached Data

df.show()
df.filter(df.age > 28).show()
+---+-------------+---+
| id|         name|age|
+---+-------------+---+
|  1|Aamir Shahzad| 35|
|  2|     Ali Raza| 30|
|  3|          Bob| 25|
|  4|         Lisa| 28|
+---+-------------+---+

Step 6: Check if DataFrame is Cached

print("Is DataFrame cached?", df.is_cached)
Is DataFrame cached? True

Step 7: Remove Cache (Unpersist)

df.unpersist()
print("Is DataFrame cached after unpersist?", df.is_cached)
Is DataFrame cached after unpersist? False
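
cache() is shorthand for persist() with Spark's default storage level. If you need a different level, for example disk-only caching (an added sketch, not covered in the original post), call persist() with an explicit StorageLevel:

from pyspark.storagelevel import StorageLevel

df.persist(StorageLevel.DISK_ONLY)   # cache to disk instead of memory
print("Storage level:", df.storageLevel)
df.unpersist()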

📺 Watch the Full Tutorial Video

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2024 PySpark Tutorials. All rights reserved.

How to Use approxQuantile() in PySpark | Quick Guide to Percentiles & Median #pysparktutorial

How to Use approxQuantile() in PySpark | Quick Guide to Percentiles & Median

How to Use approxQuantile() in PySpark

Quick Guide to Percentiles & Median

The approxQuantile() function in PySpark helps you estimate percentiles and median values quickly and efficiently. This is especially useful for large datasets when full scans are costly.

1. Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("approxQuantile Example") \
    .getOrCreate()

2. Create Sample DataFrame

data = [
    (1, "Aamir Shahzad", 35),
    (2, "Ali Raza", 30),
    (3, "Bob", 25),
    (4, "Lisa", 28),
    (5, "John", 40),
    (6, "Sara", 50)
]

columns = ["id", "name", "age"]

df = spark.createDataFrame(data, columns)
df.show()
+---+-------------+---+
| id|         name|age|
+---+-------------+---+
|  1|Aamir Shahzad| 35|
|  2|     Ali Raza| 30|
|  3|          Bob| 25|
|  4|         Lisa| 28|
|  5|         John| 40|
|  6|         Sara| 50|
+---+-------------+---+

3. Use approxQuantile()

Example 1: Median (50th percentile)

median_age = df.approxQuantile("age", [0.5], 0.01)
print("Median Age:", median_age)
Median Age: [30.0]

Example 2: 25th, 50th, and 75th Percentiles

quantiles = df.approxQuantile("age", [0.25, 0.5, 0.75], 0.01)
print("25th, 50th, and 75th Percentiles:", quantiles)
25th, 50th, and 75th Percentiles: [28.0, 30.0, 40.0]

Example 3: Min, Median, Max

min_median_max = df.approxQuantile("age", [0.0, 0.5, 1.0], 0.01)
print("Min, Median, and Max Age:", min_median_max)
Min, Median, and Max Age: [25.0, 30.0, 50.0]

4. Control Accuracy with relativeError

# Lower relativeError = more accurate but slower
# Higher relativeError = less accurate but faster

# Example: Set relativeError to 0.1 (faster but less accurate)
quantiles_fast = df.approxQuantile("age", [0.25, 0.5, 0.75], 0.1)
print("Quantiles with higher relative error:", quantiles_fast)
Quantiles with higher relative error: [28.0, 30.0, 40.0]
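
approxQuantile() also accepts a list of columns (a small addition to this post), returning one list of quantiles per column:

quantiles_multi = df.approxQuantile(["id", "age"], [0.25, 0.5, 0.75], 0.01)
print("Quantiles for id and age:", quantiles_multi)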

📺 Watch Full Tutorial Video

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2024 PySpark Tutorials. All rights reserved.

PySpark Tutorial : How to Get a Column in PySpark by using DataFrame with Dot or DataFrame with Square Brackets

How to Get a Column in PySpark using Dot or Square Brackets

In PySpark, you can access DataFrame columns using either dot notation or square brackets. This tutorial shows both methods with examples and outputs.

1. Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Get Columns in PySpark") \
    .getOrCreate()

2. Create Sample DataFrame

data = [
    (1, "Aamir Shahzad", 35),
    (2, "Ali Raza", 30),
    (3, "Bob", 25),
    (4, "Lisa", 28)
]

columns = ["id", "name", "age"]

df = spark.createDataFrame(data, columns)
df.show()
+---+-------------+---+
| id|         name|age|
+---+-------------+---+
|  1|Aamir Shahzad| 35|
|  2|     Ali Raza| 30|
|  3|          Bob| 25|
|  4|         Lisa| 28|
+---+-------------+---+

3. Get a Column in PySpark

Method 1: Using Dot Notation

df.select(df.name).show()
+-------------+
|         name|
+-------------+
|Aamir Shahzad|
|     Ali Raza|
|          Bob|
|         Lisa|
+-------------+

Method 2: Using Square Brackets

df.select(df["name"]).show()
+-------------+
|         name|
+-------------+
|Aamir Shahzad|
|     Ali Raza|
|          Bob|
|         Lisa|
+-------------+

4. Filter with Column Reference

Using Dot Notation

df.filter(df.age > 28).show()
+---+-------------+---+
| id|         name|age|
+---+-------------+---+
|  1|Aamir Shahzad| 35|
|  2|     Ali Raza| 30|
+---+-------------+---+

Using Square Brackets

df.filter(df["age"] == 30).show()
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  2|Ali Raza| 30|
+---+--------+---+
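
A third option, added here for completeness, is the col() function from pyspark.sql.functions, which is handy when you need a column reference without a DataFrame object in scope:

from pyspark.sql.functions import col

df.select(col("name")).show()
df.filter(col("age") > 28).show()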

5. View All Column Names

df.columns
['id', 'name', 'age']

6. Summary

  • df.name is easier and more concise, great for simple column access.
  • df["name"] is more flexible and safer when column names include spaces or special characters.
  • df.columns returns a list of all column names.

📺 Watch the Full Tutorial Video

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2024 PySpark Tutorials. All rights reserved.