Writing Streaming Data to Sinks in PySpark: Console, Parquet, Snowflake

At NeoMart, streaming data is only valuable when it reaches dashboards, storage, or analytics systems:

  • Console → quick debugging
  • Parquet → persistent storage for batch + streaming queries
  • Snowflake → real-time analytics and reporting

Structured Streaming exposes all of these through one API, DataFrame.writeStream, so switching sinks mostly means changing the format and its options.


1. Writing to Console (Quick Debugging)

Input Data:

+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|O1001   |C101       |250   |
|O1002   |C102       |150   |
|O1003   |C103       |300   |
+--------+-----------+------+

Code:

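# df_parsed is the parsed streaming DataFrame of orders (assumed to be defined already).
query_console = df_parsed.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

# Block the driver until the query is stopped or fails.
query_console.awaitTermination()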

Console Output Example:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|O1001   |C101       |250   |
|O1002   |C102       |150   |
|O1003   |C103       |300   |
+--------+-----------+------+

2. Writing to Parquet (Persistent Storage)

Input Data: Same as above

Code:

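# The file sink supports append mode only and requires a checkpoint location.
query_parquet = df_parsed.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/tmp/neomart_orders") \
    .option("checkpointLocation", "/tmp/checkpoints_parquet") \
    .start()

# Block the driver until the query is stopped or fails.
query_parquet.awaitTermination()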

Output Example (Files Created in /tmp/neomart_orders):

/tmp/neomart_orders/
├── _spark_metadata/
├── part-00000-xxxx.snappy.parquet
├── part-00001-xxxx.snappy.parquet
└── part-00002-xxxx.snappy.parquet

Reading back Parquet files:

spark.read.parquet("/tmp/neomart_orders").show()

Output:

+--------+-----------+------+
|order_id|customer_id|amount|
+--------+-----------+------+
|O1001   |C101       |250   |
|O1002   |C102       |150   |
|O1003   |C103       |300   |
+--------+-----------+------+
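
Since the Parquet output is meant to serve streaming queries as well as batch ones, the same directory can also be read back as a stream. The sketch below assumes an active SparkSession is passed in; the helper name read_orders_stream and the column types in ORDERS_SCHEMA_DDL are illustrative guesses, because the original does not state the types. Unlike spark.read, a streaming file source needs its schema up front unless schema inference is explicitly enabled.

```python
# Assumed column types for the orders data; adjust to the real schema.
ORDERS_SCHEMA_DDL = "order_id STRING, customer_id STRING, amount INT"


def read_orders_stream(spark, path="/tmp/neomart_orders"):
    """Return a streaming DataFrame over the Parquet files written to `path`.

    `spark` is an existing SparkSession (hypothetical helper, not part of
    the original tutorial).
    """
    return (
        spark.readStream
        .schema(ORDERS_SCHEMA_DDL)  # streaming file sources need an explicit schema
        .parquet(path)
    )
```

The returned DataFrame can be passed to writeStream like any other stream, for example to the console sink while testing.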

3. Writing to Snowflake (Cloud Analytics)

Input Data: Same as above

Code:

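# Placeholder connection settings. In real jobs, load credentials from a
# secrets manager or environment variables instead of hardcoding them.
sf_options = {
    "sfURL": "account.snowflakecomputing.com",
    "sfDatabase": "NEOMART_DB",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "SYSADMIN",
    "sfUser": "USER",
    "sfPassword": "PASSWORD"
}

# Requires the Snowflake Spark connector to be available to the Spark job.
query_snowflake = df_parsed.writeStream \
    .format("snowflake") \
    .options(**sf_options) \
    .option("dbtable", "ORDERS_STREAM") \
    .option("checkpointLocation", "/tmp/checkpoints_snowflake") \
    .outputMode("append") \
    .start()

# Block the driver until the query is stopped or fails.
query_snowflake.awaitTermination()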

Snowflake Output Example:

SELECT * FROM ORDERS_STREAM;
+---------+------------+------+
|ORDER_ID |CUSTOMER_ID |AMOUNT|
+---------+------------+------+
|O1001    |C101        |250   |
|O1002    |C102        |150   |
|O1003    |C103        |300   |
+---------+------------+------+
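
Whether format("snowflake") can be used directly with writeStream depends on the connector version and environment, so a commonly used alternative is to hand each micro-batch to the connector's batch writer through foreachBatch. The following is a minimal sketch of that route, not the tutorial's own method. The helper names make_snowflake_batch_writer and start_snowflake_stream are hypothetical, and the fully qualified source name is an assumption about how the connector is registered in your environment.

```python
# Assumed data source name for the Snowflake Spark connector; the short name
# "snowflake" may also work where the connector registers it.
SNOWFLAKE_SOURCE = "net.snowflake.spark.snowflake"


def make_snowflake_batch_writer(sf_options, table):
    """Build a foreachBatch callback that appends each micro-batch to `table`."""

    def write_batch(batch_df, batch_id):
        # batch_df is a regular (non-streaming) DataFrame, so the batch
        # writer API applies. batch_id is unused here; it can be used to
        # detect a replayed batch if duplicates must be avoided.
        (
            batch_df.write
            .format(SNOWFLAKE_SOURCE)
            .options(**sf_options)
            .option("dbtable", table)
            .mode("append")
            .save()
        )

    return write_batch


def start_snowflake_stream(df, sf_options, table, checkpoint_dir):
    """Start a streaming query that writes every micro-batch via the callback."""
    return (
        df.writeStream
        .foreachBatch(make_snowflake_batch_writer(sf_options, table))
        .option("checkpointLocation", checkpoint_dir)
        .outputMode("append")
        .start()
    )
```

With the names from this section, the call would look like start_snowflake_stream(df_parsed, sf_options, "ORDERS_STREAM", "/tmp/checkpoints_snowflake"). A micro-batch can be re-run after a failure, so this pattern should be treated as at-least-once unless the write is made idempotent.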

4. Choosing OutputMode

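+--------+----------------------------------------------------------+
|Mode    |Use Case                                                  |
+--------+----------------------------------------------------------+
|append  |Only new rows, no updates (Kafka, Parquet, Snowflake)     |
|complete|Full aggregation results (console, dashboards)            |
|update  |Only changed rows since last batch (stateful aggregations)|
+--------+----------------------------------------------------------+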
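
To make the difference between the modes concrete, here is a toy simulation in plain Python. It is not Spark's implementation and needs no Spark installation. It models a running SUM(amount) per customer_id over micro-batches and records what each mode would emit: complete emits the whole result table, update emits only the keys whose total changed, and append is shown for the raw, un-aggregated rows (append on an aggregation additionally requires a watermark, which this sketch leaves out). The function name and the second batch's extra row are made up for illustration.

```python
def simulate_output_modes(micro_batches):
    """Return, per output mode, what would be emitted after each micro-batch.

    `micro_batches` is a list of batches; each batch is a list of
    (customer_id, amount) tuples. Toy model for illustration only.
    """
    totals = {}  # running aggregation state: customer_id -> total amount
    emitted = {"append": [], "complete": [], "update": []}

    for batch in micro_batches:
        changed = {}
        for customer_id, amount in batch:
            totals[customer_id] = totals.get(customer_id, 0) + amount
            changed[customer_id] = totals[customer_id]

        emitted["append"].append(list(batch))      # only the new input rows
        emitted["complete"].append(dict(totals))   # the full result table
        emitted["update"].append(changed)          # only keys that changed

    return emitted


batches = [
    [("C101", 250), ("C102", 150)],
    [("C103", 300), ("C101", 50)],  # the C101/50 row is a hypothetical extra order
]
result = simulate_output_modes(batches)
print(result["complete"][1])  # {'C101': 300, 'C102': 150, 'C103': 300}
print(result["update"][1])    # {'C103': 300, 'C101': 300}
```

After the second batch, complete repeats the unchanged C102 total while update leaves it out, which is why update is usually cheaper for sinks that can upsert.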

5. Best Practices

✔ Always set checkpointLocation → lets a restarted query resume from its last committed offsets instead of losing data
✔ Use append mode for streaming writes to Parquet/Kafka/Snowflake
✔ Control how often micro-batches run with .trigger(processingTime='10 seconds')
✔ Monitor sinks for backpressure or failures
✔ Test with console sink before writing to production sinks
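
The checkpoint, trigger, and monitoring practices above can be sketched as two small helpers. This is an illustration under assumptions rather than a prescribed setup: the helper names start_parquet_sink and describe_query_health are hypothetical, and the "falling behind" rule (input rate higher than processing rate in the last progress report) is one simple heuristic for spotting backpressure.

```python
def start_parquet_sink(df, path, checkpoint_dir, interval="10 seconds"):
    """Start an append-mode Parquet sink with a checkpoint and a fixed trigger."""
    return (
        df.writeStream
        .format("parquet")
        .outputMode("append")
        .option("path", path)
        .option("checkpointLocation", checkpoint_dir)
        .trigger(processingTime=interval)  # run a micro-batch every `interval`
        .start()
    )


def describe_query_health(query):
    """Summarise a StreamingQuery from its status, lastProgress, and exception()."""
    progress = query.lastProgress or {}  # None until the first batch completes
    error = query.exception()            # None while the query has not failed
    in_rate = progress.get("inputRowsPerSecond") or 0.0
    out_rate = progress.get("processedRowsPerSecond") or 0.0
    return {
        "active": query.isActive,
        "status": query.status.get("message"),
        "batch_id": progress.get("batchId"),
        "input_rows": progress.get("numInputRows"),
        # Heuristic: data arrives faster than the query processes it.
        "falling_behind": in_rate > out_rate,
        "error": str(error) if error is not None else None,
    }
```

A driver script could call describe_query_health(query) periodically, or in a loop around query.awaitTermination(timeout), and raise an alert when falling_behind stays true or error is set.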


Summary

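NeoMart's streaming pipelines write data to:

  • Console → debugging & testing
  • Parquet → scalable, persistent storage
  • Snowflake → real-time analytics

Delivery guarantees depend on the sink. With checkpointing and a replayable source, the Parquet file sink is exactly-once; the console sink is for debugging only and is not fault-tolerant. For other sinks, check the connector's guarantees and treat writes as at-least-once unless they are idempotent.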

