
πŸ›°οΈ Streaming with Kafka β€” Message Highways of Data City

Imagine a city where every action β€” a click, swipe, payment, or sensor reading β€” becomes a message speeding through ultra-fast highways.
These highways are managed by Apache Kafka, the city’s transport department for events.

But raw events alone mean nothing.

Enter PySpark, the city's analyst, ready to read, understand, transform, and store these messages in real time.

Together they form one of the most powerful pipelines in Data Engineering.


🚦 What Is Kafka? (The Simple Way)

Kafka is:

  • a real-time messaging system
  • a high-throughput event pipeline
  • a distributed log system
  • a buffer between producers & consumers

Producers β†’ Kafka Topics β†’ Consumers (Spark)

Kafka stores messages in topics, split into partitions for parallel processing.
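
For context, here is a minimal producer sketch, assuming the kafka-python client (not part of this guide's stack; any Kafka client works) and a hypothetical events-topic:

import json
from kafka import KafkaProducer  # assumption: kafka-python client

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Each event becomes one message on the topic
producer.send("events-topic", {"user": "u42", "action": "click", "amount": 9.99})
producer.flush()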


🎯 Why Spark + Kafka?

| Benefit | Explanation |
| --- | --- |
| ⚡ High Throughput | Supports millions of events per second |
| πŸ›‘ Fault Tolerant | Handles crashes gracefully |
| πŸ”„ Scalability | Add partitions/consumers to scale horizontally |
| ⏱ Event-Time Analytics | Spark adds windows, watermarks & event-time semantics |
| πŸ”— Ecosystem Integration | ML, ETL, batch, dashboards |

This combination is used by Netflix, Uber, PayPal, and Airbnb for real-time ML, fraud detection, and streaming ETL.


🧱 Kafka Message Structure (Very Important)

Kafka provides:

  • key (binary)
  • value (binary)
  • topic
  • partition
  • offset
  • timestamp

In most pipelines, you primarily parse the value field.
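
When Spark reads the topic (the readStream call shown in the next section), these fields surface as columns on the streaming DataFrame; the raw schema looks roughly like this:

df.printSchema()
# root
#  |-- key: binary
#  |-- value: binary
#  |-- topic: string
#  |-- partition: integer
#  |-- offset: long
#  |-- timestamp: timestamp
#  |-- timestampType: integer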


πŸ”§ Reading Kafka Streams (PySpark Structured Streaming)

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events-topic") \
    .load()

df_parsed = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")

βœ” Spark automatically tracks Kafka offsets
βœ” Data arrives as a streaming DataFrame
βœ” The value is usually JSON, CSV, or delimited text
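
For quick inspection during development, here is a sketch that dumps the parsed stream to the console (not for production use):

debug_query = df_parsed.writeStream \
    .format("console") \
    .option("truncate", "false") \
    .start()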


🧩 Parsing Kafka Messages (JSON Example)

Most real-world Kafka pipelines send JSON messages.

from pyspark.sql.functions import *

schema = "user STRING, action STRING, amount DOUBLE, event_time TIMESTAMP"

json_df = df_parsed.select(
    from_json(col("value"), schema).alias("data"),
    "timestamp"
)

parsed = json_df.select("data.*", "timestamp")

βœ” Now all fields become real DataFrame columns
βœ” Schema validation prevents broken data
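
One detail worth knowing: from_json returns a null struct for messages that do not match the schema, so a common follow-up (a sketch, not part of the original pipeline) is to drop those rows:

# Keep only rows that parsed successfully; unparseable values leave data == null
valid = json_df.filter(col("data").isNotNull()).select("data.*", "timestamp")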


πŸ•’ Using Event-Time & Watermarking

Kafka messages may arrive late. Spark needs a watermark to manage state.

result = parsed \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window("event_time", "5 minutes"), "user") \
    .count()

βœ” Handles late events
βœ” Prevents unbounded memory usage
βœ” Enables time-based aggregation
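
A minimal sketch of running this windowed aggregation, assuming a console sink and update output mode:

query = result.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()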


πŸ“ Transforming Kafka Data

cleaned = parsed \
    .filter(col("user").isNotNull()) \
    .withColumn("action_upper", upper(col("action")))
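
Another common transformation (a sketch, not in the original pipeline) is deduplicating retried events; the watermark bounds how long Spark keeps the dedup state:

deduped = parsed \
    .withWatermark("event_time", "10 minutes") \
    .dropDuplicates(["user", "action", "event_time"])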

πŸ“€ Writing Back to Kafka

cleaned \
    .selectExpr(
        "CAST(user AS STRING) AS key",
        "to_json(struct(*)) AS value"
    ) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "processed-events") \
    .option("checkpointLocation", "/tmp/kafka-checkpoint") \
    .start()

βœ” Offsets are tracked in the checkpoint (not committed back to Kafka)
βœ” The Kafka sink gives at-least-once delivery; keep downstream consumers idempotent if duplicates matter


πŸ“ Subscribing to Multiple Topics

.option("subscribe", "topic1,topic2")

Or use a regular-expression pattern (subscribePattern takes a Java regex):

.option("subscribePattern", "events-.*")

🧭 Manual Offset Control (Advanced but Important)

.option("startingOffsets", "earliest")

Values you can use:

  • "earliest"
  • "latest"
  • A JSON string of specific per-partition offsets (shown below)

Perfect for reprocessing old data.
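
A hedged example of the JSON form, assuming a two-partition events-topic; -2 means earliest and -1 means latest:

.option("startingOffsets", """{"events-topic": {"0": 1000, "1": -2}}""")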


πŸ“¦ Kafka β†’ Spark Parallelism

Kafka partitions set the upper bound on Spark's read parallelism.

Example:

A Kafka topic with 12 partitions β†’ Spark can read it with up to 12 parallel tasks

If processing falls behind:

Increase the number of topic partitions, or give Spark more executor cores and tune the source options shown below.
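
Two Kafka source options help here (the values below are illustrative): minPartitions asks Spark to split Kafka partitions into more read tasks, and maxOffsetsPerTrigger caps how many records each micro-batch pulls.

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events-topic") \
    .option("minPartitions", "24") \
    .option("maxOffsetsPerTrigger", "100000") \
    .load()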


πŸ” Security (SSL & SASL)

.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.ssl.truststore.location", "/certs/kafka.jks")

Supports:

  • SSL
  • Kerberos
  • SASL
  • OAuth
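
For SASL/PLAIN, the JAAS configuration is passed as another kafka.-prefixed option; the credentials below are placeholders:

.option("kafka.sasl.jaas.config",
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        'username="USER" password="PASSWORD";')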

πŸ›‘οΈ Checkpointing (Mandatory)

.option("checkpointLocation", "/path/to/checkpoints")

Checkpoint stores:

  • Kafka offsets
  • Aggregation state
  • Watermark data
  • Query progress

Without a checkpoint, the query cannot recover after a failure and loses its processing guarantees.


πŸ§ͺ Testing Kafka + Spark Locally

For quick experiments without Kafka, use the socket source with netcat:

nc -lk 9999

Or run a local Kafka broker and produce test messages with the console producer:

bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092

πŸš€ Production Best Practices

βœ” Always use JSON with a schema

Avoid unstructured messages.

βœ” Use watermarks with aggregations

Prevent unbounded state and memory buildup.

βœ” Tune partition counts

More partitions β†’ more throughput.

βœ” Use Delta/Parquet as the sink (see the sketch after this list)

Avoid the console sink in production.

βœ” Monitor consumer lag

Kafka consumer lag = the health of your pipeline.

βœ” Use autoscaling Spark clusters

Handle traffic spikes automatically.
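
As referenced in the sink practice above, a minimal sketch of writing the cleaned stream to Parquet (paths are placeholders; with the Delta Lake package installed, format("delta") works the same way):

cleaned.writeStream \
    .format("parquet") \
    .option("path", "/data/processed-events") \
    .option("checkpointLocation", "/checkpoints/processed-events") \
    .outputMode("append") \
    .start()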


πŸ› οΈ Full Working Example (Clean, Ready-to-Use)

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Requires the Kafka connector package, e.g. org.apache.spark:spark-sql-kafka-0-10_2.12
# (match the artifact version to your Spark build)
spark = SparkSession.builder.appName("kafka-pipeline").getOrCreate()

schema = StructType([
    StructField("user", StringType()),
    StructField("action", StringType()),
    StructField("amount", DoubleType()),
    StructField("event_time", TimestampType())
])

# Read from Kafka
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transactions") \
    .load()

# Parse JSON
json_df = df.select(
    from_json(col("value").cast("string"), schema).alias("data"),
    "timestamp"
)

parsed = json_df.select("data.*", "timestamp")

# Aggregation with windows
agg = parsed.withWatermark("event_time", "10 minutes") \
    .groupBy(window("event_time", "5 minutes"), "user") \
    .agg(sum("amount").alias("total_spent"))

# Write back to Kafka
query = agg.selectExpr("CAST(user AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "user-spending") \
    .option("checkpointLocation", "/checkpoint/kafka-pipeline") \
    .start()

# Keep the application alive until the query is stopped
query.awaitTermination()

🏁 Summary

Kafka + PySpark Streaming gives you:

  • Real-time event ingestion
  • Scalable, fault-tolerant pipelines
  • Event-time windowing
  • Exactly-once processing (end-to-end with checkpoints and idempotent sinks)
  • JSON parsing & schema validation
  • State management via watermarks
  • High-throughput processing

This is the backbone of modern data engineering.