PySpark Tutorial: How to Use rollup() to Aggregate Data by Groups and Subtotals

In this PySpark tutorial, we’ll explore how to use the rollup() function to perform multi-level aggregations (group subtotals and grand totals). It’s very useful when you want to analyze hierarchical data grouped by multiple columns.

1. What is rollup() in PySpark?

The rollup() function in PySpark allows you to create subtotals and a grand total in grouped aggregations. It’s similar to SQL’s ROLLUP.

2. Import Required Libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as _sum

spark = SparkSession.builder.appName("rollupExample").getOrCreate()

3. Create Sample DataFrame

data = [
    ("Aamir Shahzad", "Pakistan", 5000),
    ("Ali Raza", "Pakistan", 6000),
    ("Bob", "USA", 5500),
    ("Lisa", "Canada", 7000),
    ("Aamir Shahzad", "Pakistan", 8000),
    ("Ali Raza", "Pakistan", 6500),
    ("Bob", "USA", 5200),
    ("Lisa", "Canada", 7200)
]

columns = ["Name", "Country", "Salary"]

df = spark.createDataFrame(data, columns)

print("Original DataFrame:")
df.show()

Output:

+--------------+--------+------+
|          Name| Country|Salary|
+--------------+--------+------+
| Aamir Shahzad|Pakistan|  5000|
|      Ali Raza|Pakistan|  6000|
|           Bob|     USA|  5500|
|          Lisa|  Canada|  7000|
| Aamir Shahzad|Pakistan|  8000|
|      Ali Raza|Pakistan|  6500|
|           Bob|     USA|  5200|
|          Lisa|  Canada|  7200|
+--------------+--------+------+

4. rollup() on Country and Name

df_rollup = df.rollup("Country", "Name") \
    .agg(_sum("Salary").alias("Total_Salary")) \
    .orderBy("Country", "Name")

print("Rollup Aggregation by Country and Name:")
df_rollup.show(truncate=False)

Output:

+--------+-------------+------------+
|Country |Name         |Total_Salary|
+--------+-------------+------------+
|null    |null         |50400       |
|Canada  |null         |14200       |
|Canada  |Lisa         |14200       |
|Pakistan|null         |25500       |
|Pakistan|Aamir Shahzad|13000       |
|Pakistan|Ali Raza     |12500       |
|USA     |null         |10700       |
|USA     |Bob          |10700       |
+--------+-------------+------------+
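In the rolled-up output, null marks a subtotal or grand-total row rather than missing data (with an ascending orderBy, those rows sort first because Spark places nulls first). If the grouped columns can also contain genuine nulls, the grouping() function distinguishes the two. A minimal sketch reusing the DataFrame above:

from pyspark.sql.functions import grouping

df_rollup_flagged = df.rollup("Country", "Name").agg(
    _sum("Salary").alias("Total_Salary"),
    grouping("Country").alias("Is_Country_Subtotal"),  # 1 on subtotal/grand-total rows for Country
    grouping("Name").alias("Is_Name_Subtotal")         # 1 on subtotal/grand-total rows for Name
).orderBy("Country", "Name")

df_rollup_flagged.show()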

5. rollup() on Country Only

df_rollup_country = df.rollup("Country") \
    .agg(_sum("Salary").alias("Total_Salary")) \
    .orderBy("Country")

print("Rollup Aggregation by Country with Grand Total:")
df_rollup_country.show(truncate=False)

Output:

+--------+------------+
|Country |Total_Salary|
+--------+------------+
|null    |50400       |
|Canada  |14200       |
|Pakistan|25500       |
|USA     |10700       |
+--------+------------+

Conclusion

The rollup() function is a powerful tool for hierarchical aggregation in PySpark. It allows you to create subtotals at multiple levels as well as an overall total. It’s great for building summarized reports or dashboards directly from Spark DataFrames.

📺 Watch the Full Video Tutorial

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2025 PySpark Tutorials

PySpark freqItems() Function | Identify Frequent Items in Columns Fast

In this tutorial, you'll learn how to use PySpark's freqItems() function to identify frequent items in one or multiple DataFrame columns. This method is helpful when performing data analysis or finding patterns in datasets.

What is freqItems() in PySpark?

freqItems() is a PySpark DataFrame function that returns frequent items (values) in a column or multiple columns. It helps identify commonly occurring values. It's useful for:

  • Exploratory data analysis (EDA)
  • Data quality checks
  • Understanding data distributions

Step 1: Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark freqItems() Example") \
    .getOrCreate()

Step 2: Create Sample DataFrame

data = [
    (1, "Aamir Shahzad", "Pakistan"),
    (2, "Ali Raza", "Pakistan"),
    (3, "Bob", "USA"),
    (4, "Lisa", "Canada"),
    (5, "Aamir Shahzad", "Pakistan"),
    (6, "Ali Raza", "Pakistan"),
    (7, "Bob", "USA"),
    (8, "Lisa", "Canada"),
    (9, "Lisa", "Canada"),
    (10, "Aamir Shahzad", "Pakistan")
]

columns = ["ID", "Name", "Country"]

df = spark.createDataFrame(data, columns)

print("Original DataFrame:")
df.show()
+---+--------------+--------+
| ID|          Name| Country|
+---+--------------+--------+
|  1| Aamir Shahzad|Pakistan|
|  2|      Ali Raza|Pakistan|
|  3|           Bob|     USA|
|  4|          Lisa|  Canada|
|  5| Aamir Shahzad|Pakistan|
|  6|      Ali Raza|Pakistan|
|  7|           Bob|     USA|
|  8|          Lisa|  Canada|
|  9|          Lisa|  Canada|
| 10| Aamir Shahzad|Pakistan|
+---+--------------+--------+

Step 3: Identify Frequent Items in a Single Column

freq_name = df.freqItems(["Name"])

print("Frequent Items in 'Name' Column:")
freq_name.show(truncate=False)
+------------------------------------+
|Name_freqItems                      |
+------------------------------------+
|[Lisa, Bob, Aamir Shahzad, Ali Raza]|
+------------------------------------+

Step 4: Identify Frequent Items in Multiple Columns

freq_name_country = df.freqItems(["Name", "Country"])

print("Frequent Items in 'Name' and 'Country' Columns:")
freq_name_country.show(truncate=False)
+------------------------------------+-----------------------+
|Name_freqItems                      |Country_freqItems      |
+------------------------------------+-----------------------+
|[Lisa, Bob, Aamir Shahzad, Ali Raza]|[Pakistan, USA, Canada]|
+------------------------------------+-----------------------+

Step 5: Adjust Support Threshold (Optional)

By default, the support threshold is 1% (support=0.01). You can raise it to return only items that appear more often. Note that freqItems() uses an approximate algorithm, so the result is a superset of the truly frequent items and may contain false positives.

freq_with_support = df.freqItems(["Name"], support=0.4)

print("Frequent Items in 'Name' Column with Support = 0.4 (40%):")
freq_with_support.show(truncate=False)
+---------------------+
|Name_freqItems       |
+---------------------+
|[Aamir Shahzad, Lisa]|
+---------------------+
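The result of freqItems() is a single-row DataFrame whose cells hold arrays. To work with those values in plain Python, collect that row; a minimal sketch using the freq_name DataFrame from Step 3:

# Pull the array of frequent names back to the driver as a Python list
frequent_names = freq_name.collect()[0]["Name_freqItems"]
print(frequent_names)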

📺 Watch the Full Tutorial Video

For a complete step-by-step video guide, watch the tutorial below:

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2024 PySpark Tutorials. All rights reserved.

PySpark replace() Function Tutorial | Replace Values in DataFrames Easily

Learn how to use the replace() function in PySpark to replace values in one or more columns of a DataFrame. This guide explains everything step-by-step with examples and expected outputs.

What is replace() in PySpark?

The replace() function in PySpark allows you to replace specific values in one or more columns of a DataFrame. It is helpful for:

  • Data cleaning and preparation
  • Handling inconsistent data
  • Replacing multiple values at once

Step 1: Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark replace() Example") \
    .getOrCreate()

Step 2: Create a Sample DataFrame

data = [
    (1, "Aamir Shahzad", "Pakistan", 5000),
    (2, "Ali Raza", "Pakistan", 6000),
    (3, "Bob", "USA", 5500),
    (4, "Lisa", "Canada", 7000),
    (5, "Unknown", "Unknown", None)
]

columns = ["ID", "Name", "Country", "Salary"]

df = spark.createDataFrame(data, columns)
df.show()
Original DataFrame:

+---+-------------+--------+------+
| ID|         Name| Country|Salary|
+---+-------------+--------+------+
|  1|Aamir Shahzad|Pakistan|  5000|
|  2|     Ali Raza|Pakistan|  6000|
|  3|          Bob|     USA|  5500|
|  4|         Lisa|  Canada|  7000|
|  5|      Unknown| Unknown|  null|
+---+-------------+--------+------+

Step 3: Replace a Single Value in One Column

# Replace 'Unknown' with 'Not Provided' in the Name column
df_replaced_name = df.replace("Unknown", "Not Provided", subset=["Name"])
df_replaced_name.show()
Expected Output:

+---+-------------+--------+------+
| ID|         Name| Country|Salary|
+---+-------------+--------+------+
|  1|Aamir Shahzad|Pakistan|  5000|
|  2|     Ali Raza|Pakistan|  6000|
|  3|          Bob|     USA|  5500|
|  4|         Lisa|  Canada|  7000|
|  5| Not Provided| Unknown|  null|
+---+-------------+--------+------+

Step 4: Replace Multiple Values in a Single Column

# Replace country names: Pakistan -> PK, USA -> US, Canada -> CA
df_replaced_country = df.replace({
    "Pakistan": "PK",
    "USA": "US",
    "Canada": "CA"
}, subset=["Country"])
df_replaced_country.show()
Expected Output:

+---+-------------+-------+------+
| ID|         Name|Country|Salary|
+---+-------------+-------+------+
|  1|Aamir Shahzad|     PK|  5000|
|  2|     Ali Raza|     PK|  6000|
|  3|          Bob|     US|  5500|
|  4|         Lisa|     CA|  7000|
|  5|      Unknown|Unknown|  null|
+---+-------------+-------+------+

Step 5: Replace a Single Value Across Multiple Columns

# Replace 'Unknown' with 'Not Provided' in Name and Country columns
df_replaced_multiple = df.replace("Unknown", "Not Provided", subset=["Name", "Country"])
df_replaced_multiple.show()
Expected Output:

+---+-------------+------------+------+
| ID|         Name|     Country|Salary|
+---+-------------+------------+------+
|  1|Aamir Shahzad|    Pakistan|  5000|
|  2|     Ali Raza|    Pakistan|  6000|
|  3|          Bob|         USA|  5500|
|  4|         Lisa|      Canada|  7000|
|  5| Not Provided|Not Provided|  null|
+---+-------------+------------+------+
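replace() also works on numeric columns, as long as the replacement value has a compatible type. A minimal sketch on the same DataFrame:

# Replace a numeric value in the Salary column
df_replaced_salary = df.replace(5000, 5100, subset=["Salary"])
df_replaced_salary.show()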

📺 Watch the Full Tutorial Video

For a complete step-by-step video guide, watch the tutorial below:

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2024 PySpark Tutorials. All rights reserved.

PySpark Tutorial: How to Use transform() for Custom DataFrame Transformations

In this tutorial, you will learn how to use the transform() function in PySpark to apply custom reusable transformations on DataFrames. This is a great way to simplify complex logic and make your code cleaner!

What is transform() in PySpark?

The transform() function allows you to apply a custom function to a DataFrame. It’s a cleaner way to chain multiple operations, especially when applying reusable logic.

  • Helps make your code more reusable.
  • Applies custom transformations on DataFrames.
  • Keeps your DataFrame pipelines clean and modular.

Step 1: Create Spark Session

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper

spark = SparkSession.builder \
    .appName("PySpark transform() Example") \
    .getOrCreate()

Step 2: Create a Sample DataFrame

data = [
    (1, "Aamir Shahzad", 5000),
    (2, "Ali Raza", 6000),
    (3, "Bob", 5500),
    (4, "Lisa", 7000)
]

columns = ["ID", "Name", "Salary"]

df = spark.createDataFrame(data, columns)
print("Original DataFrame:")
df.show()
Original DataFrame:

+---+-------------+------+
| ID|         Name|Salary|
+---+-------------+------+
|  1|Aamir Shahzad|  5000|
|  2|     Ali Raza|  6000|
|  3|          Bob|  5500|
|  4|         Lisa|  7000|
+---+-------------+------+

Step 3: Define a Transformation Function

def add_bonus(input_df):
    # 1. Uppercase the Name column
    # 2. Add a new column "Bonus" which is 10% of Salary
    return input_df.withColumn("Name_Upper", upper(col("Name"))) \
                   .withColumn("Bonus", col("Salary") * 0.10)

Step 4: Apply transform() Function

df_transformed = df.transform(add_bonus)

print("Transformed DataFrame:")
df_transformed.show()
Transformed DataFrame:

+---+-------------+------+-------------+-----+
| ID|         Name|Salary|   Name_Upper|Bonus|
+---+-------------+------+-------------+-----+
|  1|Aamir Shahzad|  5000|AAMIR SHAHZAD|500.0|
|  2|     Ali Raza|  6000|     ALI RAZA|600.0|
|  3|          Bob|  5500|          BOB|550.0|
|  4|         Lisa|  7000|         LISA|700.0|
+---+-------------+------+-------------+-----+
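Because transform() returns a DataFrame, several reusable functions can be chained into one readable pipeline. A minimal sketch, assuming a second illustrative helper filter_high_bonus (not part of the original example):

def filter_high_bonus(input_df):
    # Keep only rows whose Bonus is at least 550
    return input_df.filter(col("Bonus") >= 550)

df_pipeline = df.transform(add_bonus).transform(filter_high_bonus)
df_pipeline.show()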

Why Use transform()?

  • Cleaner, modular code with reusable logic.
  • Perfect for applying consistent transformations across multiple DataFrames.
  • Improves readability and testability in complex pipelines.

📺 Watch the Full Tutorial Video

For a complete walkthrough, watch the video below:

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2024 PySpark Tutorials. All rights reserved.

PySpark Tutorial: How to Use subtract() to Compare and Filter DataFrames

In this tutorial, you'll learn how to use the subtract() function in PySpark to find differences between two DataFrames. A simple way to compare and filter rows in big data!

What is subtract() in PySpark?

The subtract() function returns rows that exist in the first DataFrame but not in the second. It works like the EXCEPT operator in SQL.

  • Both DataFrames must have the same schema.
  • Commonly used to compare datasets or filter out rows.

Step 1: Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark Subtract Example") \
    .getOrCreate()

Step 2: Create Sample DataFrames

# First DataFrame
data1 = [
    (29, "Aamir Shahzad"),
    (35, "Ali Raza"),
    (40, "Bob"),
    (25, "Lisa")
]

columns = ["age", "name"]

df1 = spark.createDataFrame(data1, columns)

print("DataFrame 1:")
df1.show()

# Second DataFrame
data2 = [
    (40, "Bob"),
    (25, "Lisa")
]

df2 = spark.createDataFrame(data2, columns)

print("DataFrame 2:")
df2.show()
DataFrame 1:

+---+-------------+
|age|         name|
+---+-------------+
| 29|Aamir Shahzad|
| 35|     Ali Raza|
| 40|          Bob|
| 25|         Lisa|
+---+-------------+

DataFrame 2:

+---+----+
|age|name|
+---+----+
| 40| Bob|
| 25|Lisa|
+---+----+

Step 3: Using subtract() Function in PySpark

# Subtract df2 from df1
result_df = df1.subtract(df2)

print("Result after subtracting df2 from df1:")
result_df.show()
Expected Output:

+---+-------------+
|age|         name|
+---+-------------+
| 29|Aamir Shahzad|
| 35|     Ali Raza|
+---+-------------+
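Note that subtract() behaves like SQL's EXCEPT DISTINCT: duplicate rows are removed from the result. If duplicates matter, exceptAll() keeps them; a minimal sketch with the same two DataFrames:

# exceptAll() also returns rows of df1 that are not in df2, but preserves duplicates
result_all_df = df1.exceptAll(df2)
result_all_df.show()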

Why Use subtract()?

  • Helps identify differences between two DataFrames.
  • Useful for change data capture (CDC) or data validation.
  • Removes all rows from df1 that also exist in df2 (based on complete row match).

📺 Watch the Full Tutorial Video

For a detailed walkthrough, watch the video below:

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2024 PySpark Tutorials. All rights reserved.

PySpark Tutorial for Beginners and Advanced Users

PySpark Video Tutorial Series

The Complete Video Tutorial Series (Beginner to Advanced)

Chapter 1: Introduction to PySpark

  1. What is PySpark ? What is Apache Spark | Apache Spark vs PySpark
  2. How to Create Azure Databricks Service | PySpark Tutorial | PySpark Beginners Tutorial
  3. How to Format Notebooks | Top Markdown Formatting Tips & Tricks | PySpark Tutorial
  4. How to Use Comments in PySpark | How to Write Comments Like a Pro | PySpark Tutorial

Chapter 2: Core DataFrame Operations

  1. What is a DataFrame in PySpark? | How to create DataFrame from Static Values
  2. How to Add Columns and Check Schema in PySpark DataFrame
  3. How to Use createDataFrame Function with Schema in PySpark to create DataFrame
  4. How to Display Data in PySpark Using show() Function
  5. How to Use the display() Function in Databricks
  6. How to Use select(), selectExpr(), col(), expr(), when(), and lit() in PySpark
  7. How to Use withColumn() Function in PySpark to Add & Update Columns
  8. Use distinct() to Remove Duplicates from DataFrames | Get Unique Rows with distinct() in PySpark
  9. How to Use drop() to Remove Columns from DataFrame
  10. How to use dropDuplicates Function in PySpark
  11. How to Use dropna() Function in PySpark | Remove Null Values Easily
  12. How to use PySpark count() Function | Count Rows & Records Easily
  13. filter() vs where() | Filter DataFrames
  14. How to Use subtract to Compare and Filter DataFrames
  15. How to Use transform() for Custom DataFrame Transformations
  16. PySpark replace() Function | Replace Values in DataFrames Easily

Chapter 3: Reading and Writing DataFrames

  1. How to Read CSV File into DataFrame from Azure Blob Storage
  2. How to Read JSON File into DataFrame from Azure Blob Storage
  3. How to Use toJSON() - Convert DataFrame Rows to JSON Strings
  4. writeTo() Explained: Save, Append, Overwrite DataFrames to Tables
  5. How to use Write function to Create Single CSV file in Blob Storage from DataFrame
  6. How to Write DataFrame to Parquet File in Azure Blob Storage
  7. How to Write DataFrame to JSON File in Azure Blob Storage
  8. How to Write DataFrame to Azure SQL Table Using JDBC write() Function
  9. How to Read Data from Azure SQL Table and Write to JSON File in Blob Storage
  10. Read Multiple CSV Files from Blob Storage and Write to Azure SQL Table with Filenames

Chapter 4: Advanced DataFrame Operations

  1. How to Use the display() Function in Databricks
  2. How to Sort DataFrames Using orderBy()
  3. limit() Function to Display Limited Rows
  4. How to Use describe() for DataFrame Statistics
  5. Difference Between union() and unionAll() | union() vs unionAll()
  6. fillna() Function to Replace Null or Missing Values
  7. groupBy() Function | Group & Summarize DataFrames
  8. first(), head(), and tail() Functions | Retrieve Data Efficiently
  9. exceptAll() Function Explained | Subtract and Find Differences Between DataFrames
  10. How to Use Intersect() and IntersectAll() | Compare DataFrames Easily
  11. na Functions and isEmpty Explained with Examples
  12. How to Use Cube for GroupBy and Aggregations
  13. How to Aggregate Data Using agg() Function
  14. How to Use colRegex() for Column Selection
  15. How to Get Column Names Using columns Function
  16. How to Use Pivot Function | Transform and Summarize Data Easily
  17. How to Perform Unpivot | Convert Columns to Rows Easily
  18. How to Use createTempView | Run SQL Queries on DataFrames
  19. How to Use createGlobalTempView | Share Views Across Sessions
  20. Melt Function Explained | Reshape & Unpivot DataFrames Step by Step
  21. How to Use dtypes | Get DataFrame Column Names and Types
  22. How to Use randomSplit | Split Your DataFrame into Train and Test Sets
  23. DataFrame.to Function | Schema Reconciliation & Column Reordering Made Easy
  24. take Function | Get First N Rows from DataFrame Fast
  25. DataFrame summary for Statistical Summary in One Command
  26. freqItems() Function | Identify Frequent Items in Columns Fast
  27. How to Use rollup() to Aggregate Data by Groups and Subtotals
  28. How to Use crosstab() to Analyze Relationships Between Columns
  29. How to Use unionByName() to Join DataFrames by Column Names
  30. How to Use sample() to Randomly Select Data
  31. How to Use withColumnRenamed() to Rename Columns
  32. How to use withColumnsRenamed to Rename Multiple Columns
  33. stat Function Tutorial | Perform Statistical Analysis on DataFrames

Chapter 5: DataFrame Performance and Optimization

  1. How to Get a Column in PySpark by using DataFrame with Dot or DataFrame with Square Brackets
  2. How to Use approxQuantile() in PySpark | Quick Guide to Percentiles & Median
  3. How to Use Cache in PySpark to Improve Spark Performance
  4. checkpoint() : Improve Fault Tolerance & Speed in Spark Jobs - Tutorial for Beginners
  5. coalesce() Function Tutorial - Optimize Partitioning for Faster Spark Jobs
  6. repartition() Function Tutorial: Optimize Data Partitioning for Better Performance
  7. collect() Function Tutorial : Retrieve Entire DataFrame to Driver with Examples
  8. How to Use hint() for Join Optimization – Broadcast, Shuffle, Merge
  9. localCheckpoint Explained | Improve Performance with localCheckpoint()
  10. How to Use persist() Function | Cache vs Persist Explained with Examples
  11. Optimize Your Data with repartitionByRange()
  12. unpersist Explained | How to Free Memory with unpersist Function With Examples

Chapter 6: DataFrame Analysis

  1. How to Use corr() Function in PySpark : Finding Correlation Between Columns with corr()
  2. cov() Tutorial | Covariance Analysis for Numerical Columns

Chapter 7: DataFrame Joins

  1. How to Use crossJoin() Function for Cartesian Product
  2. Joins Explained | Inner, Left, Right Join with Examples in PySpark DataFrames

Chapter 8: RDDs in PySpark

  1. How to Convert PySpark DataFrame to RDD Using .rdd | PySpark RDD vs DataFrame
  2. What is RDD in PySpark? | A Beginner’s Guide to Apache Spark’s Core Data Structure
  3. Different Ways to create RDD in PySpark
  4. What Are RDD Partitions in PySpark? | How Spark Partitioning works
  5. PySpark PairRDD Transformations | groupByKey, reduceByKey, sortByKey Explained with Real Data
  6. Understanding RDD Actions in PySpark - collect() vs count() vs reduce()
  7. RDD Persistence in PySpark Explained | MEMORY_ONLY vs MEMORY_AND_DISK with Examples
  8. Optimize Spark Shuffles: Internals of groupByKey vs reduceByKey
  9. Boost PySpark Performance with Broadcast Variables & Accumulators
  10. RDD vs DataFrame in PySpark – Key Differences with Real Examples

Chapter 9: Working with Functions

  1. PySpark foreach Function Tutorial Apply Custom Logic to Each Row in a DataFrame
  2. PySpark foreachPartition Explained Process DataFrame Partitions Efficiently with Examples
  3. How to Use call_function() in PySpark – Dynamically Apply SQL Functions in Your Code
  4. How to Use call_udf() in PySpark | Dynamically Apply UDFs in Real-Time
  5. Boost PySpark Performance with pandas_udf
  6. Create Custom Column Logic in PySpark Using udf() | Easy Guide with Real Examples
  7. How to Build User-Defined Table Functions (UDTFs) in PySpark | Split Rows

Chapter 10: SQL and Metadata

  1. PySpark inputFiles Function - How to Get Source File Paths from DataFrame
  2. PySpark isLocal Function : Check If DataFrame Operations Run Locally
  3. What is explain() in PySpark - Spark Logical vs Physical Plan - PySpark Tutorial for Beginners
  4. PySpark offset() Function : How to Skip Rows in Spark DataFrame
  5. Explore Databases and Tables in PySpark with spark.catalog | Guide to Metadata Management

Chapter 11: Sorting and Data Types

  1. PySpark Sorting Explained | ASC vs DESC | Handling NULLs with asc_nulls_first & desc_nulls_last
  2. PySpark cast() vs astype() Explained | Convert String to Int, Float & Double in DataFrame
  3. Core Data Types in PySpark Explained - IntegerType, FloatType, DoubleType, DecimalType, StringType
  4. PySpark Complex Data Types Explained : ArrayType, MapType, StructType & StructField for Beginners

Chapter 12: Nulls and Complex Types

  1. PySpark Null & Comparison Functions : Between(), isNull(), isin(), like(), rlike(), ilike()
  2. PySpark when() and otherwise() Explained | Apply If-Else Conditions to DataFrames
  3. PySpark String Functions Explained | contains(), startswith(), substr(), endswith()
  4. Working with Structs and Nested Fields in PySpark getField, getItem, withField, dropFields
  5. PySpark Date and Timestamp Types - DateType, TimestampType, Interval Types
  6. PySpark Array Functions : array(), array_contains(), sort_array(), array_size()
  7. PySpark Set-Like Array Functions : arrays_overlap(), array_union(), flatten(), array_distinct()
  8. Advanced Array Manipulations in PySpark _ slice(), concat(), element_at(), sequence()
  9. PySpark Map Functions: create_map(), map_keys(), map_concat(), map_values
  10. Transforming Arrays and Maps in PySpark : Advanced Functions_ transform(), filter(), zip_with()
  11. Flatten Arrays & Structs with explode(), inline(), and struct()

Chapter 13: Dates and Times

  1. Top PySpark Built-in DataFrame Functions Explained | col(), lit(), when(), expr(), rand() & More
  2. Top PySpark Math Functions Explained | abs(), round(), log(), pow(), sin(), degrees() & More
  3. Getting Current Date & Time in PySpark | curdate(), now(), current_timestamp()
  4. Date Calculations in PySpark | Add, Subtract, Datediff, Months Between & More
  5. PySpark Date Formatting & Conversion Tutorial : to_date(), to_timestamp(), unix_timestamp()
  6. PySpark Date & Time Creation : make_date(), make_timestamp(), make_interval()
  7. PySpark Date and Time Extraction Tutorial _ year(), hour(), dayofweek(), date_part()
  8. PySpark Date Truncation Functions: trunc(), date_trunc(), last_day()

Chapter 14: String Manipulation

  1. How to perform String Cleaning in PySpark lower, trim, initcap Explained with Real Data
  2. Substring Functions in PySpark: substr(), substring(), overlay(), left(), right()
  3. String Search in PySpark | contains, startswith, endswith, like, rlike, locate
  4. String Formatting in PySpark | concat_ws, format_number, printf, repeat, lpad, rpad Explained
  5. Split Strings in PySpark | split(str, pattern, limit) Function Explained with Examples
  6. Split_part() in PySpark Explained | Split Strings by Delimiter and Extract Specific Part
  7. Using find_in_set() in PySpark | Search String Position in a Delimited List
  8. Extract Data with regexp_extract() in PySpark | Regex Patterns Made Easy
  9. Clean & Transform Strings in PySpark Using regexp_replace() Replace Text with Regex
  10. Extract Substrings Easily in PySpark with regexp_substr()

Chapter 15: JSON

  1. PySpark JSON Functions Explained | How to Parse, Transform & Extract JSON Fields in PySpark

Chapter 16: Aggregations and Window Functions

  1. PySpark Aggregations: count(), count_distinct(), first(), last() Explained with Examples
  2. Statistical Aggregations in PySpark | avg(), mean(), median(), mode()
  3. Summarizing Data with Aggregate Functions in PySpark _ sum(), sum_distinct(), bit_and()
  4. PySpark Window Functions | Rank, Dense_Rank, Lead, Lag, NTILE | Real-World Demo
  5. Master PySpark Sorting: sort(), asc(), desc() Explained with Examples

Chapter 17: Additional DataFrame Functions

  1. PySpark toLocalIterator Explained: Efficiently Convert DataFrame to Iterator
  2. PySpark sequence Function Explained: Generate Sequences of Numbers and Dates

© 2025 Aamir Shahzad. All rights reserved.

PySpark Tutorial: DataFrame.summary() for Statistical Summary in One Command

In this tutorial, you will learn how to use summary() in PySpark to quickly generate statistical summaries of your data. Perfect for data exploration and quick analysis!

What is summary() in PySpark?

The summary() function provides descriptive statistics for numeric and string columns in a PySpark DataFrame.

By default, it shows:

  • count: Number of records
  • mean: Average for numeric columns
  • stddev: Standard deviation
  • min: Minimum value
  • 25%, 50%, 75%: Approximate percentiles for numeric columns
  • max: Maximum value

Step 1: Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark Summary Example") \
    .getOrCreate()

Step 2: Create a Sample DataFrame

data = [
    (29, "Aamir Shahzad", 5000),
    (35, "Ali Raza", 6000),
    (40, "Bob", 5500),
    (25, "Lisa", 5200)
]

columns = ["age", "name", "salary"]

df = spark.createDataFrame(data, columns)
df.show()
+---+-------------+------+
|age|         name|salary|
+---+-------------+------+
| 29|Aamir Shahzad|  5000|
| 35|     Ali Raza|  6000|
| 40|          Bob|  5500|
| 25|         Lisa|  5200|
+---+-------------+------+

Step 3: Use summary() Function

Example 1: Default Summary Statistics

df.summary().show()
+-------+-----+-------------+------+
|summary|  age|         name|salary|
+-------+-----+-------------+------+
|  count|    4|            4|     4|
|   mean|32.25|         null|5425.0|
| stddev| 6.60|         null|434.93|
|    min|   25|Aamir Shahzad|  5000|
|    25%|   25|         null|  5000|
|    50%|   29|         null|  5200|
|    75%|   35|         null|  5500|
|    max|   40|         Lisa|  6000|
+-------+-----+-------------+------+

(stddev values rounded for readability)

Example 2: Custom Summary Statistics

df.summary("count", "min", "max").show()
+-------+---+-------------+------+
|summary|age|         name|salary|
+-------+---+-------------+------+
|  count|  4|            4|     4|
|    min| 25|Aamir Shahzad|  5000|
|    max| 40|         Lisa|  6000|
+-------+---+-------------+------+
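summary() also accepts arbitrary percentiles passed as percentage strings, which describe() does not support. A minimal sketch:

# Request specific percentiles alongside count
df.summary("count", "10%", "90%").show()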

Best Practices

  • summary() is a quick and easy way to get descriptive statistics during data exploration.
  • Numeric columns get the full set of statistics; string columns only get count, min, and max.
  • summary() is a superset of describe(): describe() returns only count, mean, stddev, min, and max, while summary() also includes percentiles and accepts a custom list of statistics.

📺 Watch the Full Tutorial Video

For a detailed walkthrough, watch the video below:

▶️ Watch on YouTube

Author: Aamir Shahzad

© 2025 PySpark Tutorials. All rights reserved.

PySpark take() Function | Get First N Rows from DataFrame Fast

In this tutorial, you'll learn how to use the take() function in PySpark to quickly retrieve the first N rows from a DataFrame. It's a handy method for previewing and debugging data.

Introduction

When working with PySpark DataFrames, it's often useful to retrieve a small sample of rows for inspection or validation. The take() function allows you to pull the first N rows efficiently, returning them as a list of Row objects.
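For context, take(n) is an action: it brings n rows to the driver as a Python list, much like head(n). If you need the first rows as a DataFrame instead (to keep transforming them), use limit(n). A minimal sketch (df refers to the DataFrame created in the example below):

# take()/head() return a list of Row objects on the driver
rows = df.take(2)

# limit() returns a new DataFrame, so the result stays distributed and lazy
df_first_two = df.limit(2)
df_first_two.show()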

PySpark Code Example

from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName("PySpark take() Example").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", 85, "Math"),
    ("Ali Raza", 78, "Science"),
    ("Bob", 92, "History"),
    ("Lisa", 80, "Math"),
    ("John", 88, "Science")
]

# Define columns
columns = ["Name", "Score", "Subject"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Show complete DataFrame
print("Complete DataFrame:")
df.show()

# Example 1: Take the first 2 rows
first_two_rows = df.take(2)
print("First 2 rows using take():")
for row in first_two_rows:
    print(row)

# Example 2: Take more rows than exist (request 10 rows, only 5 in DataFrame)
more_rows = df.take(10)
print("Taking more rows than available (requested 10 rows):")
for row in more_rows:
    print(row)

Expected Output: Complete DataFrame

+-------------+-----+--------+
| Name        |Score|Subject |
+-------------+-----+--------+
|Aamir Shahzad|   85|    Math|
|Ali Raza     |   78| Science|
|Bob          |   92| History|
|Lisa         |   80|    Math|
|John         |   88| Science|
+-------------+-----+--------+

Expected Output: First 2 Rows Using take()

Row(Name='Aamir Shahzad', Score=85, Subject='Math')
Row(Name='Ali Raza', Score=78, Subject='Science')

Expected Output: Taking More Rows Than Exist (Requested 10 Rows)

Row(Name='Aamir Shahzad', Score=85, Subject='Math')
Row(Name='Ali Raza', Score=78, Subject='Science')
Row(Name='Bob', Score=92, Subject='History')
Row(Name='Lisa', Score=80, Subject='Math')
Row(Name='John', Score=88, Subject='Science')

🎥 Watch the Video Tutorial

Author: Aamir Shahzad

© 2025 PySpark Tutorials. All rights reserved.

PySpark DataFrame.to() Function | Schema Reconciliation and Column Reordering Made Easy

Learn how to use the DataFrame.to() function introduced in PySpark 3.4.0 to reorder columns and reconcile schema effortlessly.

📘 Introduction

PySpark’s DataFrame.to() function helps with schema alignment, column reordering, and type casting — all in one step. This is especially useful when you're writing to tables, preparing data for joins, or ensuring schema compliance.

🔧 PySpark Code Example

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType

# Create SparkSession
spark = SparkSession.builder.appName("DataFrame.to Example").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", "Math", 85, True),
    ("Ali Raza", "Science", 78, False),
    ("Bob", "History", 92, True),
    ("Lisa", "Math", 80, False)
]

columns = ["Name", "Subject", "Score", "Passed"]
df = spark.createDataFrame(data, columns)

# Print original schema and data
df.show(truncate=False)
df.printSchema()

# Define new schema with reordered and altered types
schema = StructType([
    StructField("Passed", BooleanType(), True),
    StructField("Score", StringType(), True),  # Cast from int to string
    StructField("Name", StringType(), True),
    StructField("Subject", StringType(), True)
])

# Apply .to() transformation
df2 = df.to(schema)

# Show transformed DataFrame
df2.show(truncate=False)
df2.printSchema()

📊 Original DataFrame Output

+-------------+--------+-----+-------+
| Name        |Subject |Score|Passed |
+-------------+--------+-----+-------+
|Aamir Shahzad|Math    |   85|   true|
|Ali Raza     |Science |   78|  false|
|Bob          |History |   92|   true|
|Lisa         |Math    |   80|  false|
+-------------+--------+-----+-------+

root
 |-- Name: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Score: long (nullable = true)
 |-- Passed: boolean (nullable = true)

✅ New DataFrame Output (After .to())

+-------+-----+-------------+--------+
|Passed |Score|Name         |Subject |
+-------+-----+-------------+--------+
|   true|85   |Aamir Shahzad|Math    |
|  false|78   |Ali Raza     |Science |
|   true|92   |Bob          |History |
|  false|80   |Lisa         |Math    |
+-------+-----+-------------+--------+

root
 |-- Passed: boolean (nullable = true)
 |-- Score: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Subject: string (nullable = true)
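A common use is aligning a DataFrame to the schema of a table you are about to append to. A minimal sketch, assuming a hypothetical existing table named target_table:

# Reuse the target table's schema to reorder and cast columns before appending (hypothetical table name)
target_schema = spark.table("target_table").schema
df_aligned = df.to(target_schema)
df_aligned.write.mode("append").saveAsTable("target_table")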

💡 Key Takeaways

  • DataFrame.to() allows column reordering and type conversion.
  • It is especially useful when writing to pre-defined tables or aligning multiple datasets.
  • Introduced in PySpark 3.4.0 and above.

🎥 Watch the Video Tutorial

Watch on YouTube

Author: Aamir Shahzad

© 2025 PySpark Tutorials. All rights reserved.

How to Use randomSplit() in PySpark | Split Your DataFrame into Train and Test Sets

Learn how to efficiently split your DataFrame into training and testing datasets using PySpark's randomSplit() method. A must-have step when preparing data for machine learning workflows.

📘 Introduction

In machine learning and data science, splitting data into training and testing sets is a fundamental step. PySpark’s randomSplit() function allows you to do this easily and reproducibly across distributed data.

🔧 PySpark Code Example

from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName("PySpark randomSplit Example").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", 85, "Math"),
    ("Ali Raza", 78, "Science"),
    ("Bob", 92, "History"),
    ("Lisa", 80, "Math"),
    ("John", 88, "Science"),
    ("Emma", 75, "History"),
    ("Sophia", 90, "Math"),
    ("Daniel", 83, "Science"),
    ("David", 95, "History"),
    ("Olivia", 77, "Math")
]

columns = ["Name", "Score", "Subject"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show()

# Split DataFrame (70% train, 30% test)
train_df, test_df = df.randomSplit([0.7, 0.3], seed=42)

# Show results
print("Training Set:")
train_df.show()

print("Testing Set:")
test_df.show()

📊 Original DataFrame Output

+-------------+-----+--------+
| Name        |Score|Subject |
+-------------+-----+--------+
|Aamir Shahzad|   85|    Math|
|Ali Raza     |   78| Science|
|Bob          |   92| History|
|Lisa         |   80|    Math|
|John         |   88| Science|
|Emma         |   75| History|
|Sophia       |   90|    Math|
|Daniel       |   83| Science|
|David        |   95| History|
|Olivia       |   77|    Math|
+-------------+-----+--------+

✅ Training Dataset Output (Approx. 70%)

+-------------+-----+--------+
| Name        |Score|Subject |
+-------------+-----+--------+
|Ali Raza     |   78| Science|
|Daniel       |   83| Science|
|Emma         |   75| History|
|John         |   88| Science|
|Lisa         |   80|    Math|
|Sophia       |   90|    Math|
+-------------+-----+--------+

✅ Testing Dataset Output (Approx. 30%)

+-------------+-----+--------+
| Name        |Score|Subject |
+-------------+-----+--------+
|Aamir Shahzad|   85|    Math|
|Bob          |   92| History|
|David        |   95| History|
|Olivia       |   77|    Math|
+-------------+-----+--------+
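randomSplit() is not limited to two sets, and the weights do not need to sum to 1 (they are normalized). A minimal sketch of a train/validation/test split:

# 60/20/20 split; weights are normalized if they don't sum to 1.0
train_df, val_df, test_df = df.randomSplit([0.6, 0.2, 0.2], seed=42)
print(train_df.count(), val_df.count(), test_df.count())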

💡 Key Notes

  • randomSplit([0.7, 0.3]) divides your data into 70% and 30% splits.
  • You can split into more than two sets, e.g., `[0.6, 0.2, 0.2]` for train/test/val.
  • The seed parameter ensures reproducible results.
  • This function is efficient for large datasets processed in Spark clusters.

🎥 Watch the Video Tutorial

Watch on YouTube

Author: Aamir Shahzad

© 2025 PySpark Tutorials. All rights reserved.

PySpark Tutorial: How to Use dtypes in PySpark | Get DataFrame Column Names and Types

Learn how to use dtypes in PySpark to quickly inspect column names and their corresponding data types — a must-know feature for any data engineer or analyst.

📘 Introduction

Understanding the structure of your DataFrame is essential for building reliable data pipelines. The dtypes attribute helps you list out each column with its corresponding data type in a quick and readable format.

🔧 PySpark Code Example

from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName("PySpark dtypes Example").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", 85, 90.5, True),
    ("Ali Raza", 78, 83.0, False),
    ("Bob", 92, 95.2, True),
    ("Lisa", 80, 87.8, False)
]

# Define columns
columns = ["Name", "Math_Score", "Science_Score", "Passed"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Show the DataFrame
df.show()

# Get column names and data types
print("Column Names and Data Types (dtypes):")
print(df.dtypes)

# Formatted output
print("\nFormatted Output of Column Data Types:")
for col_name, data_type in df.dtypes:
    print(f"Column: {col_name}, Type: {data_type}")

📊 Original DataFrame Output

+-------------+----------+-------------+-------+
| Name        |Math_Score|Science_Score|Passed |
+-------------+----------+-------------+-------+
|Aamir Shahzad|        85|         90.5|   true|
|Ali Raza     |        78|         83.0|  false|
|Bob          |        92|         95.2|   true|
|Lisa         |        80|         87.8|  false|
+-------------+----------+-------------+-------+

📥 dtypes Attribute Output

[('Name', 'string'), 
 ('Math_Score', 'bigint'), 
 ('Science_Score', 'double'), 
 ('Passed', 'boolean')]

✅ Formatted Column Type Output

Column: Name, Type: string
Column: Math_Score, Type: bigint
Column: Science_Score, Type: double
Column: Passed, Type: boolean
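Because dtypes is just a Python list of (column, type) tuples, it can drive dynamic transformations. A minimal sketch that casts every bigint column to int while leaving other columns unchanged:

from pyspark.sql.functions import col

# Build the select list from df.dtypes: cast bigint columns, pass the rest through
df_casted = df.select([
    col(name).cast("int").alias(name) if dtype == "bigint" else col(name)
    for name, dtype in df.dtypes
])
df_casted.printSchema()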

💡 Why Use dtypes?

  • Quickly inspect DataFrame schema during exploration and debugging.
  • Useful in dynamic transformations where data types matter (e.g., casting).
  • A faster alternative to printSchema() when you just need names and types.

🎥 Watch the Video Tutorial

Watch on YouTube

Author: Aamir Shahzad

© 2025 PySpark Tutorials. All rights reserved.

How to Use melt() Function in PySpark | Reshape & Unpivot DataFrames | Step-by-Step Guide

Learn how to reshape and unpivot PySpark DataFrames using a custom melt function. Ideal for data engineering and analytics workflows.

📘 Introduction

Before version 3.4, PySpark had no native melt() function like pandas. We can mimic the behavior on any version by combining explode(), struct(), and array(). This transformation is useful when converting a wide table (with many columns) into a long format (more rows, fewer columns).

🔧 PySpark Code Example

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create SparkSession
spark = SparkSession.builder.appName("PySpark Melt Function Example").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", 85, 90, 88),
    ("Ali Raza", 78, 83, 89),
    ("Bob", 92, 95, 91),
    ("Lisa", 80, 87, 85)
]

columns = ["Name", "Math", "Science", "History"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show()

# Custom melt function
def melt(df, id_vars, value_vars, var_name="Subject", value_name="Score"):
    from pyspark.sql.functions import array, struct, explode, lit
    _vars_and_vals = array(*[
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars
    ])
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
    cols = id_vars + [
        col("_vars_and_vals")[var_name].alias(var_name),
        col("_vars_and_vals")[value_name].alias(value_name)
    ]
    return _tmp.select(*cols)

# Apply melt
melted_df = melt(df, ["Name"], ["Math", "Science", "History"])
melted_df.show()

📊 Original DataFrame Output

+-------------+----+-------+-------+
| Name        |Math|Science|History|
+-------------+----+-------+-------+
|Aamir Shahzad|  85|     90|     88|
|Ali Raza     |  78|     83|     89|
|Bob          |  92|     95|     91|
|Lisa         |  80|     87|     85|
+-------------+----+-------+-------+

📎 Melted (Unpivoted) DataFrame Output

+-------------+--------+-----+
| Name        |Subject |Score|
+-------------+--------+-----+
|Aamir Shahzad|   Math |   85|
|Aamir Shahzad|Science |   90|
|Aamir Shahzad|History |   88|
|Ali Raza     |   Math |   78|
|Ali Raza     |Science |   83|
|Ali Raza     |History |   89|
|Bob          |   Math |   92|
|Bob          |Science |   95|
|Bob          |History |   91|
|Lisa         |   Math |   80|
|Lisa         |Science |   87|
|Lisa         |History |   85|
+-------------+--------+-----+

💡 Explanation

  • The melt function uses array() and struct() to combine columns and values into an array of structs.
  • explode() is used to flatten that array into rows.
  • This technique is equivalent to the pandas melt() and is very useful for reshaping DataFrames before writing to analytics layers (a native alternative for newer Spark versions is sketched below).
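On PySpark 3.4 and newer, the same reshaping is available natively through DataFrame.unpivot() (melt() is an alias), so the custom helper is only needed on older versions. A minimal sketch:

# Native unpivot (PySpark 3.4+); equivalent to the custom melt() above
melted_native = df.unpivot(
    ids=["Name"],
    values=["Math", "Science", "History"],
    variableColumnName="Subject",
    valueColumnName="Score"
)
melted_native.show()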

🎥 Watch the Video Tutorial

Watch on YouTube

Author: Aamir Shahzad

© 2025 PySpark Tutorials. All rights reserved.

How to Use createGlobalTempView() in PySpark | Share Views Across Sessions | Step-by-Step Guide

Author: Aamir Shahzad

Published: March 2025

📘 Introduction

The createGlobalTempView() function in PySpark allows you to register a DataFrame as a global temporary view that can be accessed across multiple Spark sessions. Unlike createTempView(), it persists for the lifetime of the Spark application and is stored in the global_temp database.

🧾 Sample Dataset

Name            Department     Salary
Aamir Shahzad   Engineering     5000
Ali             Sales           4000
Raza            Marketing       3500
Bob             Sales           4200
Lisa            Engineering     6000

🔧 Create DataFrame in PySpark

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("createGlobalTempViewExample").getOrCreate()

data = [
    ("Aamir Shahzad", "Engineering", 5000),
    ("Ali", "Sales", 4000),
    ("Raza", "Marketing", 3500),
    ("Bob", "Sales", 4200),
    ("Lisa", "Engineering", 6000)
]

columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)
df.show()

✅ Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

📌 Create a Global Temporary View

df.createOrReplaceGlobalTempView("employee_global_view")

📊 Query the Global Temp View in Current Session

result1 = spark.sql("SELECT * FROM global_temp.employee_global_view")
result1.show()

✅ Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

🔁 Access the Global View from a New Session

# newSession() creates a separate session within the same Spark application
new_session = spark.newSession()

result2 = new_session.sql("SELECT * FROM global_temp.employee_global_view")
result2.show()

✅ Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

📈 Aggregate Example: Average Salary by Department

result3 = spark.sql("""
  SELECT Department, AVG(Salary) AS Avg_Salary
  FROM global_temp.employee_global_view
  GROUP BY Department
""")
result3.show()

✅ Expected Output

+-----------+----------+
| Department|Avg_Salary|
+-----------+----------+
|  Marketing|    3500.0|
|Engineering|    5500.0|
|      Sales|    4100.0|
+-----------+----------+
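Global temp views live until the application stops, but they can also be dropped explicitly when no longer needed; a minimal sketch:

# Remove the global temp view from the global_temp database
spark.catalog.dropGlobalTempView("employee_global_view")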

💡 Key Points

  • createGlobalTempView() enables view sharing across sessions.
  • The view must be accessed using the global_temp database.
  • Global temp views last until the Spark application terminates.

🎥 Watch the Video Tutorial

Watch on YouTube

© 2025 Aamir Shahzad. All rights reserved.

Visit TechBrothersIT for more tutorials.

How to Use createTempView() in PySpark | Run SQL Queries on DataFrames | Step-by-Step Guide

Author: Aamir Shahzad

Published: March 2025

📘 Introduction

The createTempView() function registers a PySpark DataFrame as a temporary SQL view. This allows you to execute SQL queries using Spark SQL, combining the power of SQL with the flexibility of DataFrames.

🧾 Sample Dataset

Name           Department     Salary
Aamir Shahzad   Engineering     5000
Ali             Sales           4000
Raza            Marketing       3500
Bob             Sales           4200
Lisa            Engineering     6000

🔧 Create DataFrame in PySpark

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("createTempViewExample").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", "Engineering", 5000),
    ("Ali", "Sales", 4000),
    ("Raza", "Marketing", 3500),
    ("Bob", "Sales", 4200),
    ("Lisa", "Engineering", 6000)
]

columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)
df.show()

✅ Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

📌 Register DataFrame as a Temp View

# Register as temporary SQL view
df.createOrReplaceTempView("employee_view")

📊 Run SQL Queries on the Temp View

Example 1: Select All Records

result1 = spark.sql("SELECT * FROM employee_view")
result1.show()

✅ Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

Example 2: Filter by Department

result2 = spark.sql("""
  SELECT Name, Salary
  FROM employee_view
  WHERE Department = 'Sales'
""")
result2.show()

✅ Output

+-----+------+
| Name|Salary|
+-----+------+
|  Ali|  4000|
|  Bob|  4200|
+-----+------+

Example 3: Average Salary by Department

result3 = spark.sql("""
  SELECT Department, AVG(Salary) AS Avg_Salary
  FROM employee_view
  GROUP BY Department
""")
result3.show()

✅ Output

+-----------+----------+
| Department|Avg_Salary|
+-----------+----------+
|  Marketing|    3500.0|
|Engineering|    5500.0|
|      Sales|    4100.0|
+-----------+----------+
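A temp view disappears when its session ends, but it can also be removed explicitly; a minimal sketch:

# Drop the temporary view from the current session
spark.catalog.dropTempView("employee_view")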

💡 Key Points

  • createTempView() is used to register a DataFrame as a temporary view.
  • You can use spark.sql() to run standard SQL queries on that view.
  • The view is session-scoped and will be removed when the Spark session ends.

🎥 Watch the Video Tutorial

Watch on YouTube

© 2025 Aamir Shahzad. All rights reserved.

Visit TechBrothersIT for more tutorials.

How to Perform Unpivot in PySpark | Convert Columns to Rows Easily

Author: Aamir Shahzad

Published: March 2025

📘 Introduction

In PySpark, unpivoting means converting columns into rows — the opposite of a pivot operation. While PySpark doesn't have a built-in unpivot() function, you can easily achieve this using selectExpr() and stack().

🧾 Sample Dataset

We’ll work with a DataFrame that contains sales data across three years:

Name           Sales_2023   Sales_2024   Sales_2025
Aamir Shahzad     800           900          1000
Ali               500           600          1001
Raza              450           550          102
Bob               700           750          103
Lisa              620           None         109

🔧 Create DataFrame in PySpark

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("UnpivotExample").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", 800, 900, 1000),
    ("Ali", 500, 600, 1001),
    ("Raza", 450, 550, 102),
    ("Bob", 700, 750, 103),
    ("Lisa", 620, None, 109)
]

columns = ["Name", "Sales_2023", "Sales_2024", "Sales_2025"]

df = spark.createDataFrame(data, columns)
df.show()

✅ Expected Output

+-------------+----------+----------+----------+
|         Name|Sales_2023|Sales_2024|Sales_2025|
+-------------+----------+----------+----------+
|Aamir Shahzad|       800|       900|      1000|
|          Ali|       500|       600|      1001|
|         Raza|       450|       550|       102|
|          Bob|       700|       750|       103|
|         Lisa|       620|      null|       109|
+-------------+----------+----------+----------+

🔁 Unpivot Using stack() and selectExpr()

Convert sales columns into rows using stack() for each year:

unpivotDF = df.selectExpr(
    "Name",
    "stack(3, '2023', Sales_2023, '2024', Sales_2024, '2025', Sales_2025) as (Year, Sales)"
)

unpivotDF.show()

✅ Expected Output

+-------------+----+-----+
|         Name|Year|Sales|
+-------------+----+-----+
|Aamir Shahzad|2023|  800|
|Aamir Shahzad|2024|  900|
|Aamir Shahzad|2025| 1000|
|          Ali|2023|  500|
|          Ali|2024|  600|
|          Ali|2025| 1001|
|         Raza|2023|  450|
|         Raza|2024|  550|
|         Raza|2025|  102|
|          Bob|2023|  700|
|          Bob|2024|  750|
|          Bob|2025|  103|
|         Lisa|2023|  620|
|         Lisa|2024| null|
|         Lisa|2025|  109|
+-------------+----+-----+

📌 Explanation

  • stack(3, ...) tells PySpark to generate 3 rows per input row.
  • Each pair of values (e.g., '2023', Sales_2023) is used to populate new rows.
  • The result is a normalized table format that's easier to process and analyze.

🎥 Watch the Video Tutorial

Watch on YouTube

© 2025 Aamir Shahzad. All rights reserved.

Visit TechBrothersIT for more tutorials.

How to Use pivot() Function in PySpark | Transform and Summarize Data Easily | Step-by-Step Guide

Author: Aamir Shahzad

Published: March 2025

📘 Introduction

The pivot() function in PySpark lets you rotate data in a DataFrame. It's used to transform rows into columns—helpful when summarizing data for reporting and analytics.

🧾 Sample Dataset

Name            Year    Product     Revenue
Aamir Shahzad   2023    Product A   500
Aamir Shahzad   2023    Product B   300
Ali             2023    Product A   400
Raza            2023    Product B   200
Bob             2024    Product A   700
Lisa            2024    Product B   600
Ali             2024    Product A   300
Raza            2024    Product B   500
Aamir Shahzad   2024    Product A   800
Lisa            2023    Product A   650
Lisa            2025    Product A   650
Lisa            2026    Product C   1100

🔧 Create DataFrame in PySpark

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("PivotFunctionExample").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", 2023, "Product A", 500),
    ("Aamir Shahzad", 2023, "Product B", 300),
    ("Ali", 2023, "Product A", 400),
    ("Raza", 2023, "Product B", 200),
    ("Bob", 2024, "Product A", 700),
    ("Lisa", 2024, "Product B", 600),
    ("Ali", 2024, "Product A", 300),
    ("Raza", 2024, "Product B", 500),
    ("Aamir Shahzad", 2024, "Product A", 800),
    ("Lisa", 2023, "Product A", 650),
    ("Lisa", 2025, "Product A", 650),
    ("Lisa", 2026, "Product C", 1100)
]

columns = ["Name", "Year", "Product", "Revenue"]
df = spark.createDataFrame(data, columns)

# Show original DataFrame
df.show()

📊 Pivot Example 1: Pivot on Product

Rotate the Product column into individual columns and summarize Revenue using sum().

from pyspark.sql.functions import sum

pivot_df = df.groupBy("Name", "Year").pivot("Product").agg(sum("Revenue"))
pivot_df.show()

✅ Expected Output

+-------------+----+---------+---------+---------+
|         Name|Year|Product A|Product B|Product C|
+-------------+----+---------+---------+---------+
|          Bob|2024|      700|     null|     null|
|Aamir Shahzad|2023|      500|      300|     null|
|Aamir Shahzad|2024|      800|     null|     null|
|          Ali|2023|      400|     null|     null|
|          Ali|2024|      300|     null|     null|
|         Raza|2023|     null|      200|     null|
|         Raza|2024|     null|      500|     null|
|         Lisa|2023|      650|     null|     null|
|         Lisa|2024|     null|      600|     null|
|         Lisa|2025|      650|     null|     null|
|         Lisa|2026|     null|     null|     1100|
+-------------+----+---------+---------+---------+

📊 Pivot Example 2: Pivot on Year

Rotate the Year column to display annual revenue for each person across all years.

pivot_df_year = df.groupBy("Name").pivot("Year").agg(sum("Revenue"))
pivot_df_year.show()

✅ Expected Output

+-------------+----+----+----+----+
|         Name|2023|2024|2025|2026|
+-------------+----+----+----+----+
|          Bob|null| 700|null|null|
|Aamir Shahzad| 800| 800|null|null|
|          Ali| 400| 300|null|null|
|         Raza| 200| 500|null|null|
|         Lisa| 650| 600| 650|1100|
+-------------+----+----+----+----+
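When the pivot values are known ahead of time, passing them explicitly avoids the extra pass Spark otherwise runs to discover them, and it fixes the column order. A minimal sketch:

# List the pivot values explicitly to skip the discovery job and control column order
pivot_df_explicit = df.groupBy("Name", "Year") \
    .pivot("Product", ["Product A", "Product B", "Product C"]) \
    .agg(sum("Revenue"))
pivot_df_explicit.show()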

📌 Explanation

  • pivot("Product") creates a column for each product type.
  • pivot("Year") creates a column for each year.
  • agg(sum("Revenue")) aggregates revenue values during the pivot.
  • Missing values appear as null.

🎥 Video Tutorial

Watch on YouTube

© 2025 Aamir Shahzad. All rights reserved.

Visit TechBrothersIT for more tutorials.

How to Use columns Function in PySpark | Step-by-Step Guide

Author: Aamir Shahzad

Published: March 2025

📘 Introduction

The columns attribute in PySpark is a quick and effective way to retrieve the list of column names from a DataFrame. This is useful when you're dynamically working with column names in big data pipelines.

📌 What is the columns Attribute in PySpark?

The columns attribute returns a Python list of column names from the DataFrame. You can use this list to inspect, iterate, or programmatically manipulate columns in your PySpark applications.

🧾 Sample Dataset

Name           Department    Salary
Aamir Shahzad   IT            5000
Ali Raza        HR            4000
Bob             Finance       4500
Lisa            HR            4000

🔧 Create DataFrame in PySpark

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("columnsFunctionExample").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", "IT", 5000),
    ("Ali Raza", "HR", 4000),
    ("Bob", "Finance", 4500),
    ("Lisa", "HR", 4000)
]

# Create DataFrame
columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)

# Show DataFrame
df.show()

📊 Using columns Attribute

# Get list of column names
print("List of columns in the DataFrame:")
print(df.columns)

# Loop through column names
print("\nLooping through columns:")
for col_name in df.columns:
    print(col_name)

✅ Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|         IT|  5000|
|     Ali Raza|         HR|  4000|
|          Bob|    Finance|  4500|
|         Lisa|         HR|  4000|
+-------------+-----------+------+

List of columns in the DataFrame:
['Name', 'Department', 'Salary']

Looping through columns:
Name
Department
Salary
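Because df.columns is a plain Python list, it works naturally with list comprehensions for dynamic column selection. A minimal sketch that selects everything except Salary without naming the other columns:

# Select every column except Salary, driven by the columns list
df_no_salary = df.select([c for c in df.columns if c != "Salary"])
df_no_salary.show()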

📌 Explanation

  • df.columns returns a list of all column names in the DataFrame.
  • Useful for dynamic column operations like renaming, filtering, or applying transformations.
  • You can loop over df.columns for custom logic on each column.

🎥 Video Tutorial

Watch on YouTube

© 2025 Aamir Shahzad. All rights reserved.

Visit TechBrothersIT for more tutorials.

How to Use colRegex() Function in PySpark | Step-by-Step Guide

Author: Aamir Shahzad

Published: March 2025

📘 Introduction

The colRegex() function in PySpark allows you to select multiple columns using regular expressions. It is especially useful when dealing with DataFrames that have dynamic or similar column naming patterns.

📌 What is colRegex() in PySpark?

The colRegex() method is part of the DataFrame API. It enables you to use regular expressions to match and select column names based on patterns — such as prefix, suffix, or substring — instead of specifying each column manually.

🧾 Sample Dataset

Name           Department    Salary     Country
Aamir Shahzad   IT            5000       US
Ali Raza        HR            4000       CA
Bob             Finance       4500       UK
Lisa            HR            4000       CA

🔧 Create DataFrame in PySpark

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("colRegexExample").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", "IT", 5000, "US"),
    ("Ali Raza", "HR", 4000, "CA"),
    ("Bob", "Finance", 4500, "UK"),
    ("Lisa", "HR", 4000, "CA")
]

# Create DataFrame
columns = ["Name", "Department", "Salary", "Country"]
df = spark.createDataFrame(data, columns)

# Show DataFrame
df.show()

📊 Using colRegex() for Column Selection

# Example 1: Select columns starting with 'Dep'
df.select(df.colRegex("`^Dep.*`")).show()

# Example 2: Select columns ending with 'Name'
df.select(df.colRegex("`.*Name$`")).show()

# Example 3: Select columns containing 'try'
df.select(df.colRegex("`.*try.*`")).show()

✅ Expected Output - Example 1

+-----------+
|Department |
+-----------+
|IT         |
|HR         |
|Finance    |
|HR         |
+-----------+

✅ Expected Output - Example 2

+-------------+
|         Name|
+-------------+
|Aamir Shahzad|
|     Ali Raza|
|          Bob|
|         Lisa|
+-------------+

✅ Expected Output - Example 3

+--------+
|Country |
+--------+
|US      |
|CA      |
|UK      |
|CA      |
+--------+
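colRegex() can also match several explicit names at once with an alternation pattern; a minimal sketch selecting Name and Country together:

# Select Name and Country in one call using a regex alternation
df.select(df.colRegex("`^(Name|Country)$`")).show()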

📌 Explanation

  • ^Dep.* matches any column name starting with 'Dep'.
  • .*Name$ matches any column name ending with 'Name'.
  • .*try.* matches any column name that contains 'try' anywhere.

🎥 Video Tutorial

Watch on YouTube

© 2025 Aamir Shahzad. All rights reserved.

Visit TechBrothersIT for more tutorials.