Introduction to Structured Streaming in PySpark — Real-Time Data Processing

At NeoMart, data flows continuously:

  • Online orders every second
  • Clickstream events from the website
  • IoT sensor metrics from warehouses

To analyze data in real-time, we use Structured Streaming, PySpark’s high-level, scalable, fault-tolerant stream processing engine.


1. Why Structured Streaming?

  • Handles unbounded streams of data
  • Provides DataFrame/Dataset API → familiar for batch processing
  • Ensures exactly-once semantics for reliable pipelines
  • Integrates with multiple sources and sinks

Story: NeoMart wants to calculate real-time sales metrics without waiting for batch jobs.


2. Reading a Stream from a Socket

Input: Each line typed into a socket (localhost:9999)


hello world
spark streaming rocks
neo mart analytics

Code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

spark = SparkSession.builder.appName("NeoMartStreaming").getOrCreate()

# Read streaming data from socket
df_stream = spark.readStream.format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split incoming lines into words
words = df_stream.select(split(col("value"), " ").alias("words"))

words.printSchema()

Output Schema:

root
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)

  • Each line from the socket → split into an array of words
  • df_stream is unbounded → continuously updated
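To feed this stream locally, the usual trick is to run netcat (`nc -lk 9999`) in another terminal and type lines into it. A minimal Python stand-in (a test harness of our own, not part of Spark) can be sketched as:

```python
import socket

def serve_lines(lines, host="localhost", port=9999):
    """Send each line to the first client that connects, then close.

    A rough stand-in for `nc -lk 9999` when trying out the socket source.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind((host, port))
        srv.listen(1)
        conn, _ = srv.accept()  # Spark's socket source connects as a client
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))

serve_lines(["hello world", "spark streaming rocks", "neo mart analytics"])
```

Start this script first, then launch the Spark job; the socket source treats each newline-terminated line as one row in the `value` column.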

3. Aggregating Streaming Data

Code:

from pyspark.sql.functions import explode

# Explode words for counting
word_counts = words.select(explode(col("words")).alias("word")) \
    .groupBy("word").count()

# Stream output to console
query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

Output Example:

+---------+-----+
|word     |count|
+---------+-----+
|hello    |    1|
|world    |    1|
|spark    |    1|
|streaming|    1|
|rocks    |    1|
|neo      |    1|
|mart     |    1|
|analytics|    1|
+---------+-----+

Story: NeoMart dashboard shows top words in real-time clickstreams.
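The explode-and-count pipeline above is the streaming analogue of a plain word count. In pure Python (a sketch of the semantics only, not of how Spark distributes the work), the same aggregation over a finite batch of lines would be:

```python
from collections import Counter

def word_counts(lines):
    """What explode(words) + groupBy("word").count() computes,
    expressed over a finite batch of lines."""
    counter = Counter()
    for line in lines:
        counter.update(line.split(" "))  # explode each line's word array
    return dict(counter)

print(word_counts(["hello world", "spark streaming rocks"]))
# {'hello': 1, 'world': 1, 'spark': 1, 'streaming': 1, 'rocks': 1}
```

The difference in streaming is that the input never ends: with outputMode("complete"), Spark keeps this running state and re-emits the full counts table after every micro-batch.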


4. Checkpointing

query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", "/tmp/checkpoints") \
    .start()

  • Stores the streaming state so the query can recover after a failure
  • Required for stateful operations such as aggregations
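Conceptually, the checkpoint holds the running aggregation state so a restarted query resumes where it left off instead of recomputing from scratch. A simplified pure-Python analogy (the file name and JSON layout here are illustrative, not Spark's actual checkpoint format):

```python
import json
import os
from collections import Counter

def update_counts(new_lines, checkpoint="/tmp/wordcount_state.json"):
    """Restore prior counts from the checkpoint, fold in new lines,
    then persist the updated state again."""
    state = Counter()
    if os.path.exists(checkpoint):
        with open(checkpoint) as f:
            state.update(json.load(f))  # recover state after a "restart"
    for line in new_lines:
        state.update(line.split(" "))
    with open(checkpoint, "w") as f:
        json.dump(dict(state), f)
    return dict(state)
```

Calling update_counts twice, simulating a crash and restart between calls, keeps the earlier counts, which is exactly what the checkpointLocation option buys a stateful streaming query.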

Summary

Structured Streaming allows NeoMart to:

  • Ingest real-time data from multiple sources
  • Apply transformations and aggregations like batch DataFrames
  • Write results to sinks with exactly-once semantics
  • Ensure fault tolerance with checkpointing

Next Topic → Reading Streaming Data from Kafka, S3, and Socket
