
Writing Streaming Data to Sinks in PySpark — Console, Parquet, Snowflake

At NeoMart, streaming data is only valuable when it reaches dashboards, storage, or analytics systems:

  • Console → quick debugging
  • Parquet → persistent storage for batch + streaming queries
  • Snowflake → real-time analytics and reporting

Structured Streaming provides a unified API to write data safely and efficiently.
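All of the examples below write the same streaming DataFrame, df_parsed. The snippet below is a minimal sketch of how such a DataFrame might be produced; it assumes a Kafka broker at localhost:9092 and a topic named orders carrying JSON records with the columns shown in the tables that follow — both are assumptions, so adjust them to your environment.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("NeoMartSinks").getOrCreate()

# Schema matching the order records shown in the examples below
order_schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("amount", IntegerType())
])

# Hypothetical Kafka source: broker address and topic name are assumptions
df_raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .load()

# Parse the JSON payload into typed columns
df_parsed = df_raw.select(
    from_json(col("value").cast("string"), order_schema).alias("o")
).select("o.*")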


1. Writing to Console (Quick Debugging)

Input Data:

+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|   O1001|       C101|   250|
|   O1002|       C102|   150|
|   O1003|       C103|   300|
+--------+-----------+------+

Code:

query_console = df_parsed.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query_console.awaitTermination()

Console Output Example:

+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|   O1001|       C101|   250|
|   O1002|       C102|   150|
|   O1003|       C103|   300|
+--------+-----------+------+
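For noisier streams, the console sink can be tuned. The variation below is a sketch: numRows caps how many rows each micro-batch prints, and the 10-second processing-time trigger (an arbitrary interval chosen for illustration) paces the batches.

query_console = df_parsed.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .option("numRows", 20) \
    .trigger(processingTime="10 seconds") \
    .start()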

2. Writing to Parquet (Persistent Storage)

Input Data: Same as above

Code:

query_parquet = df_parsed.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/tmp/neomart_orders") \
    .option("checkpointLocation", "/tmp/checkpoints_parquet") \
    .start()

query_parquet.awaitTermination()

Output Example (Files Created in /tmp/neomart_orders):

/tmp/neomart_orders/
├── part-00000-xxxx.snappy.parquet
├── part-00001-xxxx.snappy.parquet
└── part-00002-xxxx.snappy.parquet

Reading back Parquet files:

spark.read.parquet("/tmp/neomart_orders").show()

Output:

+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|   O1001|       C101|   250|
|   O1002|       C102|   150|
|   O1003|       C103|   300|
+--------+-----------+------+
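If downstream batch queries filter on a column such as customer_id, the file sink can partition the output on write. The sketch below shows that variation; the partition column and the output/checkpoint paths are assumptions chosen for illustration.

query_parquet = df_parsed.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .partitionBy("customer_id") \
    .option("path", "/tmp/neomart_orders_by_customer") \
    .option("checkpointLocation", "/tmp/checkpoints_parquet_by_customer") \
    .start()

# Partition pruning: only files under customer_id=C101 are scanned
spark.read.parquet("/tmp/neomart_orders_by_customer") \
    .filter("customer_id = 'C101'") \
    .show()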

3. Writing to Snowflake (Cloud Analytics)

Input Data: Same as above

Code:

sf_options = {
    "sfURL": "account.snowflakecomputing.com",
    "sfDatabase": "NEOMART_DB",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "SYSADMIN",
    "sfUser": "USER",
    "sfPassword": "PASSWORD"
}

query_snowflake = df_parsed.writeStream \
    .format("snowflake") \
    .options(**sf_options) \
    .option("dbtable", "ORDERS_STREAM") \
    .option("checkpointLocation", "/tmp/checkpoints_snowflake") \
    .outputMode("append") \
    .start()

query_snowflake.awaitTermination()

Snowflake Output Example:

SELECT * FROM ORDERS_STREAM;
+---------+------------+------+
|ORDER_ID |CUSTOMER_ID |AMOUNT|
+---------+------------+------+
|O1001    |C101        |   250|
|O1002    |C102        |   150|
|O1003    |C103        |   300|
+---------+------------+------+
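Direct streaming writes through format("snowflake") depend on the version of the Snowflake Spark connector in use. If your connector only supports batch writes, a common alternative is foreachBatch, which hands each micro-batch to the batch writer. The sketch below assumes the same sf_options and table name as above; the checkpoint path is an assumption.

def write_to_snowflake(batch_df, batch_id):
    # Batch write of one micro-batch via the Snowflake Spark connector
    batch_df.write \
        .format("snowflake") \
        .options(**sf_options) \
        .option("dbtable", "ORDERS_STREAM") \
        .mode("append") \
        .save()

query_snowflake_fb = df_parsed.writeStream \
    .foreachBatch(write_to_snowflake) \
    .option("checkpointLocation", "/tmp/checkpoints_snowflake_fb") \
    .outputMode("append") \
    .start()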

4. Choosing OutputMode

Mode      Use Case
append    Only new rows, no updates (Kafka, Parquet, Snowflake)
complete  Full aggregation results (console, dashboards)
update    Only changed rows since last batch (stateful aggregations)
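To see why the mode matters, consider a running total per customer. A plain append write is not allowed for this aggregation (without a watermark), complete re-emits the full aggregate table every batch, and update emits only the customers whose totals changed. The sketch below reuses the df_parsed DataFrame from the earlier examples.

from pyspark.sql.functions import sum as sum_

# Stateful aggregation: running revenue per customer
df_totals = df_parsed.groupBy("customer_id").agg(sum_("amount").alias("total_amount"))

# complete mode: every micro-batch prints the whole aggregate table
query_totals = df_totals.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()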

5. Best Practices

✔ Always set checkpointLocation → prevents data loss
✔ Use append mode for streaming writes to Parquet/Kafka/Snowflake
✔ Optimize batch size with .trigger(processingTime='10 seconds')
✔ Monitor sinks for backpressure or failures
✔ Test with console sink before writing to production sinks
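A minimal sketch of these practices combined: a named query with an explicit trigger and checkpoint, plus the built-in status and progress fields for monitoring. The query name and paths are assumptions for illustration.

query = df_parsed.writeStream \
    .queryName("neomart_orders_to_parquet") \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/tmp/neomart_orders") \
    .option("checkpointLocation", "/tmp/checkpoints_parquet") \
    .trigger(processingTime="10 seconds") \
    .start()

# Lightweight monitoring: current state and last micro-batch metrics
print(query.status)        # whether the trigger is active / data is available
print(query.lastProgress)  # rows per second, batch duration, sink details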


Summary

NeoMart’s streaming pipelines efficiently write data to:

  • Console → debugging & testing
  • Parquet → scalable, persistent storage
  • Snowflake → real-time analytics

With checkpointing enabled, Structured Streaming provides reliable, fault-tolerant delivery — including exactly-once guarantees for supported sinks such as the file sink — combining performance and flexibility.


Next Topic → Introduction to MLlib — Pipelines, Transformers, and Estimators