Snowpipe — Real-Time Streaming Load with Example Pipeline
Welcome back to RetailCo, our fictional retail company.
Alice, the data engineer, has been using COPY INTO for batch loading daily sales data. Now the marketing team wants real-time sales data for dashboards.
“We need Snowpipe — it continuously loads data as it arrives, without waiting for daily batches,” Alice explains.
Let’s explore Snowpipe, how it works, and a real-world example.
🏗️ What Is Snowpipe?
Snowpipe is Snowflake’s serverless, continuous data ingestion service:
- Loads data automatically from stages (internal or external)
- Supports event-driven or scheduled triggers
- Scales automatically for incoming data
- Ideal for real-time dashboards and analytics
RetailCo example: Sales CSVs arrive in S3 every minute. Snowpipe immediately loads them into Snowflake for analytics.
🔹 Snowpipe Architecture
- External Stage → e.g., s3://retailco-realtime-sales
- File Arrival Event → triggers Snowpipe automatically
- Snowflake Table → data loaded continuously
- Optional Transformation → downstream pipelines or dashboards
Visual flow:
S3 Bucket (New File) → Event Notification → Snowpipe → SALES Table → BI Dashboard
🔹 Setting Up Snowpipe
Step 1: Create an External Stage
CREATE STAGE REALTIME_SALES_STAGE
URL='s3://retailco-realtime-sales/'
CREDENTIALS=(AWS_KEY_ID='YOUR_KEY' AWS_SECRET_KEY='YOUR_SECRET')
FILE_FORMAT=(TYPE=CSV FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER=1);
(In production, prefer a STORAGE_INTEGRATION over embedding AWS keys directly in the stage definition.)
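Before wiring up the pipe, a quick sanity check (assuming some files have already landed in the bucket) is to list the stage's contents:

LIST @REALTIME_SALES_STAGE;

If this returns the expected CSV files, the stage URL and credentials are working.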
Step 2: Create Snowpipe
CREATE PIPE SALES_PIPE
AUTO_INGEST=TRUE
AS
COPY INTO SALES
FROM @REALTIME_SALES_STAGE
FILE_FORMAT=(TYPE=CSV);
- AUTO_INGEST=TRUE → Snowpipe loads files automatically upon arrival
- No manual COPY INTO required
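Note that AUTO_INGEST only picks up files that arrive after the pipe exists. To backfill files already sitting in the stage, you can trigger a one-off scan:

ALTER PIPE SALES_PIPE REFRESH;

This queues any staged files not yet loaded by the pipe.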
Step 3: Configure Cloud Event Notifications
RetailCo S3 example:
- Enable S3 event notifications to trigger Snowpipe when a new CSV is uploaded
- Snowpipe receives the event and loads the file immediately
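The S3 event notification must target the SQS queue Snowflake provisions for the pipe. One way to find that queue's ARN (shown in the notification_channel column) is:

SHOW PIPES;
-- or, for a single pipe:
DESC PIPE SALES_PIPE;

Paste the ARN into the S3 bucket's event notification configuration for ObjectCreated events.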
🔹 Monitoring Snowpipe
- Use Snowflake’s COPY_HISTORY table function to track file load status (the LOAD_HISTORY view does not include Snowpipe loads):
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME=>'SALES',
START_TIME=>DATEADD('hour', -1, CURRENT_TIMESTAMP)
));
- Ensures real-time visibility into ingestion
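For a quick health check of the pipe itself — whether it is running and how many files are pending — Snowflake also provides SYSTEM$PIPE_STATUS:

SELECT SYSTEM$PIPE_STATUS('SALES_PIPE');

This returns a JSON document including the pipe's execution state and pending file count.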
🧩 RetailCo Real-Time Pipeline
- Marketing team uploads sales CSVs every minute to S3
- S3 triggers Snowpipe automatically
- Snowpipe loads data into the SALES table
- BI dashboard reflects near real-time sales updates
- Alice monitors load history for errors
Outcome: Continuous, automated, and near real-time data availability without manual intervention.
🧠 Best Practices
- Use external stages (S3, Azure, GCS) for scalability
- Enable AUTO_INGEST for event-driven ingestion
- Use proper file formats (CSV, JSON, Parquet)
- Monitor load history regularly
- Archive or clean up files to control storage costs
- Combine Snowpipe with tasks for downstream transformations
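The last point — pairing Snowpipe with tasks — might look like the sketch below. The SALES_SUMMARY table, the TRANSFORM_WH warehouse, and the region/amount columns are hypothetical:

CREATE TASK REFRESH_SALES_SUMMARY
WAREHOUSE = TRANSFORM_WH
SCHEDULE = '5 MINUTE'
AS
INSERT OVERWRITE INTO SALES_SUMMARY
SELECT region, SUM(amount) AS total_sales
FROM SALES
GROUP BY region;

-- Tasks are created suspended; start it explicitly:
ALTER TASK REFRESH_SALES_SUMMARY RESUME;

As Snowpipe keeps SALES current, the task periodically rebuilds the summary the dashboard reads from.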
🏁 Quick Summary
- Snowpipe = serverless, continuous data ingestion for Snowflake
- Key benefits: real-time data loading, automated, scalable
- Setup: external stage → Snowpipe → table → optional downstream tasks
- Best practices: event notifications, file formats, monitoring, and storage management
- Enables real-time dashboards and analytics without manual intervention
🚀 Coming Next
👉 Incremental Loading (CDC) in Snowflake — 5 Real Techniques