
Caching, Persisting & Memory Management in PySpark — Boost Performance at Scale

At NeoMart, some pipelines touch the same DataFrame multiple times:

  • Computing total sales by region
  • Filtering top products for dashboards
  • Joining customer and transaction data repeatedly

Recomputing every time is expensive and slow.
Caching and persisting allow Spark to store intermediate results in memory or disk for faster reuse.


1. Why Cache & Persist?

  • Spark evaluates transformations lazily
  • Each action triggers recomputation from the original source
  • Repeated queries → expensive shuffles & scans

Caching avoids this by storing intermediate results, speeding up iterative algorithms and repeated queries.
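
A minimal, self-contained sketch of the problem (the numbers and column names here are made up for illustration; any repeatedly queried DataFrame behaves the same way):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("NeoMartCacheDemo").getOrCreate()

# A derived DataFrame with a non-trivial lineage
orders = spark.range(1_000_000).withColumn("amount", F.col("id") % 500)
high_value = orders.filter(F.col("amount") > 450)

high_value.count()                       # action 1: runs the full lineage
high_value.agg(F.sum("amount")).show()   # action 2: runs the full lineage again

# After high_value.cache() and one action, later actions read cached partitions
# instead of recomputing the scan and filter.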


2. Sample DataFrame

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("NeoMart").getOrCreate()

data = [
(1, "Laptop", 1200),
(2, "Mouse", 25),
(3, "Monitor", 220),
(4, "Keyboard", 75),
(5, "Headset", 60)
]

df = spark.createDataFrame(data, ["product_id", "name", "price"])

3. Caching a DataFrame

# Cache the DataFrame (for DataFrames, cache() defaults to MEMORY_AND_DISK)
df.cache()

# cache() is lazy: trigger an action to materialize the cache
df.count()

Story

NeoMart analysts repeatedly query df to generate multiple dashboards. Caching avoids recomputation every time.
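
A quick way to confirm the cache is in effect (optional, caching works without it): the DataFrame reports its storage level, and once an action has materialized the cache the physical plan reads from memory instead of the source.

# Inspect cache status on the DataFrame itself
print(df.is_cached)       # True once cache()/persist() has been called
print(df.storageLevel)    # typically MEMORY_AND_DISK for a cached DataFrame

# After materialization, the plan contains an InMemoryTableScan / InMemoryRelation node
df.explain()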


4. Persist with Storage Levels

persist() lets you specify the storage level explicitly:

from pyspark import StorageLevel

# Persist in memory and disk
df.persist(StorageLevel.MEMORY_AND_DISK)

# An action materializes the persisted data
df.show()

Common Storage Levels

Storage Level          Description
MEMORY_ONLY            Store in memory only
MEMORY_AND_DISK        Store in memory, spill to disk if there is not enough space
DISK_ONLY              Store on disk only
MEMORY_ONLY_SER        Serialized in memory, saves space
MEMORY_AND_DISK_SER    Serialized in memory, spill to disk if needed
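
A short sketch of choosing between levels for the sample df (re-persisting at a different level generally requires unpersisting first):

from pyspark import StorageLevel

# Very large, rarely re-read intermediate: keep it off executor memory entirely
df.unpersist()                       # drop any previously assigned level
df.persist(StorageLevel.DISK_ONLY)
df.count()                           # materialize
print(df.storageLevel)               # confirm DISK_ONLY is in effect

# Typical choice for reusable DataFrames that may not fit in memory
df.unpersist()
df.persist(StorageLevel.MEMORY_AND_DISK)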

5. Unpersist / Release Memory

# Release memory when no longer needed
df.unpersist()
  • Frees cached memory
  • Prevents executor memory issues
  • Recommended after large intermediate steps (see the sketch below)
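
A sketch of that pattern in a multi-step pipeline (stage_one and the 1.18 tax factor are placeholders, not real NeoMart logic; df and F come from the earlier snippets):

from pyspark import StorageLevel

# Cache a large intermediate only while it is being reused, then free it
stage_one = df.withColumn("price_with_tax", F.col("price") * 1.18)
stage_one.persist(StorageLevel.MEMORY_AND_DISK)

report_a = stage_one.filter(F.col("price_with_tax") > 100).count()
report_b = stage_one.agg(F.avg("price_with_tax")).collect()

# blocking=True waits until the cached blocks are actually removed
stage_one.unpersist(blocking=True)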

6. Monitoring Cache & Memory

# cacheTable() works on a registered table or view, so register df under a name first
df.createOrReplaceTempView("df")

spark.catalog.cacheTable("df")   # cache using the table name
spark.catalog.isCached("df")     # returns True once the view is cached
  • Spark UI → Storage tab shows memory usage and the number of cached partitions; a programmatic check is sketched below
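
Beyond the Spark UI, the catalog API can report cache status programmatically (a small sketch; listTables() covers temp views and tables in the current database):

# Print each registered table/view and whether it is currently cached
for table in spark.catalog.listTables():
    print(table.name, spark.catalog.isCached(table.name))

# Release a single cached table, or clear everything at once
spark.catalog.uncacheTable("df")
spark.catalog.clearCache()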

Story

NeoMart’s Spark UI shows which tables are cached, helping optimize memory usage.


7. Memory Management Tips

✔ Cache only frequently reused DataFrames
✔ Avoid caching huge tables unnecessarily → disk spills slow down jobs
✔ Use persist(StorageLevel.MEMORY_AND_DISK) for very large datasets
✔ Unpersist intermediate DataFrames after use
✔ Monitor executor memory in the Spark UI → avoid OOM errors
✔ Prefer columnar formats (Parquet, ORC) → reduces memory footprint (see the sketch below)
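
For intermediates that are too large to cache comfortably, one common alternative is to write the result to Parquet and read it back, trading executor memory for cheap columnar scans. A sketch (the aggregation is a stand-in for an expensive transformation and the path is hypothetical):

# Materialize a heavy intermediate to columnar storage instead of caching it
heavy = df.groupBy("name").agg(F.sum("price").alias("total_price"))
heavy.write.mode("overwrite").parquet("/tmp/neomart/heavy_intermediate")

# Downstream steps read the compact Parquet copy instead of recomputing the lineage
heavy = spark.read.parquet("/tmp/neomart/heavy_intermediate")
heavy.count()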


8. Example: Iterative Query with Cache

# Example: compute discounts multiple times
df_discounted = df.withColumn("discount_price", F.col("price")*0.9)

df_discounted.cache() # cache result

# Multiple actions
df_discounted.filter(F.col("discount_price") > 100).show()
df_discounted.agg(F.avg("discount_price")).show()

Without cache

  • Spark recomputes df_discounted for every action

With cache

  • Computed once → reused → faster analytics

9. Combining with SQL

df.createOrReplaceTempView("products")
spark.sql("CACHE TABLE products")

# Query multiple times
spark.sql("SELECT * FROM products WHERE price > 100").show()
spark.sql("SELECT AVG(price) FROM products").show()

Summary

Caching and persisting are essential for iterative or repeated operations:

  • cache() → caches with the default storage level (MEMORY_AND_DISK for DataFrames)
  • persist() → specify storage levels (memory/disk/serialized)
  • unpersist() → release memory
  • Monitor via Spark UI → Storage tab
  • Use wisely to avoid OOM or unnecessary disk spills

NeoMart pipelines rely on caching and persisting to turn heavy transformations into lightning-fast analytics.


Next Topic → Shuffle, Narrow vs Wide Transformations