Shuffle, Narrow vs Wide Transformations in PySpark — Understand Data Movement
At NeoMart, understanding how Spark moves data is key to everyday workloads:
- Joining millions of orders with customers
- Aggregating revenue by region
- Sorting logs by timestamp
Data movement can make or break performance.
This chapter explains narrow vs wide transformations, shuffles, and how to optimize them.
1. Narrow vs Wide Transformations
Every Spark transformation falls into one of two categories:
| Type | Description | Examples |
|---|---|---|
| Narrow | Each output partition depends on a single input partition → no data movement | map(), filter(), union() |
| Wide | Each output partition depends on multiple input partitions → shuffle | groupByKey(), reduceByKey(), join() |
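You can see this boundary directly in an RDD's lineage: narrow steps stay inside one stage, while a wide step starts a new one. Below is a minimal sketch (the sample words and variable names are only illustrative, not part of NeoMart's data):
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("NeoMart").getOrCreate()

# Narrow step (map) stays in the same stage; the wide step (reduceByKey)
# introduces a shuffle boundary, shown as a new indentation level below
words = spark.sparkContext.parallelize(["a", "b", "a", "c"], 4)
pairs = words.map(lambda w: (w, 1))             # narrow
counts = pairs.reduceByKey(lambda a, b: a + b)  # wide → shuffle

print(counts.toDebugString().decode())
```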
2. Sample DataFrame
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("NeoMart").getOrCreate()

data = [
    (1, "Laptop", "Electronics", 1200),
    (2, "Mouse", "Accessories", 25),
    (3, "Monitor", "Electronics", 220),
    (4, "Keyboard", "Accessories", 75),
    (5, "Headset", "Accessories", 60)
]
df = spark.createDataFrame(data, ["product_id", "product", "category", "price"])
```
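Partition layout drives everything that follows; you can check how many partitions back this DataFrame (the exact number depends on your cluster and defaults):
```python
# Number of partitions backing the DataFrame; varies with cluster,
# input source, and configuration
print(df.rdd.getNumPartitions())
```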
3. Narrow Transformations
Narrow transformations do not require shuffling:
```python
# Narrow: filter rows and derive a new column; no data leaves its partition
df_filtered = df.filter(F.col("price") > 100)
df_mapped = df_filtered.withColumn("price_tax", F.round(F.col("price") * 1.1, 2))
df_mapped.show()
```
Output
| product_id | product | category | price | price_tax |
|---|---|---|---|---|
| 1 | Laptop | Electronics | 1200 | 1320.0 |
| 3 | Monitor | Electronics | 220 | 242.0 |
✅ Key: Each partition can compute independently → fast
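You can confirm that no shuffle is planned by inspecting the physical plan; for a purely narrow pipeline like this, no Exchange operator should appear (exact plan text varies by Spark version):
```python
# Physical plan of a narrow pipeline: filter + projection only, no Exchange (shuffle)
df_mapped.explain()
```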
4. Wide Transformations (Cause Shuffle)
Wide transformations require data to move across partitions:
```python
# Wide: aggregating by category requires rows with the same category
# to be brought together → shuffle
df_grouped = df.groupBy("category").agg(F.sum("price").alias("total_price"))
df_grouped.show()
```
Output
| category | total_price |
|---|---|
| Electronics | 1420 |
| Accessories | 160 |
- Spark shuffles rows so that all records for a given category end up in the same partition
- This is expensive for large datasets
Other wide transformations: join(), distinct(), repartition()
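The shuffle shows up explicitly in the physical plan of the aggregation: look for an Exchange hashpartitioning step between the partial and final aggregation (exact plan text varies by Spark version and whether AQE is enabled):
```python
# The groupBy/agg plan contains Exchange hashpartitioning(category, ...),
# which is the shuffle boundary
df_grouped.explain()
```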
5. Understanding Shuffle
Shuffle = data movement across the cluster:
- Triggered by wide transformations
- Writes intermediate shuffle files to local disk, then transfers them over the network
- Can cause slow tasks or stragglers
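In the DataFrame API, the number of partitions a shuffle produces is controlled by spark.sql.shuffle.partitions (default 200). Tuning it to match your data volume is a common first step; the value below is only an example:
```python
# Default is 200: too many partitions → tiny tasks and overhead,
# too few → huge tasks and memory pressure. 64 is just an illustrative value.
spark.conf.set("spark.sql.shuffle.partitions", "64")
```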
6. Example: Join (Wide Transformation)
```python
# Customer who purchased each product (keyed by product_id for this example)
customers = spark.createDataFrame([
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie")
], ["product_id", "customer_name"])

# Join is a wide transformation: matching keys must be brought together,
# which can shuffle both sides
df_joined = df.join(customers, "product_id")
df_joined.show()
```
Output
| product_id | product | category | price | customer_name |
|---|---|---|---|---|
| 1 | Laptop | Electronics | 1200 | Alice |
| 2 | Mouse | Accessories | 25 | Bob |
| 3 | Monitor | Electronics | 220 | Charlie |
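For tables this small, Spark will usually pick a broadcast join on its own, so you may not see a shuffle here at all; with two large tables you would instead get a sort-merge join preceded by shuffles on the join key. Check which strategy was chosen with explain():
```python
# Tiny tables → usually BroadcastHashJoin (no shuffle of the big side);
# large tables on both sides → SortMergeJoin preceded by Exchange on product_id
df_joined.explain()
```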
7. Minimizing Shuffle
- Use narrow transformations whenever possible
- Filter data before wide transformations
- Use broadcast joins if one table is small
- Repartition wisely → avoid unnecessary reshuffle
```python
from pyspark.sql.functions import broadcast

# Hint Spark to copy the small customers table to every executor
df_broadcast = df.join(broadcast(customers), "product_id")
```
- Broadcasting the small table avoids shuffling the large one → faster join
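You can verify the hint took effect by looking for BroadcastHashJoin in the physical plan. Spark also broadcasts automatically when one side is estimated below spark.sql.autoBroadcastJoinThreshold (10 MB by default), so the explicit hint matters most for tables just above that limit:
```python
# A broadcast join replaces the shuffle of the large side with a
# BroadcastExchange of the small side → look for BroadcastHashJoin here
df_broadcast.explain()
```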
8. Visualizing Narrow vs Wide
- Narrow: single partition operations → local computation
- Wide: multiple partition dependency → shuffle across nodes
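A concrete way to feel this difference is coalesce() vs repartition(): coalesce merges existing partitions without a full shuffle (narrow), while repartition redistributes every row across the cluster (wide, full shuffle). A minimal sketch, with partition counts chosen only for illustration:
```python
# Narrow: coalesce reduces the partition count without a full shuffle
df_fewer = df.coalesce(2)

# Wide: repartition performs a full shuffle, here hashing rows by category
# into 8 partitions (illustrative numbers)
df_redistributed = df.repartition(8, "category")

print(df_fewer.rdd.getNumPartitions(), df_redistributed.rdd.getNumPartitions())
```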
NeoMart uses this understanding to optimize large joins and aggregations, saving hours of compute.
9. Best Practices
✔ Identify wide transformations using Spark UI → DAG visualization
✔ Filter, project, and cache data before shuffles
✔ Prefer reduceByKey over groupByKey in RDDs (see the sketch after this list)
✔ Broadcast small dimension tables for joins
✔ Monitor shuffle spill → adjust executor memory
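To illustrate the reduceByKey point above: both are wide, but reduceByKey pre-aggregates inside each partition before the shuffle, so far less data crosses the network than with groupByKey. A minimal RDD sketch with invented sample pairs:
```python
pairs = spark.sparkContext.parallelize(
    [("electronics", 1200), ("accessories", 25), ("electronics", 220)]
)

# reduceByKey: partial sums are computed inside each partition first,
# so at most one record per key per partition is shuffled
sums_reduce = pairs.reduceByKey(lambda a, b: a + b)

# groupByKey: every individual value crosses the network, then is summed
sums_group = pairs.groupByKey().mapValues(sum)

print(sums_reduce.collect())
print(sums_group.collect())
```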
Summary
- Narrow transformations → no shuffle → faster
- Wide transformations → cause shuffle → expensive
- Understanding shuffle is critical for performance tuning
- Combine caching, partitioning, and broadcast joins to reduce shuffle overhead
NeoMart pipelines are optimized by minimizing wide transformations where possible and using smart memory strategies.
Next Topic → Spark UI & Job Debugging Techniques