PySpark Melt Function Explained | Reshape & Unpivot DataFrames Step by Step | PySpark Tutorial

How to Use Melt Function in PySpark | Step-by-Step Guide

How to Use melt() Function in PySpark

Reshape & Unpivot DataFrames | Step-by-Step Guide

Learn how to reshape and unpivot PySpark DataFrames using a custom melt function. Ideal for data engineering and analytics workflows.

📘 Introduction

PySpark (before version 3.4) doesn't have a native melt() function like pandas. However, we can mimic this behavior by combining explode(), struct(), and array(). This transformation is useful when converting a wide table (with many columns) into a long format (more rows, fewer columns).

🔧 PySpark Code Example

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create SparkSession
spark = SparkSession.builder.appName("PySpark Melt Function Example").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", 85, 90, 88),
    ("Ali Raza", 78, 83, 89),
    ("Bob", 92, 95, 91),
    ("Lisa", 80, 87, 85)
]

columns = ["Name", "Math", "Science", "History"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df.show()

# Custom melt function
def melt(df, id_vars, value_vars, var_name="Subject", value_name="Score"):
    from pyspark.sql.functions import array, struct, explode, lit
    _vars_and_vals = array(*[
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars
    ])
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
    cols = id_vars + [
        col("_vars_and_vals")[var_name].alias(var_name),
        col("_vars_and_vals")[value_name].alias(value_name)
    ]
    return _tmp.select(*cols)

# Apply melt
melted_df = melt(df, ["Name"], ["Math", "Science", "History"])
melted_df.show()

📊 Original DataFrame Output

+-------------+----+-------+-------+
| Name        |Math|Science|History|
+-------------+----+-------+-------+
|Aamir Shahzad|  85|     90|     88|
|Ali Raza     |  78|     83|     89|
|Bob          |  92|     95|     91|
|Lisa         |  80|     87|     85|
+-------------+----+-------+-------+

📎 Melted (Unpivoted) DataFrame Output

+-------------+--------+-----+
| Name        |Subject |Score|
+-------------+--------+-----+
|Aamir Shahzad|   Math |   85|
|Aamir Shahzad|Science |   90|
|Aamir Shahzad|History |   88|
|Ali Raza     |   Math |   78|
|Ali Raza     |Science |   83|
|Ali Raza     |History |   89|
|Bob          |   Math |   92|
|Bob          |Science |   95|
|Bob          |History |   91|
|Lisa         |   Math |   80|
|Lisa         |Science |   87|
|Lisa         |History |   85|
+-------------+--------+-----+

💡 Explanation

  • The melt function uses array() and struct() to combine columns and values into an array of structs.
  • explode() is used to flatten that array into rows.
  • This technique is equivalent to the pandas melt() and is very useful for reshaping DataFrames before writing to analytics layers. Spark 3.4+ also ships a built-in equivalent, shown in the sketch below.
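
If you are on Spark 3.4 or newer, the same reshaping is available natively as DataFrame.unpivot(), with melt() as an alias. A minimal sketch against the df created above (parameter names follow the 3.4 API; verify against your version):

# Built-in unpivot/melt, assuming Spark 3.4+
native_melted_df = df.unpivot(
    ids=["Name"],                           # identifier columns to keep
    values=["Math", "Science", "History"],  # columns to turn into rows
    variableColumnName="Subject",
    valueColumnName="Score"
)
native_melted_df.show()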

🎥 Watch the Video Tutorial

Watch on YouTube

Author: Aamir Shahzad

© 2025 PySpark Tutorials. All rights reserved.

PySpark Tutorial : How to Use createGlobalTempView in PySpark | Share Views Across Sessions

How to Use createGlobalTempView() in PySpark | Step-by-Step Guide

Author: Aamir Shahzad

Published: March 2025

📘 Introduction

The createGlobalTempView() function in PySpark allows you to register a DataFrame as a global temporary view that can be accessed across multiple Spark sessions. Unlike createTempView(), it persists for the lifetime of the Spark application and is stored in the global_temp database.

🧾 Sample Dataset

Name            Department     Salary
Aamir Shahzad   Engineering     5000
Ali             Sales           4000
Raza            Marketing       3500
Bob             Sales           4200
Lisa            Engineering     6000

🔧 Create DataFrame in PySpark

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("createGlobalTempViewExample").getOrCreate()

data = [
    ("Aamir Shahzad", "Engineering", 5000),
    ("Ali", "Sales", 4000),
    ("Raza", "Marketing", 3500),
    ("Bob", "Sales", 4200),
    ("Lisa", "Engineering", 6000)
]

columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)
df.show()

✅ Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

📌 Create a Global Temporary View

df.createOrReplaceGlobalTempView("employee_global_view")

📊 Query the Global Temp View in Current Session

result1 = spark.sql("SELECT * FROM global_temp.employee_global_view")
result1.show()

✅ Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

๐Ÿ” Access the Global View from a New Session

new_session = SparkSession.builder.appName("AnotherSession").getOrCreate()

result2 = new_session.sql("SELECT * FROM global_temp.employee_global_view")
result2.show()

✅ Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

📈 Aggregate Example: Average Salary by Department

result3 = spark.sql("""
  SELECT Department, AVG(Salary) AS Avg_Salary
  FROM global_temp.employee_global_view
  GROUP BY Department
""")
result3.show()

✅ Expected Output

+-----------+----------+
| Department|Avg_Salary|
+-----------+----------+
|  Marketing|    3500.0|
|Engineering|    5500.0|
|      Sales|    4100.0|
+-----------+----------+

💡 Key Points

  • createGlobalTempView() enables view sharing across sessions.
  • The view must be accessed using the global_temp database.
  • Global temp views last until the Spark application terminates (see the housekeeping sketch below).
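
As a housekeeping sketch for the points above: the global_temp database can be listed like any other database, and the view can be dropped explicitly once the application no longer needs it. This assumes the employee_global_view registered earlier.

# List everything currently registered in the global_temp database
spark.sql("SHOW TABLES IN global_temp").show()

# Drop the global temp view explicitly once it is no longer needed
spark.catalog.dropGlobalTempView("employee_global_view")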

🎥 Watch the Video Tutorial

Watch on YouTube

© 2025 Aamir Shahzad. All rights reserved.

Visit TechBrothersIT for more tutorials.

How to Use createTempView in PySpark | Run SQL Queries on DataFrames | PySpark Tutorial

How to Use createTempView() in PySpark | Step-by-Step Guide

Author: Aamir Shahzad

Published: March 2025

📘 Introduction

The createTempView() function registers a PySpark DataFrame as a temporary SQL view. This allows you to execute SQL queries using Spark SQL, combining the power of SQL with the flexibility of DataFrames.

🧾 Sample Dataset

Name           Department     Salary
Aamir Shahzad   Engineering     5000
Ali             Sales           4000
Raza            Marketing       3500
Bob             Sales           4200
Lisa            Engineering     6000

🔧 Create DataFrame in PySpark

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("createTempViewExample").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", "Engineering", 5000),
    ("Ali", "Sales", 4000),
    ("Raza", "Marketing", 3500),
    ("Bob", "Sales", 4200),
    ("Lisa", "Engineering", 6000)
]

columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)
df.show()

✅ Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

📌 Register DataFrame as a Temp View

# Register as temporary SQL view
df.createOrReplaceTempView("employee_view")

📊 Run SQL Queries on the Temp View

Example 1: Select All Records

result1 = spark.sql("SELECT * FROM employee_view")
result1.show()

✅ Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

Example 2: Filter by Department

result2 = spark.sql("""
  SELECT Name, Salary
  FROM employee_view
  WHERE Department = 'Sales'
""")
result2.show()

✅ Output

+-----+------+
| Name|Salary|
+-----+------+
|  Ali|  4000|
|  Bob|  4200|
+-----+------+

Example 3: Average Salary by Department

result3 = spark.sql("""
  SELECT Department, AVG(Salary) AS Avg_Salary
  FROM employee_view
  GROUP BY Department
""")
result3.show()

✅ Output

+-----------+----------+
| Department|Avg_Salary|
+-----------+----------+
|  Marketing|    3500.0|
|Engineering|    5500.0|
|      Sales|    4100.0|
+-----------+----------+

💡 Key Points

  • createTempView() is used to register a DataFrame as a temporary view.
  • You can use spark.sql() to run standard SQL queries on that view.
  • The view is session-scoped and will be removed when the Spark session ends (see the cleanup sketch below).
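
A small cleanup sketch for the session-scoped view created above: temp views show up in the catalog with tableType 'TEMPORARY', and dropTempView() removes them before the session ends.

# Inspect what is registered in the current session's catalog
for table in spark.catalog.listTables():
    print(table.name, table.tableType)

# Drop the temp view explicitly when you are done with it
spark.catalog.dropTempView("employee_view")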

🎥 Watch the Video Tutorial

Watch on YouTube

© 2025 Aamir Shahzad. All rights reserved.

Visit TechBrothersIT for more tutorials.

How to Perform Unpivot in PySpark | Convert Columns to Rows Easily | PySpark Tutorial

How to Perform Unpivot in PySpark | Step-by-Step Guide

How to Perform Unpivot in PySpark | Convert Columns to Rows

Author: Aamir Shahzad

Published: March 2025

📘 Introduction

In PySpark, unpivoting means converting columns into rows — the opposite of a pivot operation. While PySpark (before 3.4) doesn't have a built-in unpivot() function, you can easily achieve this using selectExpr() and stack().

🧾 Sample Dataset

We’ll work with a DataFrame that contains sales data across three years:

Name           Sales_2023   Sales_2024   Sales_2025
Aamir Shahzad     800           900          1000
Ali               500           600          1001
Raza              450           550          102
Bob               700           750          103
Lisa              620           None         109

🔧 Create DataFrame in PySpark

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("UnpivotExample").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", 800, 900, 1000),
    ("Ali", 500, 600, 1001),
    ("Raza", 450, 550, 102),
    ("Bob", 700, 750, 103),
    ("Lisa", 620, None, 109)
]

columns = ["Name", "Sales_2023", "Sales_2024", "Sales_2025"]

df = spark.createDataFrame(data, columns)
df.show()

✅ Expected Output

+-------------+-----------+-----------+-----------+
|         Name| Sales_2023| Sales_2024| Sales_2025|
+-------------+-----------+-----------+-----------+
|Aamir Shahzad|        800|        900|       1000|
|          Ali|        500|        600|       1001|
|         Raza|        450|        550|        102|
|          Bob|        700|        750|        103|
|         Lisa|        620|       null|        109|
+-------------+-----------+-----------+-----------+

๐Ÿ” Unpivot Using stack() and selectExpr()

Convert sales columns into rows using stack() for each year:

unpivotDF = df.selectExpr(
    "Name",
    "stack(3, '2023', Sales_2023, '2024', Sales_2024, '2025', Sales_2025) as (Year, Sales)"
)

unpivotDF.show()

✅ Expected Output

+-------------+----+-----+
|         Name|Year|Sales|
+-------------+----+-----+
|Aamir Shahzad|2023|  800|
|Aamir Shahzad|2024|  900|
|Aamir Shahzad|2025| 1000|
|          Ali|2023|  500|
|          Ali|2024|  600|
|          Ali|2025| 1001|
|         Raza|2023|  450|
|         Raza|2024|  550|
|         Raza|2025|  102|
|          Bob|2023|  700|
|          Bob|2024|  750|
|          Bob|2025|  103|
|         Lisa|2023|  620|
|         Lisa|2024| null|
|         Lisa|2025|  109|
+-------------+----+-----+

📌 Explanation

  • stack(3, ...) tells PySpark to generate 3 rows per input row.
  • Each pair of values (e.g., '2023', Sales_2023) is used to populate new rows.
  • The result is a normalized table format that's easier to process and analyze (a dynamic version of the stack() expression is sketched below).
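
When the year columns are not known in advance, the same stack() expression can be built from df.columns instead of being hard-coded. A minimal sketch, assuming the Sales_ prefix used in the sample data:

# Discover the columns to unpivot from the DataFrame itself
value_cols = [c for c in df.columns if c.startswith("Sales_")]

# Build "stack(3, '2023', Sales_2023, ...) as (Year, Sales)" dynamically
stack_expr = "stack({}, {}) as (Year, Sales)".format(
    len(value_cols),
    ", ".join("'{}', {}".format(c.replace("Sales_", ""), c) for c in value_cols)
)

df.selectExpr("Name", stack_expr).show()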

🎥 Watch the Video Tutorial

Watch on YouTube

© 2025 Aamir Shahzad. All rights reserved.

Visit TechBrothersIT for more tutorials.

How to Use Pivot Function in PySpark | Transform and Summarize Data Easily | PySpark Tutorial

How to Use Pivot Function in PySpark | Step-by-Step Guide

How to Use pivot() Function in PySpark | Step-by-Step Guide

Author: Aamir Shahzad

Published: March 2025

📘 Introduction

The pivot() function in PySpark lets you rotate data in a DataFrame. It's used to transform rows into columns—helpful when summarizing data for reporting and analytics.

🧾 Sample Dataset

Name            Year    Product     Revenue
Aamir Shahzad   2023    Product A   500
Aamir Shahzad   2023    Product B   300
Ali             2023    Product A   400
Raza            2023    Product B   200
Bob             2024    Product A   700
Lisa            2024    Product B   600
Ali             2024    Product A   300
Raza            2024    Product B   500
Aamir Shahzad   2024    Product A   800
Lisa            2023    Product A   650
Lisa            2025    Product A   650
Lisa            2026    Product C   1100

🔧 Create DataFrame in PySpark

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("PivotFunctionExample").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", 2023, "Product A", 500),
    ("Aamir Shahzad", 2023, "Product B", 300),
    ("Ali", 2023, "Product A", 400),
    ("Raza", 2023, "Product B", 200),
    ("Bob", 2024, "Product A", 700),
    ("Lisa", 2024, "Product B", 600),
    ("Ali", 2024, "Product A", 300),
    ("Raza", 2024, "Product B", 500),
    ("Aamir Shahzad", 2024, "Product A", 800),
    ("Lisa", 2023, "Product A", 650),
    ("Lisa", 2025, "Product A", 650),
    ("Lisa", 2026, "Product C", 1100)
]

columns = ["Name", "Year", "Product", "Revenue"]
df = spark.createDataFrame(data, columns)

# Show original DataFrame
df.show()

📊 Pivot Example 1: Pivot on Product

Rotate the Product column into individual columns and summarize Revenue using sum().

from pyspark.sql.functions import sum

pivot_df = df.groupBy("Name", "Year").pivot("Product").agg(sum("Revenue"))
pivot_df.show()

✅ Expected Output

+-------------+----+---------+---------+---------+
|         Name|Year|Product A|Product B|Product C|
+-------------+----+---------+---------+---------+
|          Bob|2024|      700|     null|     null|
|Aamir Shahzad|2023|      500|      300|     null|
|Aamir Shahzad|2024|      800|     null|     null|
|          Ali|2023|      400|     null|     null|
|          Ali|2024|      300|     null|     null|
|         Raza|2023|     null|      200|     null|
|         Raza|2024|     null|      500|     null|
|         Lisa|2023|      650|     null|     null|
|         Lisa|2024|     null|      600|     null|
|         Lisa|2025|      650|     null|     null|
|         Lisa|2026|     null|     null|     1100|
+-------------+----+---------+---------+---------+

📊 Pivot Example 2: Pivot on Year

Rotate the Year column to display annual revenue for each person across all years.

pivot_df_year = df.groupBy("Name").pivot("Year").agg(sum("Revenue"))
pivot_df_year.show()

✅ Expected Output

+-------------+----+----+----+----+
|         Name|2023|2024|2025|2026|
+-------------+----+----+----+----+
|          Bob|null| 700|null|null|
|Aamir Shahzad| 800| 800|null|null|
|          Ali| 400| 300|null|null|
|         Raza| 200| 500|null|null|
|         Lisa| 650| 600| 650|1100|
+-------------+----+----+----+----+

📌 Explanation

  • pivot("Product") creates a column for each product type.
  • pivot("Year") creates a column for each year.
  • agg(sum("Revenue")) aggregates revenue values during the pivot.
  • Missing values appear as null (see the sketch below for pinning the pivot values and filling those nulls).
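
Two common refinements, sketched below against the same df: passing the pivot values explicitly (so Spark can skip the extra pass that collects the distinct products) and replacing the nulls produced for missing combinations with fillna(0).

from pyspark.sql.functions import sum as _sum

products = ["Product A", "Product B", "Product C"]

pivot_fixed_df = (
    df.groupBy("Name", "Year")
      .pivot("Product", products)   # explicit values avoid a distinct() scan
      .agg(_sum("Revenue"))
      .fillna(0)                    # fill nulls for missing Name/Year/Product combinations
)
pivot_fixed_df.show()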

🎥 Video Tutorial

Watch on YouTube

© 2025 Aamir Shahzad. All rights reserved.

Visit TechBrothersIT for more tutorials.

How to Get Column Names in PySpark DataFrames Using columns Function

How to Use columns Function in PySpark | Step-by-Step Guide

Author: Aamir Shahzad

Published: March 2025

📘 Introduction

The columns attribute in PySpark is a quick and effective way to retrieve the list of column names from a DataFrame. This is useful when you're dynamically working with column names in big data pipelines.

📌 What is the columns Attribute in PySpark?

The columns attribute returns a Python list of column names from the DataFrame. You can use this list to inspect, iterate, or programmatically manipulate columns in your PySpark applications.

🧾 Sample Dataset

Name           Department    Salary
Aamir Shahzad   IT            5000
Ali Raza        HR            4000
Bob             Finance       4500
Lisa            HR            4000

🔧 Create DataFrame in PySpark

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("columnsFunctionExample").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", "IT", 5000),
    ("Ali Raza", "HR", 4000),
    ("Bob", "Finance", 4500),
    ("Lisa", "HR", 4000)
]

# Create DataFrame
columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)

# Show DataFrame
df.show()

📊 Using columns Attribute

# Get list of column names
print("List of columns in the DataFrame:")
print(df.columns)

# Loop through column names
print("\nLooping through columns:")
for col_name in df.columns:
    print(col_name)

✅ Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|         IT|  5000|
|     Ali Raza|         HR|  4000|
|          Bob|    Finance|  4500|
|         Lisa|         HR|  4000|
+-------------+-----------+------+

List of columns in the DataFrame:
['Name', 'Department', 'Salary']

Looping through columns:
Name
Department
Salary

📌 Explanation

  • df.columns returns a list of all column names in the DataFrame.
  • Useful for dynamic column operations like renaming, filtering, or applying transformations.
  • You can loop over df.columns for custom logic on each column (see the sketch below).
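
A minimal sketch of such dynamic operations on the df created above: renaming every column in one pass with toDF(), and selecting columns by data type via df.dtypes.

# Rename all columns to lowercase
df_lower = df.toDF(*[c.lower() for c in df.columns])
df_lower.show()

# Keep only the string-typed columns
string_cols = [name for name, dtype in df.dtypes if dtype == "string"]
df.select(*string_cols).show()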

🎥 Video Tutorial

Watch on YouTube

© 2025 Aamir Shahzad. All rights reserved.

Visit TechBrothersIT for more tutorials.

PySpark Tutorial: How to Use colRegex() for Column Selection

How to Use colRegex() Function in PySpark | Step-by-Step Guide

Author: Aamir Shahzad

Published: March 2025

📘 Introduction

The colRegex() function in PySpark allows you to select multiple columns using regular expressions. It is especially useful when dealing with DataFrames that have dynamic or similar column naming patterns.

📌 What is colRegex() in PySpark?

The colRegex() method is part of the DataFrame API. It enables you to use regular expressions to match and select column names based on patterns — such as prefix, suffix, or substring — instead of specifying each column manually.

🧾 Sample Dataset

Name           Department    Salary     Country
Aamir Shahzad   IT            5000       US
Ali Raza        HR            4000       CA
Bob             Finance       4500       UK
Lisa            HR            4000       CA

🔧 Create DataFrame in PySpark

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("colRegexExample").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", "IT", 5000, "US"),
    ("Ali Raza", "HR", 4000, "CA"),
    ("Bob", "Finance", 4500, "UK"),
    ("Lisa", "HR", 4000, "CA")
]

# Create DataFrame
columns = ["Name", "Department", "Salary", "Country"]
df = spark.createDataFrame(data, columns)

# Show DataFrame
df.show()

📊 Using colRegex() for Column Selection

# Example 1: Select columns starting with 'Dep'
df.select(df.colRegex("`^Dep.*`")).show()

# Example 2: Select columns ending with 'Name'
df.select(df.colRegex("`.*Name$`")).show()

# Example 3: Select columns containing 'try'
df.select(df.colRegex("`.*try.*`")).show()

✅ Expected Output - Example 1

+-----------+
|Department |
+-----------+
|IT         |
|HR         |
|Finance    |
|HR         |
+-----------+

✅ Expected Output - Example 2

+-------------+
|         Name|
+-------------+
|Aamir Shahzad|
|     Ali Raza|
|          Bob|
|         Lisa|
+-------------+

✅ Expected Output - Example 3

+--------+
|Country |
+--------+
|US      |
|CA      |
|UK      |
|CA      |
+--------+

📌 Explanation

  • ^Dep.* matches any column name starting with 'Dep'.
  • .*Name$ matches any column name ending with 'Name'.
  • .*try.* matches any column name that contains 'try' anywhere (a plain-Python alternative is sketched below).
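
The same selections can also be done in plain Python over df.columns, which is sometimes easier to debug than backtick-quoted regex strings. A minimal sketch reproducing Example 1:

import re

# Equivalent to df.select(df.colRegex("`^Dep.*`"))
dep_cols = [c for c in df.columns if re.match(r"^Dep", c)]
df.select(*dep_cols).show()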

🎥 Video Tutorial

Watch on YouTube

© 2025 Aamir Shahzad. All rights reserved.

Visit TechBrothersIT for more tutorials.

Difference Between Standard and Data-Driven Subscriptions in SSRS

📩 Blog: Difference Between Standard and Data-Driven Subscriptions in SSRS

When working with SSRS (SQL Server Reporting Services), delivering reports efficiently is critical. Microsoft SSRS provides two powerful subscription options to automate report delivery: Standard Subscriptions and Data-Driven Subscriptions. Here’s how they compare:

🧾 What is a Standard Subscription?

A standard subscription is ideal for simple, fixed scenarios. You predefine the report parameters, recipients, delivery method (email, file share), and schedule. All recipients get the same report with the same data each time.

🔄 What is a Data-Driven Subscription?

Data-driven subscriptions are more flexible and dynamic. They fetch recipient details, parameters, and delivery settings from a query. You can personalize content and automate delivery to hundreds or thousands of recipients—each with tailored data.

๐Ÿ” Key Differences

  • Recipients: Fixed (Standard) vs Dynamic (Data-Driven)
  • Parameters: Static vs Custom per recipient
  • Delivery: Predefined vs Based on data query
  • Use Case: Small teams (Standard) vs Enterprise-scale reporting (Data-Driven)

🎯 Real-World Example

Imagine a regional sales report that needs to go out weekly:

  • Standard: All sales managers receive the same report.
  • Data-Driven: Each manager receives their region’s data only, based on a user/region mapping table.

💡 Conclusion

Choose Standard Subscriptions for simple, static delivery, and Data-Driven Subscriptions when you need flexibility, scalability, and personalized automation at scale.


How to Aggregate Data Using agg() Function in PySpark | PySpark Tutorial

How to Use agg() Function in PySpark | Step-by-Step Guide

Author: Aamir Shahzad

Published: March 2025

📘 Introduction

The agg() function in PySpark is used to apply multiple aggregate functions at once on grouped data. It is part of the DataFrame API and is typically used together with the groupBy() method, though it can also be called on a DataFrame directly.

📌 What is agg() in PySpark?

The agg() method is ideal when you want to compute multiple statistics like sum(), avg(), min(), max(), and count() in a single transformation. It makes code cleaner and more efficient when working with grouped datasets.

🧾 Sample Dataset

Name           Department    Salary
Aamir Shahzad   IT            5000
Ali Raza        HR            4000
Bob             Finance       4500
Lisa            HR            4000

🔧 Create DataFrame in PySpark

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg, min, max, count

# Create Spark session
spark = SparkSession.builder.appName("AggFunctionExample").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", "IT", 5000),
    ("Ali Raza", "HR", 4000),
    ("Bob", "Finance", 4500),
    ("Lisa", "HR", 4000)
]

# Create DataFrame
columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)

# Show the DataFrame
df.show()

📊 Apply Multiple Aggregations Using agg()

# Group by Department and apply multiple aggregations
agg_df = df.groupBy("Department").agg(
    sum("Salary").alias("Total_Salary"),
    avg("Salary").alias("Average_Salary"),
    min("Salary").alias("Min_Salary"),
    max("Salary").alias("Max_Salary"),
    count("Name").alias("Employee_Count")
)

# Show results
agg_df.show()

✅ Expected Output

+-----------+------------+--------------+----------+----------+--------------+
|Department |Total_Salary|Average_Salary|Min_Salary|Max_Salary|Employee_Count|
+-----------+------------+--------------+----------+----------+--------------+
|Finance    |        4500|        4500.0|      4500|      4500|             1|
|HR         |        8000|        4000.0|      4000|      4000|             2|
|IT         |        5000|        5000.0|      5000|      5000|             1|
+-----------+------------+--------------+----------+----------+--------------+

📌 Explanation

  • sum("Salary"): Total salary for each department
  • avg("Salary"): Average salary for each department
  • min("Salary"): Minimum salary in each department
  • max("Salary"): Maximum salary in each department
  • count("Name"): Number of employees in each department (see the sketch below for agg() without groupBy())
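
agg() can also be called on the DataFrame directly for dataset-wide totals, and it accepts a {column: function} dictionary form. A minimal sketch against the same df:

from pyspark.sql.functions import sum, avg

# Whole-table aggregation without groupBy()
df.agg(
    sum("Salary").alias("Company_Total"),
    avg("Salary").alias("Company_Average")
).show()

# Dictionary form; the result column is named max(Salary)
df.agg({"Salary": "max"}).show()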

🎥 Video Tutorial

Watch on YouTube

© 2025 Aamir Shahzad. All rights reserved.

Visit TechBrothersIT for more tutorials.

PySpark Tutorial: How to Use Cube for GroupBy and Aggregations

How to Use cube() Function in PySpark | Multi-dimensional Grouping and Aggregations

How to Use cube() Function in PySpark

Author: Aamir Shahzad

Published on: March 2025

Introduction

In this tutorial, you will learn how to use the cube() function in PySpark. The cube function is useful when performing multi-dimensional aggregations, similar to OLAP cube operations, providing powerful analytics capabilities.

What is cube() in PySpark?

The cube() function computes aggregates for all combinations of a group of columns, including subtotals and grand totals. It is ideal when you need multi-level aggregations in a single query, simplifying complex data analysis tasks.

Sample Dataset

Here is the sample dataset that we'll be using for this tutorial:

Name           Department    Salary
Aamir Shahzad   IT            5000
Ali Raza        HR            4000
Bob             Finance       4500
Lisa            HR            4000

Create DataFrame in PySpark

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

# Create Spark session
spark = SparkSession.builder.appName("CubeFunctionExample").getOrCreate()

# Sample data
data = [
    ("Aamir Shahzad", "IT", 5000),
    ("Ali Raza", "HR", 4000),
    ("Bob", "Finance", 4500),
    ("Lisa", "HR", 4000)
]

# Create DataFrame
columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)

# Show DataFrame
df.show()

Using cube() Function in PySpark

cube_df = df.cube("Department", "Name").agg(sum("Salary").alias("Total_Salary"))

# Show results
cube_df.orderBy("Department", "Name").show()

Expected Output

+-----------+-------------+------------+
| Department|         Name|Total_Salary|
+-----------+-------------+------------+
|       null|         null|       17500|
|       null|Aamir Shahzad|        5000|
|       null|     Ali Raza|        4000|
|       null|          Bob|        4500|
|       null|         Lisa|        4000|
|    Finance|         null|        4500|
|    Finance|          Bob|        4500|
|         HR|         null|        8000|
|         HR|     Ali Raza|        4000|
|         HR|         Lisa|        4000|
|         IT|         null|        5000|
|         IT|Aamir Shahzad|        5000|
+-----------+-------------+------------+

Explanation

The cube() function generates aggregations for every combination of the grouped columns, so this example produces four levels:

  • Total salary by each Name within each Department
  • Total salary for each Department (with Name as null)
  • Total salary for each Name across all departments (with Department as null)
  • Grand total of all salaries (both Department and Name as null); the sketch below shows how to label these levels
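
Because the subtotal rows reuse null as a marker, they can be confused with data that is genuinely null. A minimal sketch, using grouping_id() from pyspark.sql.functions on the same df, that labels each grouping level:

from pyspark.sql.functions import grouping_id, sum as _sum

cube_labeled = (
    df.cube("Department", "Name")
      .agg(_sum("Salary").alias("Total_Salary"),
           grouping_id().alias("level"))  # 0 = detail, 1 = per-Department subtotal, 2 = per-Name subtotal, 3 = grand total
      .orderBy("level")
)
cube_labeled.show()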

Watch the Video Tutorial

For a complete walkthrough of the cube() function in PySpark, check out this video tutorial:

© 2025 Aamir Shahzad. All rights reserved.

PySpark Tutorial: na Functions and isEmpty Explained with Examples

How to use na() and isEmpty() Function in PySpark | PySpark Tutorial

How to Use na() and isEmpty() Functions in PySpark

Author: Aamir Shahzad

Published on: March 2025

Introduction

In this blog post, you’ll learn how to use na() and isEmpty() functions in PySpark to handle missing data and validate whether a DataFrame is empty. These functions are crucial for data preprocessing and validation in big data pipelines.

What is na() in PySpark?

The na property of a DataFrame (accessed as df.na) returns a DataFrameNaFunctions object, which is used to handle null values in a DataFrame. Common methods include:

  • fill() - Replace null values with a specified value.
  • drop() - Remove rows containing null values.
  • replace() - Replace specific values.

Example: Using na() Function

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("PySpark_na_and_isEmpty").getOrCreate()

# Sample data with nulls
data = [
    ("Aamir Shahzad", "Engineering", 5000),
    ("Ali", None, 4000),
    ("Raza", "Marketing", None),
    ("Bob", "Sales", 4200),
    ("Lisa", None, None)
]

columns = ["Name", "Department", "Salary"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Show original DataFrame
df.show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|       null|  4000|
|         Raza|  Marketing|  null|
|          Bob|      Sales|  4200|
|         Lisa|       null|  null|
+-------------+-----------+------+

Fill null values in Department and Salary columns

df_filled = df.na.fill({
    "Department": "Not Assigned",
    "Salary": 0
})

df_filled.show()

Expected Output

+-------------+-------------+------+
|         Name|   Department|Salary|
+-------------+-------------+------+
|Aamir Shahzad|  Engineering|  5000|
|          Ali| Not Assigned|  4000|
|         Raza|    Marketing|     0|
|          Bob|        Sales|  4200|
|         Lisa| Not Assigned|     0|
+-------------+-------------+------+

Drop rows with any null values

df_dropped = df.na.drop()
df_dropped.show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Bob|      Sales|  4200|
+-------------+-----------+------+

Replace a specific value

df_replaced = df.na.replace("Sales", "Business Development")
df_replaced.show()

Expected Output

+-------------+----------------------+------+
|         Name|            Department|Salary|
+-------------+----------------------+------+
|Aamir Shahzad|           Engineering|  5000|
|          Ali|                  null|  4000|
|         Raza|             Marketing|  null|
|          Bob|  Business Development|  4200|
|         Lisa|                  null|  null|
+-------------+----------------------+------+

What is isEmpty() in PySpark?

The isEmpty() function checks whether a DataFrame is empty (has no rows). This is helpful to validate results of filters, joins, or transformations.

Example: Using isEmpty() Function

# Filter rows with Salary greater than 10000
df_filtered = df.filter(df.Salary > 10000)

# Check if DataFrame is empty
if df_filtered.isEmpty():
    print("The DataFrame is empty!")
else:
    df_filtered.show()

Expected Output

The DataFrame is empty!

Explanation: There are no rows in the DataFrame where Salary > 10000, so isEmpty() returns True.
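
Note that DataFrame.isEmpty() was added in Spark 3.3. On older versions, a minimal equivalent check is to fetch at most one row, as sketched below for the same df_filtered:

# Works on older Spark versions as well
no_rows = len(df_filtered.take(1)) == 0
print("The DataFrame is empty!" if no_rows else "The DataFrame has rows.")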

Watch the Video Tutorial

For a complete walkthrough of the na() and isEmpty() functions in PySpark, check out the video tutorial below:

© 2025 Aamir Shahzad. All rights reserved.

How to Use Intersect() and IntersectAll() in PySpark | Compare DataFrames Easily | PySpark Tutorial

How to Use Intersect() and IntersectAll() Functions in PySpark | Compare DataFrames Easily

How to Use Intersect() and IntersectAll() Functions in PySpark

Author: Aamir Shahzad

Date: March 2025

Introduction

In this tutorial, you will learn how to use the intersect() and intersectAll() functions in PySpark to find common rows between two DataFrames. The intersect() function returns distinct common rows, while intersectAll() returns all common rows, including duplicates.

Why Use Intersect() and IntersectAll()?

  • intersect(): Returns distinct rows that are present in both DataFrames.
  • intersectAll(): Returns all rows, including duplicates, that are present in both DataFrames.
  • Helpful for comparing datasets and finding overlaps.

Step 1: Import SparkSession and Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySparkIntersectFunction") \
    .getOrCreate()

Step 2: Create Sample DataFrames

# DataFrame 1
data1 = [
    ("Aamir Shahzad", "Engineering", 5000),
    ("Ali", "Sales", 4000),
    ("Raza", "Marketing", 3500),
    ("Bob", "Sales", 4200),
    ("Lisa", "Engineering", 6000),
    ("Aamir Shahzad", "Engineering", 5000)  # duplicate row
]

# DataFrame 2
data2 = [
    ("Aamir Shahzad", "Engineering", 5000),
    ("Lisa", "Engineering", 6000)
]

columns = ["Name", "Department", "Salary"]

df1 = spark.createDataFrame(data1, schema=columns)
df2 = spark.createDataFrame(data2, schema=columns)

df1.show()

Expected Output (df1)

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
|Aamir Shahzad|Engineering|  5000|
+-------------+-----------+------+
df2.show()

Expected Output (df2)

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

Step 3: Using intersect()

# Returns only distinct common rows
intersect_df = df1.intersect(df2)

intersect_df.show()

Expected Output (intersect_df)

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

Note: Even though there is a duplicate row in df1 for "Aamir Shahzad", intersect() returns distinct matches only.

Step 4: Using intersectAll()

# Returns all common rows, including duplicates
intersect_all_df = df1.intersectAll(df2)

intersect_all_df.show()

Expected Output (intersectAll_df)

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|Aamir Shahzad|Engineering|  5000|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

Explanation: - df1 has two records for "Aamir Shahzad". - df2 has one record for "Aamir Shahzad". - intersectAll() keeps duplicates from df1 that match the rows in df2.
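
For reference, the same comparison can be written in Spark SQL once the DataFrames are registered as temp views; INTERSECT gives the distinct matches and INTERSECT ALL keeps duplicates. A minimal sketch using hypothetical view names t1 and t2:

df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")

# Distinct common rows (same result as df1.intersect(df2))
spark.sql("SELECT * FROM t1 INTERSECT SELECT * FROM t2").show()

# Common rows with duplicates preserved (same result as df1.intersectAll(df2))
spark.sql("SELECT * FROM t1 INTERSECT ALL SELECT * FROM t2").show()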

Conclusion

The intersect() and intersectAll() functions in PySpark are useful for comparing two DataFrames and finding common rows. Use intersect() when you only need distinct matches, and intersectAll() when you want to include duplicates for more detailed comparisons.

Watch the Video Tutorial

For a complete walkthrough on using intersect() and intersectAll() functions in PySpark, watch the video below:

© 2025 Aamir Shahzad | PySpark Tutorials

PySpark exceptAll() Function Explained | Subtract and Find Differences Between DataFrames | PySpark Tutorial

How to Use exceptAll() Function in PySpark | Subtract DataFrames with Duplicates

How to Use exceptAll() Function in PySpark

Author: Aamir Shahzad

Date: March 2025

Introduction

The exceptAll() function in PySpark returns the rows from one DataFrame that are not present in another DataFrame. Unlike subtract(), which returns distinct rows only, exceptAll() keeps duplicate rows. This makes exceptAll() a powerful tool when you want to subtract one DataFrame from another while preserving duplicates.

Why Use exceptAll()?

  • Removes records present in the second DataFrame from the first one.
  • Keeps duplicates from the first DataFrame that don't have matches in the second.
  • Useful when you need an exact subtraction while considering duplicates.

Step 1: Import SparkSession and Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySparkExceptAllFunction") \
    .getOrCreate()

Step 2: Create Sample DataFrames

# Sample data for DataFrame 1
data1 = [
    ("Aamir Shahzad", "Engineering", 5000),
    ("Ali", "Sales", 4000),
    ("Raza", "Marketing", 3500),
    ("Bob", "Sales", 4200),
    ("Lisa", "Engineering", 6000),
    ("Aamir Shahzad", "Engineering", 5000)
]

# Sample data for DataFrame 2
data2 = [
    ("Aamir Shahzad", "Engineering", 5000),
    ("Lisa", "Engineering", 6000)
]

columns = ["Name", "Department", "Salary"]

# Create DataFrames
df1 = spark.createDataFrame(data1, schema=columns)
df2 = spark.createDataFrame(data2, schema=columns)

df1.show()

Expected Output (df1)

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
|Aamir Shahzad|Engineering|  5000|
+-------------+-----------+------+
df2.show()

Expected Output (df2)

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

Step 3: Apply exceptAll() Function

# Subtract df2 from df1 using exceptAll()
result_df = df1.exceptAll(df2)

result_df.show()

Expected Output (result_df)

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
+-------------+-----------+------+

Explanation: - df1 had two identical rows for Aamir Shahzad, Engineering, 5000. - df2 had one row for Aamir Shahzad, Engineering, 5000, so only one of them was removed. - The row for Lisa was fully removed since it was present once in both dataframes.
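
For comparison, subtract() is the distinct counterpart of exceptAll(): it removes every matching row and also de-duplicates the result. A minimal sketch on the same df1 and df2:

# EXCEPT DISTINCT semantics: duplicates are not preserved
subtract_df = df1.subtract(df2)
subtract_df.show()  # both "Aamir Shahzad" rows disappear; Ali, Raza and Bob remain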

Conclusion

The exceptAll() function in PySpark is useful for subtracting one DataFrame from another while keeping duplicates intact. It's essential when you need to perform data comparison operations that consider multiple occurrences of records. Make sure the schemas of both DataFrames match when using this function.

Watch the Video Tutorial

For a complete walkthrough on using the exceptAll() function in PySpark, check out this video tutorial:

© 2025 Aamir Shahzad | PySpark Tutorials

PySpark Tutorial: first(), head(), and tail() Functions Explained with Examples

How to Use first(), head(), and tail() Functions in PySpark | Step-by-Step Guide

How to Use first(), head(), and tail() Functions in PySpark

Author: Aamir Shahzad

Date: March 2025

Introduction

In PySpark, the functions first(), head(), and tail() are used to retrieve specific rows from a DataFrame. These functions are particularly useful for inspecting data, debugging, and performing quick checks.

Why Use These Functions?

  • first() returns the first row of the DataFrame.
  • head(n) returns the first n rows of the DataFrame as a list of Row objects.
  • tail(n) returns the last n rows of the DataFrame as a list of Row objects.

Step 1: Create SparkSession

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySparkFirstHeadTailFunctions") \
    .getOrCreate()

Step 2: Create a Sample DataFrame

data = [
    ("Aamir Shahzad", "Engineering", 5000),
    ("Ali", "Sales", 4000),
    ("Raza", "Marketing", 3500),
    ("Bob", "Sales", 4200),
    ("Lisa", "Engineering", 6000)
]

columns = ["Name", "Department", "Salary"]

df = spark.createDataFrame(data, schema=columns)

df.show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

Step 3: Using head() Function

# Get the first 3 rows using head()
head_rows = df.head(3)

# Print each row
for row in head_rows:
    print(row)

Expected Output

Row(Name='Aamir Shahzad', Department='Engineering', Salary=5000)
Row(Name='Ali', Department='Sales', Salary=4000)
Row(Name='Raza', Department='Marketing', Salary=3500)

Step 4: Using first() Function

# Get the first row
first_row = df.first()

# Print the first row
print(first_row)

Expected Output

Row(Name='Aamir Shahzad', Department='Engineering', Salary=5000)

Step 5: Using tail() Function

# Get the last 2 rows
tail_rows = df.tail(2)

# Print each row
for row in tail_rows:
    print(row)

Expected Output

Row(Name='Bob', Department='Sales', Salary=4200)
Row(Name='Lisa', Department='Engineering', Salary=6000)
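
Two closely related calls are worth knowing: head() with no argument returns a single Row (like first()), and take(n) returns the first n rows as a Python list (like head(n)). A minimal sketch on the same df:

# Single Row, equivalent to df.first()
print(df.head())

# First two rows as a list of Row objects, equivalent to df.head(2)
print(df.take(2))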

Conclusion

PySpark provides several functions to access rows in a DataFrame. first(), head(), and tail() are simple yet powerful tools for data inspection and debugging. Understanding their differences helps in retrieving data more effectively during data processing tasks.

Watch the Video Tutorial

For a complete walkthrough of first(), head(), and tail() functions in PySpark, check out this video tutorial:

© 2025 Aamir Shahzad | PySpark Tutorials

PySpark Tutorial: groupBy() Function | Group & Summarize DataFrames

How to Use groupBy() in PySpark | Aggregate & Summarize DataFrames

How to Use groupBy() in PySpark

Author: Aamir Shahzad

Date: March 2025

Introduction

The groupBy() function in PySpark is used to group rows based on one or more columns and perform aggregate functions like count, sum, avg, min, max, etc. It works similarly to SQL GROUP BY.

Step 1: Import SparkSession and Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySparkGroupByFunction") \
    .getOrCreate()

Step 2: Create a Sample DataFrame

data = [
    ("Aamir Shahzad", "Engineering", 5000),
    ("Ali", "Sales", 4000),
    ("Raza", "Marketing", 3500),
    ("Bob", "Sales", 4200),
    ("Lisa", "Engineering", 6000)
]

columns = ["Name", "Department", "Salary"]

df = spark.createDataFrame(data, schema=columns)

df.show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

Step 3: groupBy() Example 1 - Count Employees in Each Department

df.groupBy("Department").count().show()

Expected Output

+-----------+-----+
| Department|count|
+-----------+-----+
|Engineering|    2|
|  Marketing|    1|
|      Sales|    2|
+-----------+-----+

Step 4: groupBy() Example 2 - Total Salary by Department

from pyspark.sql.functions import sum

df.groupBy("Department") \
  .agg(sum("Salary").alias("Total_Salary")) \
  .show()

Expected Output

+-----------+------------+
| Department|Total_Salary|
+-----------+------------+
|Engineering|       11000|
|  Marketing|        3500|
|      Sales|        8200|
+-----------+------------+

Step 5: groupBy() Example 3 - Average, Min, and Max Salary by Department

from pyspark.sql.functions import avg, min, max

df.groupBy("Department") \
  .agg(
      avg("Salary").alias("Average_Salary"),
      min("Salary").alias("Min_Salary"),
      max("Salary").alias("Max_Salary")
  ).show()

Expected Output

+-----------+--------------+----------+----------+
| Department|Average_Salary|Min_Salary|Max_Salary|
+-----------+--------------+----------+----------+
|Engineering|        5500.0|      5000|      6000|
|  Marketing|        3500.0|      3500|      3500|
|      Sales|        4100.0|      4000|      4200|
+-----------+--------------+----------+----------+

Step 6: groupBy() Example 4 - Group By Name and Department

df.groupBy("Name", "Department") \
  .sum("Salary") \
  .show()

Expected Output

+-------------+-----------+-----------+
|         Name| Department|sum(Salary)|
+-------------+-----------+-----------+
|Aamir Shahzad|Engineering|       5000|
|          Ali|      Sales|       4000|
|         Raza|  Marketing|       3500|
|          Bob|      Sales|       4200|
|         Lisa|Engineering|       6000|
+-------------+-----------+-----------+
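
Grouped results are returned in no particular order, so reports usually sort them explicitly. A minimal sketch that ranks departments by total salary on the same df:

from pyspark.sql.functions import sum, desc

(df.groupBy("Department")
   .agg(sum("Salary").alias("Total_Salary"))
   .orderBy(desc("Total_Salary"))
   .show())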

Conclusion

Using groupBy() in PySpark allows you to aggregate and summarize data effectively. You can combine it with various aggregate functions to perform complex data analysis directly on your Spark DataFrames.

Watch the Video Tutorial

For a complete walkthrough of groupBy() in PySpark, check out the video tutorial below:

© 2025 Aamir Shahzad | PySpark Tutorials

PySpark Tutorial: filter() vs where() | Filter DataFrames

How to Use filter() or where() Function in PySpark | Step-by-Step Guide

How to Use filter() or where() Function in PySpark

Author: Aamir Shahzad

Date: March 2025

Introduction

In this tutorial, you will learn how to use the filter() and where() functions in PySpark to filter rows in a DataFrame. These functions are essential for data manipulation and play a critical role in transforming datasets for analysis or machine learning tasks.

Why Use filter() and where() in PySpark?

  • Both functions are used to filter rows based on a condition.
  • They return a new DataFrame that satisfies the given condition.
  • filter() and where() are equivalent and work the same way in PySpark.

Step 1: Import SparkSession and Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySparkFilterFunction") \
    .getOrCreate()

Step 2: Create a Sample DataFrame

data = [
    ("Amir Shahzad", "Engineering", 5000),
    ("Ali", "Sales", 4000),
    ("Raza", "Marketing", 3500),
    ("Bob", "Sales", 4200),
    ("Lisa", "Engineering", 6000)
]

columns = ["Name", "Department", "Salary"]

df = spark.createDataFrame(data, schema=columns)

df.show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Amir Shahzad |Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

Step 3: filter() Example 1 - Filter Rows Where Department is 'Sales'

df.filter(df.Department == "Sales").show()

Expected Output

+----+----------+------+
|Name|Department|Salary|
+----+----------+------+
| Ali|     Sales|  4000|
| Bob|     Sales|  4200|
+----+----------+------+

Step 4: filter() Example 2 - Filter Rows Where Salary is Greater Than 4000

df.filter(df.Salary > 4000).show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Amir Shahzad |Engineering|  5000|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

Step 5: filter() Example 3 - Filter Rows with Multiple Conditions (AND)

df.filter((df.Department == "Engineering") & (df.Salary > 5000)).show()

Expected Output

+----+-----------+------+
|Name| Department|Salary|
+----+-----------+------+
|Lisa|Engineering|  6000|
+----+-----------+------+

Step 6: filter() Example 4 - Filter Rows with Multiple Conditions (OR)

df.filter((df.Department == "Sales") | (df.Salary > 5000)).show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|          Ali|      Sales|  4000|
|          Bob|      Sales|  4200|
|         Lisa|Engineering|  6000|
+-------------+-----------+------+

Step 7: Bonus - filter() with isin() Function

df.filter(df.Name.isin("Amir Shahzad", "Raza")).show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Amir Shahzad |Engineering|  5000|
|         Raza|  Marketing|  3500|
+-------------+-----------+------+

Step 8: Using where() Instead of filter()

You can use where() in the same way as filter().

df.where(df.Department == "Sales").show()

Expected Output

+----+----------+------+
|Name|Department|Salary|
+----+----------+------+
| Ali|     Sales|  4000|
| Bob|     Sales|  4200|
+----+----------+------+
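
Both filter() and where() also accept SQL expression strings, which can be handy when conditions are assembled dynamically. A minimal sketch on the same df:

# Column expressions written as SQL strings
df.filter("Salary > 4000 AND Department = 'Engineering'").show()
df.where("Name LIKE 'A%'").show()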

Conclusion

In PySpark, both filter() and where() are used to filter DataFrame rows based on given conditions. They are functionally identical, and you can use whichever makes your code more readable.

Watch the Video Tutorial

For a complete walkthrough of using filter() and where() functions in PySpark, check out this video:

© 2025 Aamir Shahzad | PySpark Tutorials

PySpark Tutorial: fillna() Function to Replace Null or Missing Values | #PySparkTutorial #PySpark

How to Use fillna() Function in PySpark | Step-by-Step Guide

How to Use fillna() Function in PySpark

Author: Aamir Shahzad

Date: March 2025

Introduction

In this tutorial, we will learn how to handle missing or null values in PySpark DataFrames using the fillna() function. Handling missing data is a critical part of data cleaning in data engineering workflows.

Why Use fillna() in PySpark?

  • Replace NULL values in DataFrame columns with specific values.
  • Apply different replacement values to different columns.
  • Clean your dataset before analysis or feeding it into machine learning models.

Step 1: Import SparkSession and Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySparkFillnaFunction") \
    .getOrCreate()

Step 2: Create a Sample DataFrame

data = [
    ("Amir Shahzad", "Engineering", 5000),
    ("Ali", None, 4000),
    ("Raza", "Marketing", None),
    (None, "Sales", 4500),
    ("Ali", None, None)
]

columns = ["Name", "Department", "Salary"]

df = spark.createDataFrame(data, schema=columns)

df.show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Amir Shahzad |Engineering|  5000|
|          Ali|       null|  4000|
|         Raza|  Marketing|  null|
|         null|      Sales|  4500|
|          Ali|       null|  null|
+-------------+-----------+------+

Step 3: Fill All NULL Values

Fill all NULL values with 'Unknown' for string columns and 0 for numeric columns.

df_fill_all = df.fillna("Unknown").fillna(0)

df_fill_all.show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Amir Shahzad |Engineering|  5000|
|          Ali|    Unknown|  4000|
|         Raza|  Marketing|     0|
|      Unknown|      Sales|  4500|
|          Ali|    Unknown|     0|
+-------------+-----------+------+

Step 4: Fill NULLs with Column-Specific Values

df_fill_columns = df.fillna({
    "Department": "NA",
    "Salary": 10000
})

df_fill_columns.show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Amir Shahzad |Engineering|  5000|
|          Ali|         NA|  4000|
|         Raza|  Marketing| 10000|
|         null|      Sales|  4500|
|          Ali|         NA| 10000|
+-------------+-----------+------+

Step 5: Fill NULLs in a Specific Column Only

df_fill_name = df.fillna("No Name", subset=["Name"])

df_fill_name.show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Amir Shahzad |Engineering|  5000|
|          Ali|       null|  4000|
|         Raza|  Marketing|  null|
|      No Name|      Sales|  4500|
|          Ali|       null|  null|
+-------------+-----------+------+
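
One detail worth remembering: a scalar fill only touches columns whose type matches the value, which is why Step 3 chains a string fill and a numeric fill. A minimal sketch on the same df:

# Fills only the string columns; Salary nulls are left untouched
df.fillna("Unknown").show()

# Numeric fill restricted to the Salary column
df.fillna(0, subset=["Salary"]).show()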

Conclusion

Handling null and missing values is an essential part of data processing in PySpark. The fillna() function provides a simple and flexible way to replace these values, ensuring your data is clean and ready for further analysis or modeling.

Watch the Video Tutorial

Difference Between union() and unionAll() in PySpark | union() vs unionAll() | PySpark Tutorial

Difference Between union() and unionAll() in PySpark | Step-by-Step Guide

Difference Between union() and unionAll() in PySpark

In this PySpark tutorial, you'll learn how to use union() and unionAll() functions to combine two DataFrames. These functions are essential for merging datasets in big data processing using Apache Spark.

What is union()?

The union() function merges two DataFrames with the same schema and returns a new DataFrame. It includes all rows, and you can apply distinct() to remove duplicates.

What is unionAll()?

The unionAll() function works similarly to union() in modern versions of PySpark. It includes all rows, even duplicates, without the need for distinct().

Why Use union() and unionAll()?

  • To combine data from different sources
  • To merge datasets with the same schema
  • For data aggregation and analysis

Example: union() and unionAll() in PySpark

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("PySparkUnionExample").getOrCreate()

# Create DataFrame 1
data1 = [("Aamir Shahzad", "Engineering", 5000),
         ("Ali", "Sales", 4000)]

columns = ["Name", "Department", "Salary"]

df1 = spark.createDataFrame(data1, schema=columns)

# Create DataFrame 2
data2 = [("Raza", "Marketing", 3500),
         ("Ali", "Sales", 4000)]  # Duplicate row for demonstration

df2 = spark.createDataFrame(data2, schema=columns)

DataFrame 1

df1.show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
+-------------+-----------+------+

DataFrame 2

df2.show()

Expected Output

+-----+-----------+------+
| Name| Department|Salary|
+-----+-----------+------+
| Raza|  Marketing|  3500|
|  Ali|      Sales|  4000|
+-----+-----------+------+

union() Example (Apply distinct() for Unique Rows)

df_union = df1.union(df2).distinct()
df_union.show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
+-------------+-----------+------+

unionAll() Example (Includes Duplicates)

df_unionAll = df1.unionAll(df2)
df_unionAll.show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Aamir Shahzad|Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|          Ali|      Sales|  4000|
+-------------+-----------+------+

Row Count Comparison

print("Row count after union (with distinct):", df_union.count())
print("Row count after unionAll (with duplicates):", df_unionAll.count())

Expected Output

Row count after union (with distinct): 3
Row count after unionAll (with duplicates): 4

Key Points to Remember

  • Both DataFrames must have the same schema for union() and unionAll().
  • union() returns a DataFrame with all rows; apply distinct() if needed to remove duplicates.
  • unionAll() (legacy) includes all duplicates; in modern PySpark, union() behaves like unionAll(). For DataFrames whose columns are in a different order, see the unionByName() sketch below.
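
When two sources expose the same columns in a different order, unionByName() is the safer choice because it aligns columns by name rather than by position. A minimal sketch that reorders df2's columns to simulate that situation:

# Same columns as df1, but in a different order
df2_reordered = df2.select("Salary", "Name", "Department")

# Aligns by column name, so the rows land in the right columns
df1.unionByName(df2_reordered).show()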

Watch the Full Tutorial on YouTube

How to use PySpark count() Function | Count Rows & Records Easily | PySpark Tutorial

How to Use count() Function in PySpark | Step-by-Step Guide

How to Use count() Function in PySpark

The count() function in PySpark returns the number of rows in a DataFrame. In this tutorial, you'll learn how to use count(), distinct().count(), and groupBy().count() with examples and expected outputs.

1. Import SparkSession and Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySparkCountFunction").getOrCreate()

2. Create Sample Data

data = [
    ("Amir Shahzad", "Engineering", 5000),
    ("Ali", "Sales", 4000),
    ("Raza", "Marketing", 3500),
    ("Amir Shahzad", "Engineering", 5000),
    ("Ali", "Sales", 4000)
]

3. Define Schema

columns = ["Name", "Department", "Salary"]

4. Create a DataFrame

df = spark.createDataFrame(data, schema=columns)

5. Show the DataFrame

df.show()

Expected Output

+-------------+-----------+------+
|         Name| Department|Salary|
+-------------+-----------+------+
|Amir Shahzad |Engineering|  5000|
|          Ali|      Sales|  4000|
|         Raza|  Marketing|  3500|
|Amir Shahzad |Engineering|  5000|
|          Ali|      Sales|  4000|
+-------------+-----------+------+

6. count() - Total Number of Rows (Including Duplicates)

total_rows = df.count()
print("Total number of rows:", total_rows)

Expected Output

Total number of rows: 5

7. distinct().count() - Counts Unique Rows

distinct_rows = df.distinct().count()
print("Number of distinct rows:", distinct_rows)

Expected Output

Number of distinct rows: 3

8. groupBy() + count() - Count Occurrences of Each Name

df.groupBy("Name").count().show()

Expected Output

+-------------+-----+
|         Name|count|
+-------------+-----+
|         Raza|    1|
|          Ali|    2|
|Amir Shahzad |    2|
+-------------+-----+
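
To count distinct values of a single column (rather than distinct whole rows), countDistinct() from pyspark.sql.functions can be used. A minimal sketch on the same df:

from pyspark.sql.functions import countDistinct

df.select(countDistinct("Name").alias("distinct_names")).show()
# distinct_names = 3 for this sample data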

Conclusion

In this tutorial, you have learned how to use the count() function in PySpark to get the total number of rows, unique rows with distinct().count(), and count occurrences by grouping data using groupBy().count().

Watch the Video Tutorial