Writing Streaming Data to Sinks in PySpark — Console, Parquet, Snowflake
At NeoMart, streaming data is only valuable when it reaches dashboards, storage, or analytics systems:
- Console → quick debugging
- Parquet → persistent storage for batch + streaming queries
- Snowflake → real-time analytics and reporting
Structured Streaming provides a unified API to write data safely and efficiently.
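The examples below all write a parsed orders stream, df_parsed, which is not defined in this section. A minimal sketch of how it might be built, assuming the orders arrive as JSON messages on a Kafka topic (the broker address and the topic name neomart_orders are placeholders):
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("NeoMartSinks").getOrCreate()

# Expected shape of each order message.
order_schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("amount", IntegerType())
])

# Hypothetical Kafka source; broker address and topic name are placeholders.
df_raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "neomart_orders") \
    .load()

# Kafka delivers the payload as binary; cast to string and parse the JSON.
df_parsed = df_raw \
    .select(from_json(col("value").cast("string"), order_schema).alias("order")) \
    .select("order.*")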
1. Writing to Console (Quick Debugging)
Input Data:
+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|O1001 |C101 |250 |
|O1002 |C102 |150 |
|O1003 |C103 |300 |
+--------+-----------+------+
Code:
query_console = df_parsed.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()
query_console.awaitTermination()
Console Output Example (the console sink prints a batch header for every micro-batch):
-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|O1001 |C101 |250 |
|O1002 |C102 |150 |
|O1003 |C103 |300 |
+--------+-----------+------+
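By default the console sink prints only the first 20 rows of each micro-batch. A small variation with illustrative option values that raises the row limit and adds an explicit trigger interval:
# Illustrative values: print up to 50 rows per micro-batch (default is 20)
# and emit a batch every 5 seconds instead of as fast as possible.
query_console = df_parsed.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("numRows", 50) \
    .option("truncate", "false") \
    .trigger(processingTime="5 seconds") \
    .start()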
2. Writing to Parquet (Persistent Storage)
Input Data: Same as above
Code:
query_parquet = df_parsed.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/tmp/neomart_orders") \
    .option("checkpointLocation", "/tmp/checkpoints_parquet") \
    .start()
query_parquet.awaitTermination()
Output Example (Files Created in /tmp/neomart_orders):
/tmp/neomart_orders/
├── _spark_metadata/
├── part-00000-xxxx.snappy.parquet
├── part-00001-xxxx.snappy.parquet
└── part-00002-xxxx.snappy.parquet
The _spark_metadata directory is the file sink's write-ahead log; it is what lets Spark read the output consistently.
Reading back Parquet files:
spark.read.parquet("/tmp/neomart_orders").show()
Output:
+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|O1001 |C101 |250 |
|O1002 |C102 |150 |
|O1003 |C103 |300 |
+--------+-----------+------+
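Because the Parquet sink writes plain files, the same directory can also feed a downstream streaming query. A sketch, noting that a schema must be supplied explicitly when files are read as a stream:
# Reuse the schema of the data already written (or define a StructType by hand).
orders_schema = spark.read.parquet("/tmp/neomart_orders").schema

df_orders_stream = spark.readStream \
    .schema(orders_schema) \
    .format("parquet") \
    .load("/tmp/neomart_orders")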
3. Writing to Snowflake (Cloud Analytics)
Input Data: Same as above
Code:
sf_options = {
    "sfURL": "account.snowflakecomputing.com",
    "sfDatabase": "NEOMART_DB",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "SYSADMIN",
    "sfUser": "USER",
    "sfPassword": "PASSWORD"
}
query_snowflake = df_parsed.writeStream \
    .format("snowflake") \
    .options(**sf_options) \
    .option("dbtable", "ORDERS_STREAM") \
    .option("checkpointLocation", "/tmp/checkpoints_snowflake") \
    .outputMode("append") \
    .start()
query_snowflake.awaitTermination()
Snowflake Output Example:
SELECT * FROM ORDERS_STREAM;
+---------+------------+------+
|ORDER_ID |CUSTOMER_ID |AMOUNT|
+---------+------------+------+
|O1001 |C101 |250 |
|O1002 |C102 |150 |
|O1003 |C103 |300 |
+---------+------------+------+
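Direct writeStream support depends on the Spark–Snowflake connector version. If it is unavailable in your setup, a common fallback is foreachBatch, which hands each micro-batch to the connector's regular batch writer (the checkpoint path below is a placeholder):
# Write each micro-batch with the connector's batch API.
def write_batch_to_snowflake(batch_df, batch_id):
    batch_df.write \
        .format("snowflake") \
        .options(**sf_options) \
        .option("dbtable", "ORDERS_STREAM") \
        .mode("append") \
        .save()

query_snowflake_fb = df_parsed.writeStream \
    .foreachBatch(write_batch_to_snowflake) \
    .option("checkpointLocation", "/tmp/checkpoints_snowflake_fb") \
    .start()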
4. Choosing OutputMode
| Mode | Use Case |
|---|---|
| append | Only new rows, no updates (Kafka, Parquet, Snowflake) |
| complete | Full aggregation results (console, dashboards) |
| update | Only changed rows since last batch (stateful aggregations) |
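complete mode only applies to queries with an aggregation. A minimal sketch, assuming a hypothetical running total per customer written to the console:
from pyspark.sql import functions as F

# Running total per customer; complete mode re-emits the full result table each trigger.
df_totals = df_parsed \
    .groupBy("customer_id") \
    .agg(F.sum("amount").alias("total_amount"))

query_totals = df_totals.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()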
5. Best Practices
✔ Always set checkpointLocation → required for recovery and exactly-once guarantees after a restart
✔ Use append mode for streaming writes to Parquet/Kafka/Snowflake
✔ Control micro-batch frequency with .trigger(processingTime='10 seconds'), as shown in the sketch after this list
✔ Monitor running queries for backpressure or failures, e.g. via query.lastProgress and query.status
✔ Test with the console sink before writing to production sinks
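A sketch of the trigger and monitoring practices above, reusing the Parquet sink from section 2 (same placeholder paths):
query = df_parsed.writeStream \
    .format("parquet") \
    .option("path", "/tmp/neomart_orders") \
    .option("checkpointLocation", "/tmp/checkpoints_parquet") \
    .trigger(processingTime="10 seconds") \
    .start()

# lastProgress reports rows per second and batch duration for the most recent batch;
# status shows whether the query is processing or waiting for new data.
print(query.lastProgress)
print(query.status)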
Summary
NeoMart’s streaming pipelines efficiently write data to:
- Console → debugging & testing
- Parquet → scalable, persistent storage
- Snowflake → real-time analytics
Structured Streaming provides end-to-end exactly-once guarantees when checkpointing is enabled and the sink is idempotent or transactional, combining performance, fault tolerance, and flexibility.
Next Topic → Introduction to MLlib — Pipelines, Transformers, and Estimators