ETL Pipelines in PySpark — End-to-End Example
At DataVerse Labs, the Data Engineering team processes millions of daily transactions from various systems — API feeds, CSV exports, app logs, and real-time streams.
To unify all this, they built a PySpark ETL pipeline that:
- Extracts data from multiple sources
- Transforms it using business rules
- Loads it into a warehouse (Snowflake/Delta/Hive)
This chapter walks through a clean, professional, end-to-end ETL pipeline that you can adapt to most enterprise environments.
1. ETL Architecture Overview
RAW DATA (CSV, JSON, DB)
↓
Spark Extraction
↓
Data Cleansing & Validation
↓
Business Transformations
↓
Aggregations
↓
Data Warehouse
(Snowflake/Delta/Hive)
The pipeline we build will simulate a Retail Orders ETL Pipeline.
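Before diving into each phase, it helps to see the overall driver layout. The sketch below is one possible structure, assuming the input paths used in this chapter and an available Delta Lake package; the function names are illustrative, not a fixed API:

from pyspark.sql import SparkSession

def extract(spark):
    # Read the raw sources shown in Step 1
    orders = spark.read.option("header", "true").option("inferSchema", "true").csv("/data/orders.csv")
    customers = spark.read.json("/data/customers.json")
    return orders, customers

def transform(orders, customers):
    # Cleansing, joins, and business rules go here (Steps 2-4)
    return orders.join(customers, on="customer_id", how="left")

def load(df):
    # Write the curated output; Delta is one option (requires the Delta Lake package)
    df.write.format("delta").mode("overwrite").save("/warehouse/processed/orders")

if __name__ == "__main__":
    spark = SparkSession.builder.appName("retail_orders_etl").getOrCreate()
    orders, customers = extract(spark)
    load(transform(orders, customers))
    spark.stop()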
2. Step 1 — Extract Phase
We load CSV and JSON files into Spark; Parquet sources follow the same pattern with spark.read.parquet().
Example Input Files:
orders.csv
order_id,customer_id,amount,timestamp
O1,C101,200,2024-01-01 10:00:00
O2,C102,450,2024-01-01 10:15:00
O3,C103,,2024-01-01 10:40:00
customers.json
{"customer_id": "C101", "country": "USA"}
{"customer_id": "C102", "country": "India"}
{"customer_id": "C103", "country": "UK"}
Extraction Code
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("retail_orders_etl").getOrCreate()

# Orders: CSV with a header row; let Spark infer the column types
df_orders = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/data/orders.csv")

# Customers: newline-delimited JSON, one record per line
df_customers = spark.read.json("/data/customers.json")

df_orders.show()
df_customers.show()
Output (orders.csv)
+--------+-----------+------+-------------------+
|order_id|customer_id|amount|timestamp |
+--------+-----------+------+-------------------+
|O1 |C101 |200 |2024-01-01 10:00:00|
|O2 |C102 |450 |2024-01-01 10:15:00|
|O3 |C103 |null |2024-01-01 10:40:00|
+--------+-----------+------+-------------------+
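inferSchema is convenient for small samples, but it adds an extra pass over the data and can guess types incorrectly. For large production files an explicit schema is safer; a minimal sketch for the orders file:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

orders_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("timestamp", TimestampType(), True),
])

# Reading with an explicit schema skips the inference pass entirely
df_orders = spark.read \
    .option("header", "true") \
    .schema(orders_schema) \
    .csv("/data/orders.csv")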
3. Step 2 — Cleansing & Validation
3.1 Fix Null Values
from pyspark.sql.functions import col
df_orders_clean = df_orders.fillna({"amount": 0})
3.2 Validate Customer IDs
df_orders_valid = df_orders_clean.filter(col("customer_id").isNotNull())
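Real pipelines usually go further than these two checks. The sketch below adds a few common validations; the specific rules (non-negative amounts, unique order IDs) are illustrative, not part of the example data:

# Reject rows without a customer, negative amounts, and duplicate orders
df_orders_valid = (
    df_orders_clean
    .filter(col("customer_id").isNotNull())
    .filter(col("amount") >= 0)
    .dropDuplicates(["order_id"])
)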
4. Step 3 — Business Transformations
Join Orders with Customer Info
df_joined = df_orders_valid.join(
    df_customers,
    on="customer_id",
    how="left"
)
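The customer table in this example is tiny. When a dimension table fits comfortably in memory, broadcasting it avoids a shuffle; Spark often does this automatically based on spark.sql.autoBroadcastJoinThreshold, but you can make it explicit:

from pyspark.sql.functions import broadcast

# Ship the small customers table to every executor instead of shuffling orders
df_joined = df_orders_valid.join(
    broadcast(df_customers),
    on="customer_id",
    how="left"
)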
Add Sales Category
from pyspark.sql.functions import when
df_transformed = df_joined.withColumn(
    "sales_category",
    when(col("amount") > 300, "HIGH")
    .when(col("amount") > 100, "MEDIUM")
    .otherwise("LOW")
)
Output Example
+-----------+--------+------+-------------------+-------+--------------+
|customer_id|order_id|amount|timestamp |country|sales_category|
+-----------+--------+------+-------------------+-------+--------------+
|C101 |O1 |200 |2024-01-01 10:00:00|USA |MEDIUM |
|C102 |O2 |450 |2024-01-01 10:15:00|India |HIGH |
|C103 |O3 |0 |2024-01-01 10:40:00|UK |LOW |
+-----------+--------+------+-------------------+-------+--------------+
5. Step 4 — Aggregation Layer
Daily Sales Summary
from pyspark.sql.functions import to_date, sum as spark_sum
df_daily_summary = df_transformed \
    .withColumn("date", to_date("timestamp")) \
    .groupBy("date", "country") \
    .agg(spark_sum("amount").alias("total_sales"))
Output
+----------+-------+-----------+
|date |country|total_sales|
+----------+-------+-----------+
|2024-01-01|USA |200 |
|2024-01-01|India |450 |
|2024-01-01|UK |0 |
+----------+-------+-----------+
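The same groupBy can carry additional metrics in one pass. A small extension (the order_count and avg_order_value column names are illustrative):

from pyspark.sql.functions import count, avg, round as spark_round

df_daily_summary = df_transformed \
    .withColumn("date", to_date("timestamp")) \
    .groupBy("date", "country") \
    .agg(
        spark_sum("amount").alias("total_sales"),
        count("order_id").alias("order_count"),
        spark_round(avg("amount"), 2).alias("avg_order_value"),
    )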
6. Step 5 — Load Phase
You may load into:
- Snowflake
- Delta Lake
- Hive
Example: Write to Delta Lake
df_transformed.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/warehouse/processed/orders")
Example: Write to Hive
df_daily_summary.write \
    .mode("overwrite") \
    .saveAsTable("dv_dw.daily_sales")
Example: Write to Snowflake
df_daily_summary.write \
    .format("snowflake") \
    .options(**sf_options) \
    .option("dbtable", "DAILY_SALES") \
    .mode("overwrite") \
    .save()
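The sf_options dictionary above is assumed to be defined elsewhere. With the Snowflake Spark connector it typically looks like the sketch below; all values are placeholders, and depending on the connector version you may need the full format name net.snowflake.spark.snowflake instead of the snowflake shorthand:

sf_options = {
    "sfURL": "your_account.snowflakecomputing.com",
    "sfUser": "ETL_USER",
    "sfPassword": "********",      # pull from a secrets manager in practice, never hard-code
    "sfDatabase": "ANALYTICS",
    "sfSchema": "DW",
    "sfWarehouse": "ETL_WH",
}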
7. Production-Grade ETL: Best Practices
✔ Use explicit schemas for large files
✔ Track ETL runs with audit columns (created_at, batch_id) — see the sketch after this list
✔ Store raw → cleaned → curated layers separately
✔ Use Delta Lake for versioned tables
✔ Handle bad records using badRecordsPath
✔ Maintain checkpoints for streaming ETL
✔ Schedule ETL with Airflow, Oozie, or Databricks Jobs
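Two of these practices are easy to show concretely. The sketch below adds audit columns to the curated output and routes unparseable CSV rows to a quarantine path; the batch_id value and paths are illustrative, and badRecordsPath is a Databricks runtime option (on open-source Spark you would use PERMISSIVE mode with a corrupt-record column instead):

from pyspark.sql.functions import current_timestamp, lit

# Audit columns: when the row was written and which batch produced it
batch_id = "2024-01-01T11-00-00"   # in practice, injected by the scheduler
df_audited = df_transformed \
    .withColumn("created_at", current_timestamp()) \
    .withColumn("batch_id", lit(batch_id))

# Quarantine malformed CSV rows instead of failing the whole job (Databricks only)
df_orders_raw = spark.read \
    .option("header", "true") \
    .option("badRecordsPath", "/warehouse/quarantine/orders") \
    .csv("/data/orders.csv")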
Summary
In this chapter, you built an end-to-end ETL pipeline in PySpark:
✔ Extraction
- Load CSV and JSON (Parquet works the same way)
✔ Transformation
- Cleansing
- Validation
- Business rules
- Joins & conditional logic
✔ Loading
- Delta Lake
- Hive
- Snowflake
This pipeline mirrors what real enterprise data engineering teams deploy at scale.
Next Topic → Real Company Use Cases — Support, Development, & Production Workflows