Caching, Persisting & Memory Management in PySpark — Boost Performance at Scale
At NeoMart, some pipelines touch the same DataFrame multiple times:
- Computing total sales by region
- Filtering top products for dashboards
- Joining customer and transaction data repeatedly
Recomputing every time is expensive and slow.
Caching and persisting allow Spark to store intermediate results in memory or disk for faster reuse.
1. Why Cache & Persist?
- Spark evaluates transformations lazily
- Each action triggers recomputation from the original source
- Repeated queries → expensive shuffles & scans
Caching avoids this by storing intermediate results, speeding up iterative algorithms and repeated queries.
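For example, in this minimal sketch (assuming an existing SparkSession named spark and a hypothetical transactions.parquet file), both actions re-read and re-aggregate the source because nothing is cached:
txns = spark.read.parquet("transactions.parquet")        # hypothetical source file
sales_by_region = txns.groupBy("region").sum("amount")   # lazy: nothing runs yet
sales_by_region.count()   # action 1: full scan + shuffle
sales_by_region.show()    # action 2: the same scan + shuffle runs again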
2. Sample DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("NeoMart").getOrCreate()
data = [
    (1, "Laptop", 1200),
    (2, "Mouse", 25),
    (3, "Monitor", 220),
    (4, "Keyboard", 75),
    (5, "Headset", 60)
]
df = spark.createDataFrame(data, ["product_id", "name", "price"])
3. Caching a DataFrame
# Cache the DataFrame (shorthand for persist() with the default storage level)
df.cache()
# Perform action to materialize cache
df.count()
Story
NeoMart analysts repeatedly query df to generate multiple dashboards.
Caching avoids recomputation every time.
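Once the cache is materialized, follow-up queries reuse the in-memory copy, and you can inspect a DataFrame's cache status directly (a small sketch using the df defined above):
# Both queries read from the cached data instead of rebuilding df
df.filter(F.col("price") > 100).show()
df.agg(F.avg("price")).show()

print(df.is_cached)      # True once cache() has been called
print(df.storageLevel)   # the storage level currently assigned to df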
4. Persist with Storage Levels
persist() allows specifying a storage level explicitly:
from pyspark import StorageLevel
# df was cached above; release it before assigning a different storage level
df.unpersist()
# Persist in memory, spilling to disk when memory runs out
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()   # run an action to materialize the persisted data
Common Storage Levels
| StorageLevel | Description |
|---|---|
| MEMORY_ONLY | Store in memory only |
| MEMORY_AND_DISK | Store in memory, spill to disk if there is not enough space |
| DISK_ONLY | Store on disk only |
| MEMORY_ONLY_SER | Serialized in memory, saves space |
| MEMORY_AND_DISK_SER | Serialized in memory + spill to disk |
Note: PySpark always stores data in serialized form, so the _SER levels belong to the Scala/Java API; in Python, MEMORY_ONLY and MEMORY_AND_DISK already behave that way.
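A short sketch of choosing a different level, here for a hypothetically large dataset (reusing the imaginary transactions.parquet file from earlier) that would not fit in executor memory:
large_df = spark.read.parquet("transactions.parquet")   # hypothetical large source
large_df.persist(StorageLevel.DISK_ONLY)                # skip memory entirely
large_df.count()                                        # action materializes the data on disk
print(large_df.storageLevel)                            # confirm the level Spark assigned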
5. Unpersist / Release Memory
# Release memory when no longer needed
df.unpersist()
- Frees cached memory
- Prevents executor memory issues
- Recommended after large intermediate steps
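A typical pattern, sketched here with a hypothetical price_band column: persist a heavy intermediate result, produce several outputs from it, then release it before the next stage.
stage1 = df.withColumn("price_band", (F.col("price") / 100).cast("int"))
stage1.persist(StorageLevel.MEMORY_AND_DISK)

stage1.groupBy("price_band").count().show()      # output 1 reuses the persisted data
stage1.filter(F.col("price_band") >= 2).show()   # output 2 reuses it as well

stage1.unpersist()   # free executor memory before later stages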
6. Monitoring Cache & Memory
df.createOrReplaceTempView("products")   # register the DataFrame as a temp view first
spark.catalog.cacheTable("products")     # cache by table/view name
spark.catalog.isCached("products")       # check if the view is cached
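The catalog also provides the cleanup counterparts, shown here against the same products view:
spark.catalog.uncacheTable("products")   # uncache a single table/view
spark.catalog.clearCache()               # drop everything Spark has cached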
- Spark UI → Storage tab shows memory usage, number of cached partitions
Story
NeoMart’s Spark UI shows which tables are cached, helping optimize memory usage.
7. Memory Management Tips
✔ Cache only frequently reused DataFrames
✔ Avoid caching huge tables unnecessarily → spills to disk slow down jobs
✔ Use persist(StorageLevel.MEMORY_AND_DISK) for very large datasets
✔ Unpersist intermediate DataFrames after use
✔ Monitor executor memory in Spark UI → avoid OOM errors
✔ Prefer columnar formats (Parquet, ORC) → reduces memory footprint (see the sketch below)
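A minimal sketch of that last tip, using a hypothetical /tmp/neomart path: for results reused across many jobs, writing a columnar snapshot is often cheaper than keeping a large DataFrame pinned in executor memory.
# Write the reusable result once as Parquet (hypothetical output path)
df.write.mode("overwrite").parquet("/tmp/neomart/products_snapshot")
# Later jobs read the columnar snapshot instead of recomputing or re-caching
snapshot = spark.read.parquet("/tmp/neomart/products_snapshot")
snapshot.filter(F.col("price") > 100).show()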
8. Example: Iterative Query with Cache
# Example: compute discounts multiple times
df_discounted = df.withColumn("discount_price", F.col("price")*0.9)
df_discounted.cache() # cache result
# Multiple actions
df_discounted.filter(F.col("discount_price") > 100).show()
df_discounted.agg(F.avg("discount_price")).show()
Without cache
- Spark recomputes df_discounted for every action
With cache
- Computed once → reused → faster analytics
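You can confirm the reuse in the physical plan: after caching and materializing, explain() shows an in-memory scan of df_discounted instead of a recomputation from the source rows.
# The plan now reads from the cached relation (look for InMemoryTableScan)
df_discounted.filter(F.col("discount_price") > 100).explain()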
9. Combining with SQL
df.createOrReplaceTempView("products")
spark.sql("CACHE TABLE products")
# Query multiple times
spark.sql("SELECT * FROM products WHERE price > 100").show()
spark.sql("SELECT AVG(price) FROM products").show()
Summary
Caching and persisting are essential for iterative or repeated operations:
- cache() → cache with the default storage level
- persist() → specify storage levels (memory / disk / serialized)
- unpersist() → release memory
- Monitor via Spark UI → Storage tab
- Use wisely to avoid OOM or unnecessary disk spills
NeoMart pipelines rely on caching and persisting to turn heavy transformations into lightning-fast analytics.
Next Topic → Shuffle, Narrow vs Wide Transformations