Reading Streaming Data from Kafka, S3, and Socket in PySpark
At NeoMart, streaming data comes from multiple sources:
- Kafka → real-time orders and events
- S3 → log files and batch append files
- Socket → testing and demo streams
Structured Streaming makes it easy to read from these sources as continuous DataFrames.
1. Reading from Kafka
Input: Kafka topic orders receives messages:
{"order_id":"O1001","customer_id":"C101","amount":250}
{"order_id":"O1002","customer_id":"C102","amount":150}
{"order_id":"O1003","customer_id":"C103","amount":300}
Code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, IntegerType
spark = SparkSession.builder.appName("NeoMartKafka").getOrCreate()
# Read streaming data from Kafka
df_kafka = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .load()
# Convert binary value to string
df_orders = df_kafka.selectExpr("CAST(value AS STRING) as order_json")
# Define schema
schema = StructType() \
    .add("order_id", StringType()) \
    .add("customer_id", StringType()) \
    .add("amount", IntegerType())
# Parse JSON
df_parsed = df_orders.select(from_json(col("order_json"), schema).alias("order")).select("order.*")
df_parsed.printSchema()
Output Schema:
root
|-- order_id: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- amount: integer (nullable = true)
Streaming Output Example (Console Sink):
+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|O1001   |C101       |250   |
|O1002   |C102       |150   |
|O1003   |C103       |300   |
+--------+-----------+------+
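Besides value, the Kafka source also exposes metadata columns (key, topic, partition, offset, timestamp) that can be selected alongside the parsed payload, for example for auditing or deduplication. A minimal sketch reusing the schema defined above; the name df_with_meta is illustrative:
df_with_meta = df_kafka.select(
    col("topic"),        # Kafka topic the record came from
    col("partition"),    # partition number
    col("offset"),       # offset within the partition
    col("timestamp"),    # broker timestamp
    from_json(col("value").cast("string"), schema).alias("order")
).select("topic", "partition", "offset", "timestamp", "order.*")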
2. Reading from S3
Input: S3 folder s3://neomart-logs/orders/ receives JSON files:
{"order_id":"O1004","customer_id":"C104","amount":400}
{"order_id":"O1005","customer_id":"C105","amount":120}
Code:
df_s3 = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/tmp/schema") \
    .load("s3://neomart-logs/orders/")
df_s3.printSchema()
Output Schema:
root
|-- order_id: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- amount: long (nullable = true)
Streaming Output Example (Console Sink):
+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|O1004   |C104       |400   |
|O1005   |C105       |120   |
+--------+-----------+------+
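Note that cloudFiles is the Databricks Auto Loader source. On open-source Spark, the same folder can be streamed with the built-in json file source, which requires an explicit schema up front. A minimal sketch under that assumption (s3a:// is the scheme used by the Hadoop S3 connector; s3_schema and df_s3_oss are illustrative names):
from pyspark.sql.types import LongType

# Explicit schema is mandatory for file streaming sources on open-source Spark
s3_schema = StructType() \
    .add("order_id", StringType()) \
    .add("customer_id", StringType()) \
    .add("amount", LongType())

df_s3_oss = spark.readStream \
    .schema(s3_schema) \
    .format("json") \
    .load("s3a://neomart-logs/orders/")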
3. Reading from Socket (Quick Testing)
Input: Lines sent to a socket server on localhost:9999 (for example, started with nc -lk 9999):
order O1006 C106 350
order O1007 C107 500
Code:
df_socket = spark.readStream.format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
df_socket.printSchema()
Output Schema:
root
|-- value: string (nullable = true)
Streaming Output Example (Console Sink):
+--------------------+
|value               |
+--------------------+
|order O1006 C106 350|
|order O1007 C107 500|
+--------------------+
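Since the socket source delivers each line as a single value column, the demo lines can be split into structured columns before further processing. A minimal sketch, assuming every line follows the four-token layout order <order_id> <customer_id> <amount> shown above (df_socket_parsed is an illustrative name):
from pyspark.sql.functions import split

# Split each line on whitespace; token 0 is the literal word "order"
tokens = split(col("value"), " ")
df_socket_parsed = df_socket.select(
    tokens.getItem(1).alias("order_id"),
    tokens.getItem(2).alias("customer_id"),
    tokens.getItem(3).cast("int").alias("amount")
)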
4. Common Notes
- All sources return unbounded DataFrames → continuously updated
- Combine with transformations and aggregations just like batch DataFrames (see the aggregation sketch after the checkpoint example below)
- Always set checkpointLocation for fault-tolerance:
query = df_parsed.writeStream \
    .format("console") \
    .option("checkpointLocation", "/tmp/checkpoints") \
    .start()
query.awaitTermination()
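As an example of the second note above, here is a minimal sketch of a streaming aggregation over the parsed Kafka orders: running revenue per customer. The names df_revenue and query_agg and the checkpoint path are illustrative; aggregations without a watermark must use the update or complete output mode:
from pyspark.sql.functions import sum as sum_

# Running total of order amounts per customer, updated every micro-batch
df_revenue = df_parsed.groupBy("customer_id") \
    .agg(sum_("amount").alias("total_amount"))

query_agg = df_revenue.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "/tmp/checkpoints-agg") \
    .start()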
5. Best Practices
✔ For Kafka: tune startingOffsets and maxOffsetsPerTrigger (see the sketch after this list)
✔ For S3: use Auto Loader (cloudFiles) for incremental ingestion on Databricks
✔ For Socket: use only for small-scale local tests, never in production
✔ Always define an explicit schema → avoids schema-inference overhead and surprise type changes
✔ Use checkpointing; together with replayable sources and idempotent sinks it enables end-to-end exactly-once guarantees
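A minimal sketch of the Kafka tuning options referenced in the first point (the values shown are illustrative, not recommendations):
df_kafka_tuned = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "orders")
    # Where to start reading when no checkpoint exists yet
    .option("startingOffsets", "earliest")
    # Cap on the number of records processed per micro-batch
    .option("maxOffsetsPerTrigger", 10000)
    .load()
)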
Summary
NeoMart pipelines use multiple streaming sources:
- Kafka → high-throughput real-time events
- S3 → append-only log ingestion
- Socket → testing and demo purposes
Structured Streaming provides a uniform API that treats all of these sources as continuous DataFrames, enabling scalable real-time analytics.
Next Topic → Writing Streaming Data to Sinks