Writing Streaming Data to Sinks in PySpark: Console, Parquet, Snowflake
At NeoMart, streaming data is only valuable when it reaches dashboards, storage, or analytics systems:
- Console: quick debugging
- Parquet: persistent storage for batch + streaming queries
- Snowflake: real-time analytics and reporting
Structured Streaming exposes one writer API, DataFrame.writeStream, for all three sinks; only the format and its options change.
1. Writing to Console (Quick Debugging)
Input Data:
+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|O1001   |C101       |250   |
|O1002   |C102       |150   |
|O1003   |C103       |300   |
+--------+-----------+------+
Code:
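# df_parsed is the parsed streaming DataFrame of orders shown above.
query_console = df_parsed.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

# Block the driver until the query is stopped or fails.
query_console.awaitTermination()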
Console Output Example:
+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|O1001   |C101       |250   |
|O1002   |C102       |150   |
|O1003   |C103       |300   |
+--------+-----------+------+
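When debugging, it can help to cap how long the console query runs so the script does not block indefinitely. The following is a minimal sketch, assuming df is a streaming DataFrame such as df_parsed; the helper name debug_to_console and its parameters are hypothetical. It relies on the console sink's numRows and truncate options and on awaitTermination accepting a timeout in seconds.

```python
def debug_to_console(df, seconds=30, num_rows=20):
    """Print micro-batches of a streaming DataFrame to the console for a limited time.

    Hypothetical helper: `df` is assumed to be a streaming DataFrame
    such as df_parsed.
    """
    query = (
        df.writeStream
        .outputMode("append")
        .format("console")
        .option("truncate", "false")   # show full column values
        .option("numRows", num_rows)   # rows printed per micro-batch
        .start()
    )
    try:
        # Returns True if the query terminated within the timeout, else False.
        finished = query.awaitTermination(seconds)
    finally:
        # Stop the query so the debugging session releases its resources.
        query.stop()
    return finished
```

Calling debug_to_console(df_parsed, seconds=60) would print batches for up to a minute and then stop the query.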
2. Writing to Parquet (Persistent Storage)
Input Data: Same as above
Code:
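# The file sink supports append mode only and requires a checkpoint location.
query_parquet = df_parsed.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/tmp/neomart_orders") \
    .option("checkpointLocation", "/tmp/checkpoints_parquet") \
    .start()

# Block the driver until the query is stopped or fails.
query_parquet.awaitTermination()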
Output Example (Files Created in /tmp/neomart_orders):
/tmp/neomart_orders/
├── part-00000-xxxx.snappy.parquet
├── part-00001-xxxx.snappy.parquet
└── part-00002-xxxx.snappy.parquet
Reading back Parquet files:
spark.read.parquet("/tmp/neomart_orders").show()
Output:
+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|O1001   |C101       |250   |
|O1002   |C102       |150   |
|O1003   |C103       |300   |
+--------+-----------+------+
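Each streaming query needs its own checkpoint directory, so it can be convenient to derive the output and checkpoint paths from one base directory and a query name. The sketch below assumes a hypothetical layout (<base_dir>/data/<query_name> and <base_dir>/checkpoints/<query_name>); the helper names sink_paths and start_parquet_sink are illustrative, not part of Spark.

```python
import posixpath


def sink_paths(base_dir, query_name):
    """Return (output_path, checkpoint_path) for one streaming query.

    Hypothetical layout: <base_dir>/data/<query_name> and
    <base_dir>/checkpoints/<query_name>.
    """
    output_path = posixpath.join(base_dir, "data", query_name)
    checkpoint_path = posixpath.join(base_dir, "checkpoints", query_name)
    return output_path, checkpoint_path


def start_parquet_sink(df, base_dir, query_name):
    """Start an append-mode Parquet sink with a dedicated checkpoint directory."""
    output_path, checkpoint_path = sink_paths(base_dir, query_name)
    return (
        df.writeStream
        .queryName(query_name)
        .outputMode("append")            # the file sink supports append mode only
        .format("parquet")
        .option("path", output_path)
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

For example, start_parquet_sink(df_parsed, "/tmp/neomart", "orders") would write to /tmp/neomart/data/orders and checkpoint to /tmp/neomart/checkpoints/orders.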
3. Writing to Snowflake (Cloud Analytics)
Input Data: Same as above
Code:
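# Connection options for the Snowflake Spark connector.
# The values below are placeholders; load real credentials from a secrets
# manager or environment variables rather than hard-coding them.
sf_options = {
    "sfURL": "account.snowflakecomputing.com",
    "sfDatabase": "NEOMART_DB",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "SYSADMIN",
    "sfUser": "USER",
    "sfPassword": "PASSWORD"
}

# Assumes the Snowflake Spark connector is on the classpath and that the
# installed version accepts streaming writes.
query_snowflake = df_parsed.writeStream \
    .format("snowflake") \
    .options(**sf_options) \
    .option("dbtable", "ORDERS_STREAM") \
    .option("checkpointLocation", "/tmp/checkpoints_snowflake") \
    .outputMode("append") \
    .start()

# Block the driver until the query is stopped or fails.
query_snowflake.awaitTermination()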
Snowflake Output Example:
SELECT * FROM ORDERS_STREAM;
+---------+------------+------+
|ORDER_ID |CUSTOMER_ID |AMOUNT|
+---------+------------+------+
|O1001    |C101        |250   |
|O1002    |C102        |150   |
|O1003    |C103        |300   |
+---------+------------+------+
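If the installed connector does not accept a direct streaming write, a commonly used alternative is foreachBatch, which hands each micro-batch to the connector's ordinary batch writer. The following is a sketch under stated assumptions: the environment variable names (SNOWFLAKE_URL and so on) and the helper names load_sf_options, make_snowflake_writer, and start_snowflake_sink are hypothetical, and the Snowflake Spark connector is assumed to be installed.

```python
import os


def load_sf_options(env=None):
    """Build Snowflake connector options from environment variables.

    The variable names below are hypothetical; adapt them to your setup.
    """
    env = os.environ if env is None else env
    mapping = {
        "sfURL": "SNOWFLAKE_URL",
        "sfUser": "SNOWFLAKE_USER",
        "sfPassword": "SNOWFLAKE_PASSWORD",
        "sfDatabase": "SNOWFLAKE_DATABASE",
        "sfSchema": "SNOWFLAKE_SCHEMA",
        "sfWarehouse": "SNOWFLAKE_WAREHOUSE",
        "sfRole": "SNOWFLAKE_ROLE",
    }
    missing = [var for var in mapping.values() if var not in env]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {option: env[var] for option, var in mapping.items()}


def make_snowflake_writer(sf_options, table):
    """Return a foreachBatch callback that appends each micro-batch to `table`."""
    def write_batch(batch_df, batch_id):
        # batch_df is a regular (non-streaming) DataFrame, so the batch writer applies.
        (
            batch_df.write
            .format("snowflake")
            .options(**sf_options)
            .option("dbtable", table)
            .mode("append")
            .save()
        )
    return write_batch


def start_snowflake_sink(df, sf_options, table, checkpoint_path):
    """Start a streaming query that writes every micro-batch to Snowflake."""
    return (
        df.writeStream
        .foreachBatch(make_snowflake_writer(sf_options, table))
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

Note that foreachBatch gives at-least-once delivery by default: a micro-batch can be replayed after a failure, so the target table may need deduplication (for example, keyed on order_id or on the batch_id passed to the callback).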
4. Choosing OutputMode
| Mode | Use Case |
|---|---|
| append | Emits only new rows; rows already written are never updated (Kafka, Parquet, Snowflake) |
| complete | Rewrites the full result table on every trigger; requires an aggregation (console, dashboards) |
| update | Emits only rows that changed since the last trigger (stateful aggregations) |
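The table above can be turned into a small pre-flight check that runs before a query is started. The sketch below encodes the sink and output-mode compatibility as listed in the Structured Streaming Programming Guide; the names SUPPORTED_MODES and check_output_mode are hypothetical, and the mapping should be verified against the Spark version in use.

```python
# Output modes accepted by some common sinks (verify against your Spark version).
SUPPORTED_MODES = {
    "console": {"append", "update", "complete"},
    "parquet": {"append"},                        # file sinks are append-only
    "kafka": {"append", "update", "complete"},
    "foreachBatch": {"append", "update", "complete"},
}


def check_output_mode(sink, mode, has_aggregation):
    """Return a list of problems with a sink/mode combination (empty if none found)."""
    problems = []
    if sink not in SUPPORTED_MODES:
        problems.append(f"unknown sink '{sink}'")
    elif mode not in SUPPORTED_MODES[sink]:
        problems.append(f"{sink} sink does not accept '{mode}' mode")
    if mode == "complete" and not has_aggregation:
        problems.append("'complete' mode requires a streaming aggregation")
    return problems


if __name__ == "__main__":
    # Raw orders written to Parquet: append is the only valid choice.
    print(check_output_mode("parquet", "append", has_aggregation=False))
    # Running totals per customer cannot go to Parquet in complete mode.
    print(check_output_mode("parquet", "complete", has_aggregation=True))
```

An empty list means no problem was detected by this check; it does not replace the analysis Spark performs when the query starts.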
5. Best Practices
- Always set checkpointLocation for production sinks so a restarted query resumes from its last committed offsets instead of losing or reprocessing data
- Use append mode for streaming writes to Parquet/Kafka/Snowflake
- Tune the micro-batch interval, and with it the batch size, using .trigger(processingTime='10 seconds')
- Monitor sinks for backpressure or failures
- Test with the console sink before writing to production sinks
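The trigger and monitoring points above can be sketched as follows. This is a minimal example, assuming df is a streaming DataFrame such as df_parsed and that query.lastProgress is dict-like, as it is in PySpark; the helper names start_with_trigger and is_falling_behind are hypothetical.

```python
def start_with_trigger(df, output_path, checkpoint_path, interval="10 seconds"):
    """Start a Parquet sink that fires a micro-batch every `interval`."""
    return (
        df.writeStream
        .outputMode("append")
        .format("parquet")
        .option("path", output_path)
        .option("checkpointLocation", checkpoint_path)
        .trigger(processingTime=interval)
        .start()
    )


def is_falling_behind(progress):
    """Return True if the last micro-batch received rows faster than it processed them.

    `progress` is the dict-like value of `query.lastProgress`, which is None
    until the first micro-batch completes.
    """
    if not progress:
        return False
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    return input_rate > processed_rate
```

A monitoring loop could call is_falling_behind(query.lastProgress) periodically and also inspect query.status and query.exception() to detect a stalled or failed sink.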
Summary
NeoMart's streaming pipelines efficiently write data to:
- Console: debugging & testing
- Parquet: scalable, persistent storage
- Snowflake: real-time analytics
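With checkpointing, Structured Streaming recovers from failures and resumes where it left off. End-to-end exactly-once delivery additionally requires a replayable source and an idempotent or transactional sink, as the built-in file (Parquet) sink is; the console sink is meant for debugging and is not fault-tolerant.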
Next Topic: Introduction to MLlib (Pipelines, Transformers, and Estimators)