🛰️ Streaming with Kafka: Message Highways of Data City
Imagine a city where every action (a click, swipe, payment, or sensor reading) becomes a message speeding through ultra-fast highways.
These highways are managed by Apache Kafka, the city's transport department for events.
But raw events alone mean nothing.
Enter PySpark, the city's analyst, ready to read, understand, transform, and store these messages in real time.
Together they form one of the most powerful pipelines in Data Engineering.
📦 What Is Kafka? (The Simple Way)
Kafka is:
- a real-time messaging system
- a high-throughput event pipeline
- a distributed log system
- a buffer between producers & consumers
Producers → Kafka Topics → Consumers (Spark)
Kafka stores messages in topics, split into partitions for parallel processing.
🎯 Why Spark + Kafka?
| Benefit | Explanation |
|---|---|
| ⚡ High Throughput | Supports millions of events per second |
| 🛡 Fault Tolerant | Handles crashes gracefully |
| 📈 Scalability | Add partitions/consumers to scale horizontally |
| ⏱ Event-Time Analytics | Spark adds windows, watermarks & event-time |
| 🔗 Ecosystem Integration | ML, ETL, batch, dashboards |
This combination is used by Netflix, Uber, PayPal, and Airbnb for real-time ML, fraud detection, and streaming ETL.
🧱 Kafka Message Structure (Very Important)
Kafka provides:
- key (binary)
- value (binary)
- topic
- partition
- offset
- timestamp
In most pipelines, you primarily parse the value field.
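A quick way to see exactly these columns is to print the schema of the raw stream (using the df created in the next section); the output below is roughly what the Kafka source reports:

# Inspect the raw columns exposed by the Kafka source
df.printSchema()
# root
#  |-- key: binary (nullable = true)
#  |-- value: binary (nullable = true)
#  |-- topic: string (nullable = true)
#  |-- partition: integer (nullable = true)
#  |-- offset: long (nullable = true)
#  |-- timestamp: timestamp (nullable = true)
#  |-- timestampType: integer (nullable = true)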
🔧 Reading Kafka Streams (PySpark Structured Streaming)
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "events-topic") \
.load()
df_parsed = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
✅ Spark automatically tracks Kafka offsets
✅ Data arrives as a streaming DataFrame
✅ Value is usually JSON, CSV, or delimited text
🧩 Parsing Kafka Messages (JSON Example)
Most real-world Kafka pipelines send JSON messages.
from pyspark.sql.functions import *
schema = "user STRING, action STRING, amount DOUBLE, event_time TIMESTAMP"
json_df = df_parsed.select(
from_json(col("value"), schema).alias("data"),
"timestamp"
)
parsed = json_df.select("data.*", "timestamp")
✅ Now all fields become real DataFrame columns
✅ Schema validation prevents broken data
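One caveat: from_json returns NULL for any message that does not match the schema, so it is worth splitting those records out instead of letting them silently become empty rows. A minimal sketch (the valid/invalid names are illustrative):

# Keep only rows that parsed successfully; route the rest to a dead-letter sink if needed
valid = json_df.filter(col("data").isNotNull()).select("data.*", "timestamp")
invalid = json_df.filter(col("data").isNull())   # unparsable payloads, useful for debugging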
🕒 Using Event-Time & Watermarking
Kafka messages may arrive late. Spark needs a watermark to decide how long to wait for them and to bound the state it keeps for aggregations.
result = parsed \
.withWatermark("event_time", "10 minutes") \
.groupBy(window("event_time", "5 minutes"), "user") \
.count()
✅ Handles late events
✅ Prevents infinite memory usage
✅ Enables time-based aggregation
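To actually watch these windowed counts during development, the aggregation needs a sink and an output mode. A minimal console sketch (the checkpoint path is illustrative; update mode emits windows as they change):

# Development-only sink for inspecting windowed counts
query = result.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .option("checkpointLocation", "/tmp/window-demo-checkpoint") \
    .start()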
🔄 Transforming Kafka Data
cleaned = parsed \
.filter(col("user").isNotNull()) \
.withColumn("action_upper", upper(col("action")))
📤 Writing Back to Kafka
cleaned \
.selectExpr(
"CAST(user AS STRING) AS key",
"to_json(struct(*)) AS value"
) \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "processed-events") \
.option("checkpointLocation", "/tmp/kafka-checkpoint") \
.start()
✅ Spark stores its Kafka offsets in the checkpoint and resumes from them after a restart
✅ The Kafka sink provides at-least-once delivery; end-to-end exactly-once needs checkpointing plus an idempotent or transactional downstream (e.g. Delta)
🔀 Subscribing to Multiple Topics
.option("subscribe", "topic1,topic2")
Or use a pattern:
.option("subscribePattern", "events-*")
🔧 Manual Offset Control (Advanced but Important)
.option("startingOffsets", "earliest")
Values you can use:
"earliest""latest"- A JSON of specific partitions
Perfect for reprocessing old data.
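The per-partition JSON form looks like this (topic name, partition numbers, and offsets are illustrative; inside the JSON, -2 means earliest and -1 means latest):

# Start partition 0 of events-topic at offset 100 and partition 1 from the earliest offset
.option("startingOffsets", """{"events-topic": {"0": 100, "1": -2}}""")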
🚦 Kafka → Spark Parallelism
Kafka partitions = Spark parallelism.
Example:
Kafka topic has 12 partitions → Spark can process it with 12 parallel tasks
If the input side is too slow:
Increase the number of Kafka partitions, raise the Kafka source's minPartitions option so Spark splits partitions into more read tasks (see the sketch below), or give the job more executor cores so those tasks actually run in parallel.
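A sketch of the minPartitions option (the value is illustrative):

# Ask Spark to split the Kafka partitions into at least 24 read tasks
.option("minPartitions", "24")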
🔐 Security (SSL & SASL)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.ssl.truststore.location", "/certs/kafka.jks")
Supports:
- SSL
- Kerberos
- SASL
- OAuth
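For SASL/PLAIN, a JAAS configuration is usually passed as well; a sketch with placeholder credentials (supply real ones from a secret manager, never from source code):

.option(
    "kafka.sasl.jaas.config",
    'org.apache.kafka.common.security.plain.PlainLoginModule required '
    'username="my-user" password="my-password";'
)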
🛡️ Checkpointing (Mandatory)
.option("checkpointLocation", "/path/to/checkpoints")
Checkpoint stores:
- Kafka offsets
- Aggregation state
- Watermark data
- Query progress
Without a checkpoint → no exactly-once guarantees.
🧪 Testing Kafka + Spark Locally
For a quick experiment without Kafka at all, stream from a local socket (Spark's socket source) and type messages by hand:
nc -lk 9999
Or run a local Kafka broker and push test messages with the console producer:
bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
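You can also push test events that match the schema used above straight from Python; a sketch assuming the kafka-python package is installed and a broker is listening on localhost:9092:

from datetime import datetime, timezone
from json import dumps
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: dumps(v).encode("utf-8"),
)

# One event matching the user/action/amount/event_time schema
producer.send("test", {
    "user": "u1",
    "action": "purchase",
    "amount": 42.5,
    "event_time": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
})
producer.flush()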
🚀 Production Best Practices
✅ Always use JSON with a schema
Avoid unstructured messages.
✅ Use watermarks with aggregations
Prevent unbounded state and memory buildup.
✅ Tune partition counts
More partitions → more throughput.
✅ Use Delta/Parquet as the sink
Avoid the console sink in production.
✅ Monitor lag
Kafka consumer lag is the health metric of your pipeline (see the sketch after this list).
✅ Use autoscaling Spark clusters
Handle traffic spikes automatically.
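Kafka consumer lag itself is best tracked with Kafka tooling or your monitoring stack, but Spark also reports per-batch progress (offsets read, input rows per second, batch durations) that you can log from the driver. A minimal sketch:

# Spark-side view of streaming progress; true consumer lag still needs Kafka-side monitoring
for q in spark.streams.active:
    print(q.name, q.lastProgress)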
🛠️ Full Working Example (Clean, Ready-to-Use)
from pyspark.sql.functions import *
from pyspark.sql.types import *
schema = StructType([
StructField("user", StringType()),
StructField("action", StringType()),
StructField("amount", DoubleType()),
StructField("event_time", TimestampType())
])
# Read from Kafka
df = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "transactions") \
.load()
# Parse JSON
json_df = df.select(
    from_json(col("value").cast("string"), schema).alias("data"),
    "timestamp"
)
parsed = json_df.select("data.*", "timestamp")
# Aggregation with windows
agg = parsed.withWatermark("event_time", "10 minutes") \
.groupBy(window("event_time", "5 minutes"), "user") \
.agg(sum("amount").alias("total_spent"))
# Write back to Kafka
query = agg.selectExpr("CAST(user AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "user-spending") \
    .option("checkpointLocation", "/checkpoint/kafka-pipeline") \
    .start()

# Keep the application alive until the streaming query is stopped
query.awaitTermination()
📝 Summary
Kafka + PySpark Streaming gives you:
- Real-time event ingestion
- Scalable, fault-tolerant pipelines
- Event-time windowing
- End-to-end exactly-once processing (with checkpointing and an idempotent or transactional sink)
- JSON parsing & schema validation
- State management via watermarks
- High-throughput processing
This is the backbone of modern data engineering.