
Reading Streaming Data from Kafka, S3, and Socket in PySpark

At NeoMart, streaming data comes from multiple sources:

  • Kafka → real-time orders and events
  • S3 → log files and batch append files
  • Socket → testing and demo streams

Structured Streaming makes it easy to read from these sources as continuous DataFrames.


1. Reading from Kafka

Input: Kafka topic orders receives messages:


{"order_id":"O1001","customer_id":"C101","amount":250}
{"order_id":"O1002","customer_id":"C102","amount":150}
{"order_id":"O1003","customer_id":"C103","amount":300}

Code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, IntegerType

spark = SparkSession.builder.appName("NeoMartKafka").getOrCreate()

# Read streaming data from Kafka
df_kafka = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .load()

# Convert binary value to string
df_orders = df_kafka.selectExpr("CAST(value AS STRING) as order_json")

# Define schema
schema = StructType() \
    .add("order_id", StringType()) \
    .add("customer_id", StringType()) \
    .add("amount", IntegerType())

# Parse JSON
df_parsed = df_orders.select(from_json(col("order_json"), schema).alias("order")).select("order.*")
df_parsed.printSchema()

Output Schema:

root
|-- order_id: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- amount: integer (nullable = true)

Streaming Output Example (Console Sink):

+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|   O1001|       C101|   250|
|   O1002|       C102|   150|
|   O1003|       C103|   300|
+--------+-----------+------+
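
Besides value, the Kafka source also exposes metadata columns such as key, topic, partition, offset, and timestamp. A minimal sketch that keeps some of this metadata next to the parsed payload, reusing df_kafka and schema from above:

# Keep Kafka metadata columns alongside the parsed order fields
df_with_meta = df_kafka.select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp"),
    from_json(col("value").cast("string"), schema).alias("order")
).select("topic", "partition", "offset", "timestamp", "order.*")

This is handy for debugging, since each row can be traced back to its partition and offset.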

2. Reading from S3

Input: S3 folder s3://neomart-logs/orders/ receives JSON files:

{"order_id":"O1004","customer_id":"C104","amount":400}
{"order_id":"O1005","customer_id":"C105","amount":120}

Code:

df_s3 = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/tmp/schema") \
    .load("s3://neomart-logs/orders/")

df_s3.printSchema()

Output Schema:

root
|-- order_id: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- amount: long (nullable = true)

Streaming Output Example (Console Sink):

+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|   O1004|       C104|   400|
|   O1005|       C105|   120|
+--------+-----------+------+
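
Note that cloudFiles is the Databricks Auto Loader source. On open-source Spark, the same folder can be read incrementally with the built-in file source, which requires an explicit schema up front. A minimal sketch reusing the schema from the Kafka example (the maxFilesPerTrigger value is an illustrative assumption, and on open-source Spark the path usually needs the s3a:// scheme with the hadoop-aws package):

# Built-in JSON file source: streaming reads require an explicit schema
df_s3_oss = spark.readStream \
    .schema(schema) \
    .option("maxFilesPerTrigger", 100) \
    .json("s3://neomart-logs/orders/")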

3. Reading from Socket (Quick Testing)

Input: Lines typed into a terminal listening on localhost:9999 (for example, with nc -lk 9999):

order O1006 C106 350
order O1007 C107 500

Code:

df_socket = spark.readStream.format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

df_socket.printSchema()

Output Schema:

root
|-- value: string (nullable = true)

Streaming Output Example (Console Sink):

+--------------------+
|               value|
+--------------------+
|order O1006 C106 350|
|order O1007 C107 500|
+--------------------+
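
Because the socket source delivers each line as a single value string, the demo lines above usually need to be split into columns before further processing. A minimal sketch, assuming the space-separated layout shown above:

from pyspark.sql.functions import split, col

# Split "order O1006 C106 350" into typed columns
parts = split(col("value"), " ")
df_socket_parsed = df_socket.select(
    parts.getItem(1).alias("order_id"),
    parts.getItem(2).alias("customer_id"),
    parts.getItem(3).cast("int").alias("amount")
)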

4. Common Notes

  • All sources return unbounded DataFrames → continuously updated
  • Combine them with transformations and aggregations just as with batch DataFrames (a sketch follows at the end of this section)
  • Always set checkpointLocation for fault tolerance:

query = df_parsed.writeStream \
    .format("console") \
    .option("checkpointLocation", "/tmp/checkpoints") \
    .start()

query.awaitTermination()
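
As an example of batch-style transformations on a stream, a running order total per customer can be computed on the parsed Kafka stream. A minimal sketch (aggregations written to the console sink need outputMode("complete") or "update"; the checkpoint path is an illustrative assumption):

from pyspark.sql.functions import sum as spark_sum

# Running order total per customer, updated as new orders arrive
df_totals = df_parsed.groupBy("customer_id") \
    .agg(spark_sum("amount").alias("total_amount"))

query_totals = df_totals.writeStream \
    .format("console") \
    .outputMode("complete") \
    .option("checkpointLocation", "/tmp/checkpoints-totals") \
    .start()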

5. Best Practices

✔ For Kafka: tune startingOffsets and maxOffsetsPerTrigger (see the sketch below)
✔ For S3: use Auto Loader (cloudFiles) for incremental ingestion
✔ For Socket: use only for small-scale tests
✔ Always define a schema → avoids the overhead of schema inference on streaming data
✔ Use checkpointing (together with replayable sources and idempotent sinks) for exactly-once guarantees
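
For the Kafka bullet above, both options are set on the reader. A minimal sketch (the values are illustrative assumptions, not tuned recommendations):

# Replay from the earliest offsets and cap each micro-batch at 10,000 records
df_kafka_tuned = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()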


Summary

NeoMart pipelines use multiple streaming sources:

  • Kafka → high-throughput real-time events
  • S3 → append-only log ingestion
  • Socket → testing and demo purposes

Structured Streaming provides a uniform API that treats all of these sources as continuous DataFrames, enabling scalable real-time analytics.


Next Topic → Writing Streaming Data to Sinks