
Data Aggregation in PySpark DataFrames

Data aggregation is one of the most powerful operations in PySpark.
Whether you're analyzing shipments, sales, demand plans, logs, or events, aggregation is essential for summarizing large datasets.

This guide walks through groupBy, agg, sorting, and PySpark's built-in aggregation functions, using a real shipments dataset.


Loading the Shipments Dataset

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('agg').getOrCreate()

df = spark.read.csv('/path/to/shipments.csv', header=True, inferSchema=True)

✔️ Why This Is Needed

To start a Spark session, load the dataset, and auto-infer column types.
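
If the column types are known in advance, you can skip inference and pass an explicit schema instead. A minimal sketch, assuming the five columns shown in the preview below:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schema matching the shipments columns (assumed from the preview below);
# avoids the extra pass over the file that inferSchema requires
schema = StructType([
    StructField('ShipmentID', StringType(), True),
    StructField('Company', StringType(), True),
    StructField('Product', StringType(), True),
    StructField('Units', IntegerType(), True),
    StructField('Sales', IntegerType(), True),
])

df = spark.read.csv('/path/to/shipments.csv', header=True, schema=schema)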


Preview & Schema

df.show()

Example dataset:

+----------+--------+----------+-----+-----+
|ShipmentID| Company|   Product|Units|Sales|
+----------+--------+----------+-----+-----+
|      S001|   FedEx|      Soap|  100| 1500|
|      S002|   FedEx|   Shampoo|  200| 3000|
|      S003|BlueDart|     Bread|  150| 1800|
|      S004|     DHL|Toothpaste|  120| 2400|
|      S005|     DHL|      Rice|  300| 6000|
|      S006|BlueDart| Chocolate|  180| 3600|
|      S007|   FedEx|     Juice|  130| 2600|
|      S008|     DHL|    Cereal|  220| 4400|
|      S009|BlueDart|      Soda|  110| 2200|
|      S010|   FedEx|  Facewash|  140| 2800|
+----------+--------+----------+-----+-----+

Schema:

df.printSchema()

Output:

root
 |-- ShipmentID: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Units: integer (nullable = true)
 |-- Sales: integer (nullable = true)

Aggregation Examples

1. Group by Company (no aggregation yet)

df.groupBy("Company")

Returns:

A GroupedData object; an aggregation such as .count(), .avg(), or .max() must be applied to get a DataFrame back.

✔️ Why We Need This

To prepare the data for summarization by company.
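
A quick sketch of what groupBy actually hands back before any aggregation is applied:

grouped = df.groupBy("Company")

# groupBy() returns pyspark.sql.GroupedData, not a DataFrame
print(type(grouped))   # <class 'pyspark.sql.group.GroupedData'>

# Applying an aggregation turns it back into a DataFrame
grouped.sum("Units").show()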


2. Average of all numeric columns per Company

df.groupBy("Company").mean().show()

Output:

+--------+----------+----------+
| Company|avg(Units)|avg(Sales)|
+--------+----------+----------+
|BlueDart|  146.6667| 2533.3333|
|   FedEx|     142.5|    2475.0|
|     DHL|  213.3333| 4266.6667|
+--------+----------+----------+

✔️ Why We Need This

To quickly compute per-company performance metrics.
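
mean() also accepts specific column names, so the average can be limited to one metric; a small sketch averaging Sales only:

# Average only the Sales column per company
df.groupBy("Company").mean("Sales").show()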


3. Count of shipments per Company

df.groupBy("Company").count().show()

Output:

+--------+-----+
| Company|count|
+--------+-----+
|BlueDart|    3|
|   FedEx|    4|
|     DHL|    3|
+--------+-----+

✔️ Why We Need This

To understand shipment volume distribution.
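
Combined with orderBy (covered later in this guide), the counts can be ranked by volume; a small sketch:

# Rank companies by shipment count, highest first
df.groupBy("Company").count().orderBy("count", ascending=False).show()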


4. Maximum values per Company

df.groupBy("Company").max().show()

Output:

+--------+----------+----------+
| Company|max(Units)|max(Sales)|
+--------+----------+----------+
|BlueDart|       180|      3600|
|   FedEx|       200|      3000|
|     DHL|       300|      6000|
+--------+----------+----------+
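
min() works the same way; a sketch of the minimum of each numeric column per company:

# Minimum Units and Sales per company
df.groupBy("Company").min().show()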

5. Total sales across all rows

df.agg({'Sales': 'sum'}).show()

Output:

+----------+
|sum(Sales)|
+----------+
|     30300|
+----------+
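
The dictionary form is concise, but the generated column name (sum(Sales)) cannot be aliased inline. An equivalent sketch using the functions API, which can:

from pyspark.sql.functions import sum as spark_sum  # aliased to avoid shadowing Python's built-in sum

df.agg(spark_sum('Sales').alias('Total Sales')).show()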

6. Maximum sale value

df.agg({'Sales': 'max'}).show()

Output:

+----------+
|max(Sales)|
+----------+
|      6000|
+----------+

7. Max Sales per Company (group then aggregate)

group_data = df.groupBy("Company")
group_data.agg({'Sales': 'max'}).show()

Output:

+--------+----------+
| Company|max(Sales)|
+--------+----------+
|BlueDart|      3600|
|   FedEx|      3000|
|     DHL|      6000|
+--------+----------+
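
agg() also accepts several expressions at once, so a single pass can produce multiple metrics. A sketch combining max and sum with readable aliases:

from pyspark.sql.functions import max as spark_max, sum as spark_sum

# One pass per company: highest sale and total units shipped
df.groupBy("Company").agg(
    spark_max('Sales').alias('MaxSales'),
    spark_sum('Units').alias('TotalUnits'),
).show()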

Using Built-In Functions

8. Average Sales using avg()

from pyspark.sql.functions import avg

df.select(avg('Sales').alias('Average Sales')).show()

Output:

+-------------+
|Average Sales|
+-------------+
|       3030.0|
+-------------+

9. Count distinct Sales values

from pyspark.sql.functions import countDistinct

df.select(countDistinct('Sales')).show()

Output:

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   10|
+---------------------+
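
Other built-ins follow the same pattern; for example stddev, often paired with format_number to trim the long decimals it produces. A sketch:

from pyspark.sql.functions import stddev, format_number

# Standard deviation of Sales, rounded to two decimal places for display
sales_std = df.select(stddev('Sales').alias('std'))
sales_std.select(format_number('std', 2).alias('Sales Std Dev')).show()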

Sorting & Ordering Data

10. Sort by Sales (ascending)

df.orderBy('Sales').show()

11. Sort by Sales (descending)

df.orderBy(df['Sales'].desc()).show()
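
For multi-column ordering, the desc() function avoids the column-attribute syntax; a sketch sorting companies A to Z, with the highest Sales first within each company:

from pyspark.sql.functions import desc

# Company ascending, then Sales descending within each company
df.orderBy('Company', desc('Sales')).show()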

🟦 1-Minute Summary — PySpark Data Aggregation

Code Snippet                         What It Does
df.groupBy("Company")                Groups rows by company
groupBy().mean()                     Calculates average of numeric columns
groupBy().count()                    Counts number of shipments per company
groupBy().max()                      Gets max of each numeric column
df.agg({'Sales': 'sum'})             Total sales across dataset
df.agg({'Sales': 'max'})             Maximum sale value
groupBy().agg({'Sales': 'max'})      Max sales per company
avg(), countDistinct()               Built-in aggregation functions
orderBy('Sales')                     Sorts ascending
orderBy(df['Sales'].desc())          Sorts descending

Next, we’ll explore Window Functions in PySpark DataFrames, enabling running totals, rankings, and time-based calculations.
