Using Snowflake in ETL Workflows
Tags: Snowflake, ETL, Data Engineering, Data Warehouse, SQL
Snowflake's cloud-native architecture makes it an excellent choice for modern ETL (Extract, Transform, Load) workflows. This guide covers practical patterns for integrating Snowflake into data pipelines.
Architecture Overview
Basic ETL Pipeline with Snowflake
┌─────────────────┐
│ Data Sources │
│ │
│ • APIs │
│ • Databases │
│ • Files (S3) │
│ • Streaming │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Extract Layer │
│ │
│ • Python/Spark │
│ • Fivetran │
│ • Airbyte │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Staging Area │
│ │
│ • S3/Azure/GCS │
│ • Raw Files │
└────────┬────────┘
│
▼
┌─────────────────────────────────────────┐
│ SNOWFLAKE │
│ │
│ ┌────────────────────────────────────┐ │
│ │ Raw Database (Landing Zone) │ │
│ │ • COPY INTO statements │ │
│ │ • VARIANT columns for JSON │ │
│ │ • File format definitions │ │
│ └──────────────┬─────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────┐ │
│ │ Staging Database (Transformation) │ │
│ │ • dbt models │ │
│ │ • Stored procedures │ │
│ │ • Streams & Tasks │ │
│ └──────────────┬─────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────┐ │
│ │ Production Database (Marts) │ │
│ │ • Aggregated tables │ │
│ │ • Materialized views │ │
│ │ • Business logic applied │ │
│ └──────────────┬─────────────────────┘ │
│ │
└──────────────────┬───────────────────────┘
│
▼
┌─────────────────┐
│ Consumption │
│ │
│ • Tableau │
│ • Power BI │
│ • Custom Apps │
└─────────────────┘
Core Components
1. Data Ingestion Patterns
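The examples in this section assume a raw landing table that stores each JSON payload in a single VARIANT column, alongside audit columns that the incremental patterns later in this guide rely on. A minimal, illustrative sketch of that assumed table:
-- Illustrative raw landing table assumed by the ingestion and transformation examples
CREATE TABLE IF NOT EXISTS raw_database.raw_schema.events (
event_data VARIANT,       -- raw JSON payload
source VARCHAR,           -- audit: originating file
loaded_at TIMESTAMP_NTZ   -- audit: load time, used by the incremental loads below
);
-- A plain COPY INTO (as in the next example) works when the table has only the
-- VARIANT column; to also populate the audit columns, use a COPY transformation:
-- COPY INTO raw_database.raw_schema.events
-- FROM (SELECT $1, METADATA$FILENAME, CURRENT_TIMESTAMP() FROM @raw_data_stage);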
Using COPY INTO from S3
-- Create file format
CREATE OR REPLACE FILE FORMAT json_format
TYPE = 'JSON'
STRIP_OUTER_ARRAY = TRUE
COMPRESSION = 'AUTO';
-- Create stage pointing to S3
CREATE OR REPLACE STAGE raw_data_stage
URL = 's3://my-bucket/raw-data/'
CREDENTIALS = (AWS_KEY_ID = 'xxx' AWS_SECRET_KEY = 'xxx')
FILE_FORMAT = (FORMAT_NAME = 'json_format');
-- Load data into raw table
COPY INTO raw_database.raw_schema.events
FROM @raw_data_stage
PATTERN = '.*events.*[.]json'
ON_ERROR = 'CONTINUE'
PURGE = TRUE;
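Before committing a large load, the same COPY can be dry-run to surface parse errors without loading any rows; a small sketch using VALIDATION_MODE:
-- Dry run: report problem rows without loading anything
COPY INTO raw_database.raw_schema.events
FROM @raw_data_stage
PATTERN = '.*events.*[.]json'
VALIDATION_MODE = 'RETURN_ERRORS';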
Snowpipe for Streaming Ingestion
-- Create pipe for automatic ingestion
CREATE OR REPLACE PIPE raw_database.raw_schema.events_pipe
AUTO_INGEST = TRUE
AS
COPY INTO raw_database.raw_schema.events
FROM @raw_data_stage
FILE_FORMAT = (FORMAT_NAME = 'json_format');
-- Get the SQS queue ARN (notification_channel column) to configure the S3 event notification
SHOW PIPES LIKE 'events_pipe';
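Once the S3 event notification is wired to that SQS queue, pipe health can be checked from SQL; for example:
-- Check pipe state and pending file count
SELECT SYSTEM$PIPE_STATUS('raw_database.raw_schema.events_pipe');
-- Review recent loads (bulk COPY and Snowpipe) into the target table
SELECT *
FROM TABLE(information_schema.COPY_HISTORY(
TABLE_NAME => 'raw_database.raw_schema.events',
START_TIME => DATEADD(hour, -24, CURRENT_TIMESTAMP())));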
2. Data Transformation with Streams and Tasks
Setting Up Change Data Capture
-- Create stream to track changes
CREATE OR REPLACE STREAM raw_events_stream
ON TABLE raw_database.raw_schema.events;
-- Create transformation task
CREATE OR REPLACE TASK transform_events_task
WAREHOUSE = transform_wh
SCHEDULE = '5 MINUTE'
WHEN
SYSTEM$STREAM_HAS_DATA('raw_events_stream')
AS
INSERT INTO staging_database.staging_schema.events_cleaned
SELECT
event_data:id::STRING AS event_id,
event_data:user_id::STRING AS user_id,
event_data:event_type::STRING AS event_type,
event_data:timestamp::TIMESTAMP AS event_timestamp,
event_data:properties AS properties_json,
CURRENT_TIMESTAMP() AS processed_at
FROM raw_events_stream
WHERE METADATA$ACTION = 'INSERT';
-- Resume task
ALTER TASK transform_events_task RESUME;
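Before relying on the schedule, the stream and task can be verified and triggered manually; a quick sketch:
-- Confirm the stream currently has unconsumed changes
SELECT SYSTEM$STREAM_HAS_DATA('raw_events_stream');
-- Run the task once on demand, then inspect its recent history
EXECUTE TASK transform_events_task;
SELECT *
FROM TABLE(information_schema.TASK_HISTORY(TASK_NAME => 'transform_events_task'))
ORDER BY scheduled_time DESC
LIMIT 10;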
3. Multi-Layer Architecture
┌─────────────────────────────────────────────────────┐
│ SNOWFLAKE │
│ │
│ ┌──────────────────────────────────────────────┐ │
│ │ RAW Layer (Bronze) │ │
│ │ • Schema-on-read with VARIANT │ │
│ │ • Minimal transformations │ │
│ │ • Audit columns (loaded_at, source) │ │
│ └──────────────────┬───────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────┐ │
│ │ STAGING Layer (Silver) │ │
│ │ • Type casting & validation │ │
│ │ • Deduplication │ │
│ │ • Basic business logic │ │
│ │ • Surrogate keys added │ │
│ └──────────────────┬───────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────┐ │
│ │ PRODUCTION Layer (Gold) │ │
│ │ • Star/snowflake schemas │ │
│ │ • Aggregations & metrics │ │
│ │ • Ready for BI consumption │ │
│ └──────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────┘
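As a concrete illustration of the Silver-layer work, deduplication and surrogate keys can be handled directly in SQL. The target table name below is illustrative; the source columns come from the transformation task shown earlier:
-- Illustrative staging pattern: deduplicate on the business key and add a surrogate key
CREATE OR REPLACE TABLE staging_database.staging_schema.events_deduped AS
SELECT
MD5(event_id || '|' || event_timestamp::STRING) AS event_sk,  -- simple surrogate key
event_id,
user_id,
event_type,
event_timestamp,
properties_json
FROM staging_database.staging_schema.events_cleaned
QUALIFY ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY processed_at DESC) = 1;  -- keep the latest copy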
Best Practices
Warehouse Sizing Strategy
-- Separate warehouses by workload
CREATE WAREHOUSE ingestion_wh
WAREHOUSE_SIZE = 'SMALL'
AUTO_SUSPEND = 60
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED = TRUE;
CREATE WAREHOUSE transform_wh
WAREHOUSE_SIZE = 'MEDIUM'
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE
SCALING_POLICY = 'STANDARD';
CREATE WAREHOUSE reporting_wh
WAREHOUSE_SIZE = 'LARGE'
AUTO_SUSPEND = 600
AUTO_RESUME = TRUE
MAX_CLUSTER_COUNT = 3;
Incremental Loading Pattern
-- Track last processed timestamp
CREATE OR REPLACE TABLE etl_control.metadata.load_timestamps (
table_name VARCHAR,
last_loaded_at TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);
-- Incremental load procedure
CREATE OR REPLACE PROCEDURE load_incremental_events()
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
last_load TIMESTAMP;
rows_loaded INTEGER;
BEGIN
-- Get last load timestamp
SELECT last_loaded_at INTO :last_load
FROM etl_control.metadata.load_timestamps
WHERE table_name = 'events';
-- Load only new records
INSERT INTO staging_database.staging_schema.events_cleaned
SELECT
event_data:id::STRING,
event_data:user_id::STRING,
event_data:event_type::STRING,
event_data:timestamp::TIMESTAMP,
event_data:properties,
CURRENT_TIMESTAMP()
FROM raw_database.raw_schema.events
WHERE loaded_at > :last_load;
rows_loaded := SQLROWCOUNT;
-- Update control table
UPDATE etl_control.metadata.load_timestamps
SET last_loaded_at = CURRENT_TIMESTAMP(),
updated_at = CURRENT_TIMESTAMP()
WHERE table_name = 'events';
RETURN 'Loaded ' || rows_loaded || ' rows';
END;
$$;
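The procedure can be run ad hoc or put on a schedule next to the other tasks; a minimal sketch (the seed value and task name are illustrative):
-- Seed the control row once so the first run has a starting point
INSERT INTO etl_control.metadata.load_timestamps (table_name, last_loaded_at)
VALUES ('events', '1970-01-01'::TIMESTAMP);
-- Run the incremental load once
CALL load_incremental_events();
-- Or schedule it
CREATE OR REPLACE TASK load_incremental_events_task
WAREHOUSE = transform_wh
SCHEDULE = '15 MINUTE'
AS
CALL load_incremental_events();
ALTER TASK load_incremental_events_task RESUME;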
Advanced Patterns
ELT with dbt
-- models/staging/stg_events.sql
{{
config(
materialized='incremental',
unique_key='event_id',
cluster_by=['event_date']
)
}}
SELECT
event_data:id::STRING AS event_id,
event_data:user_id::STRING AS user_id,
event_data:event_type::STRING AS event_type,
TO_DATE(event_data:timestamp::TIMESTAMP) AS event_date,
event_data:timestamp::TIMESTAMP AS event_timestamp,
event_data:properties AS properties,  -- already a VARIANT; no PARSE_JSON needed
loaded_at                             -- needed for the incremental filter below
FROM {{ source('raw', 'events') }}
{% if is_incremental() %}
WHERE loaded_at > (SELECT MAX(loaded_at) FROM {{ this }})
{% endif %}
Zero-Copy Cloning for Testing
-- Clone production for testing transformations
CREATE DATABASE staging_test_db
CLONE staging_database;
-- Test transformations
USE DATABASE staging_test_db;
-- Run your ETL logic here
-- Drop when done
DROP DATABASE staging_test_db;
Time Travel for Data Recovery
-- Query table as it was 1 hour ago
SELECT * FROM production_database.marts.daily_metrics
AT(OFFSET => -3600);
-- Recover data affected by a bad statement: materialize the pre-statement state
CREATE TABLE production_database.marts.daily_metrics_restored AS
SELECT * FROM production_database.marts.daily_metrics
BEFORE(STATEMENT => '<query_id>');
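Recovery also covers dropped objects, and the Time Travel window itself is configurable per table; a short sketch (the retention value is illustrative):
-- Restore a dropped table while it is still inside the retention window
UNDROP TABLE production_database.marts.daily_metrics;
-- Extend Time Travel retention for a critical table (up to 90 days on Enterprise Edition)
ALTER TABLE production_database.marts.daily_metrics
SET DATA_RETENTION_TIME_IN_DAYS = 7;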
Orchestration Architecture
Airflow + Snowflake Integration
┌──────────────────────────────────────────┐
│ Apache Airflow │
│ │
│ ┌────────────────────────────────────┐ │
│ │ DAG: daily_etl_pipeline │ │
│ │ │ │
│ │ [Extract] → [Stage] → [Load] │ │
│ │ ↓ ↓ ↓ │ │
│ │ [Transform] → [Test] → [Publish] │ │
│ └────────────────┬───────────────────┘ │
│ │ │
└───────────────────┼───────────────────────┘
│
▼
┌───────────────────────┐
│ Snowflake Connector │
└───────────┬───────────┘
│
▼
┌───────────────────────┐
│ SNOWFLAKE │
│ │
│ • Execute SQL │
│ • Call Procedures │
│ • Check Status │
└───────────────────────┘
Sample Airflow DAG
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'snowflake_etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline to Snowflake',
    schedule_interval='0 2 * * *',  # 2 AM daily
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['etl', 'snowflake']
) as dag:

    # Load raw data from S3 into the raw landing table
    load_raw_data = S3ToSnowflakeOperator(
        task_id='load_raw_data',
        snowflake_conn_id='snowflake_default',
        s3_keys=['raw-data/{{ ds }}/*.json'],
        table='raw_database.raw_schema.events',
        stage='raw_data_stage',
        file_format='json_format'
    )

    # Transform data via the incremental load procedure
    transform_data = SnowflakeOperator(
        task_id='transform_data',
        snowflake_conn_id='snowflake_default',
        sql='CALL load_incremental_events();',
        warehouse='transform_wh'
    )

    # Build aggregations
    build_metrics = SnowflakeOperator(
        task_id='build_metrics',
        snowflake_conn_id='snowflake_default',
        sql='''
            INSERT INTO production_database.marts.daily_metrics
            SELECT
                event_date,
                event_type,
                COUNT(DISTINCT user_id) AS unique_users,
                COUNT(*) AS event_count
            FROM staging_database.staging_schema.events_cleaned
            WHERE event_date = '{{ ds }}'
            GROUP BY event_date, event_type;
        ''',
        warehouse='transform_wh'
    )

    # Data quality check. Snowflake has no ERROR() function, so force a
    # division-by-zero failure when either condition is violated: no data
    # loaded for the run date, or more than one date in the partition.
    quality_check = SnowflakeOperator(
        task_id='quality_check',
        snowflake_conn_id='snowflake_default',
        sql='''
            SELECT 1 / IFF(
                COUNT(*) = 0 OR COUNT(DISTINCT event_date) > 1,
                0, 1
            ) AS quality_check
            FROM staging_database.staging_schema.events_cleaned
            WHERE event_date = '{{ ds }}';
        ''',
        warehouse='transform_wh'
    )

    load_raw_data >> transform_data >> quality_check >> build_metrics
Performance Optimization
Clustering Strategy
-- Define clustering keys for large tables
ALTER TABLE staging_database.staging_schema.events_cleaned
CLUSTER BY (event_date, event_type);
-- Monitor clustering effectiveness
SELECT SYSTEM$CLUSTERING_INFORMATION('staging_database.staging_schema.events_cleaned');
Materialized Views for Aggregations
-- Create materialized view for common queries (materialized views require Enterprise Edition)
CREATE OR REPLACE MATERIALIZED VIEW production_database.marts.hourly_event_counts
AS
SELECT
DATE_TRUNC('HOUR', event_timestamp) AS event_hour,
event_type,
COUNT(*) AS event_count,
APPROX_COUNT_DISTINCT(user_id) AS approx_unique_users  -- COUNT(DISTINCT) is not supported in materialized views
FROM staging_database.staging_schema.events_cleaned
GROUP BY DATE_TRUNC('HOUR', event_timestamp), event_type;
Monitoring and Observability
Query History Analysis
-- Monitor long-running queries
SELECT
query_id,
query_text,
database_name,
schema_name,
user_name,
warehouse_name,
start_time,
end_time,
total_elapsed_time / 1000 AS execution_seconds,
bytes_scanned,
rows_produced
FROM snowflake.account_usage.query_history
WHERE start_time >= DATEADD(hour, -24, CURRENT_TIMESTAMP())
AND total_elapsed_time > 60000 -- Over 1 minute
ORDER BY total_elapsed_time DESC
LIMIT 20;
Cost Tracking
-- Warehouse credit usage by day
SELECT
DATE_TRUNC('day', start_time) AS usage_date,
warehouse_name,
SUM(credits_used) AS total_credits,
SUM(credits_used) * 3 AS estimated_cost_usd -- Assuming $3/credit
FROM snowflake.account_usage.warehouse_metering_history
WHERE start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
GROUP BY DATE_TRUNC('day', start_time), warehouse_name
ORDER BY usage_date DESC, total_credits DESC;
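Pipeline health is worth tracking alongside cost; for the stream-and-task pattern above, failed task runs can be pulled from ACCOUNT_USAGE (sketch):
-- Failed task runs in the last 24 hours
SELECT
name AS task_name,
scheduled_time,
completed_time,
state,
error_message
FROM snowflake.account_usage.task_history
WHERE state = 'FAILED'
AND scheduled_time >= DATEADD(hour, -24, CURRENT_TIMESTAMP())
ORDER BY scheduled_time DESC;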
Key Takeaways
When to Use Snowflake for ETL
✅ Good Fit:
- High volume batch processing (millions+ rows)
- Complex SQL transformations
- Semi-structured data (JSON, Parquet)
- Need for separation of compute and storage
- Multiple concurrent workloads
❌ Consider Alternatives:
- Real-time streaming (< 1 second latency)
- Small datasets (< 100k rows)
- Heavy Python/Java transformations
- Ultra-low budget constraints
Cost Optimization Tips
- Right-size warehouses - Start small and scale up
- Use auto-suspend - Set to 60-300 seconds
- Cluster tables - Reduce scanning for large tables
- Filter and cluster on date columns - Snowflake prunes micro-partitions automatically; there is no manual partitioning
- Use result caching - Identical queries re-use cached results at no compute cost
- Schedule tasks wisely - Batch jobs during off-peak hours
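One way to enforce these limits is a resource monitor attached to each warehouse; a sketch with illustrative quota and thresholds (creating monitors requires ACCOUNTADMIN):
-- Cap monthly credits on the transformation warehouse
CREATE OR REPLACE RESOURCE MONITOR transform_monthly_monitor
WITH CREDIT_QUOTA = 100
FREQUENCY = MONTHLY
START_TIMESTAMP = IMMEDIATELY
TRIGGERS
ON 80 PERCENT DO NOTIFY
ON 100 PERCENT DO SUSPEND;
ALTER WAREHOUSE transform_wh SET RESOURCE_MONITOR = transform_monthly_monitor;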
Security Best Practices
-- Column-level masking with a dynamic data masking policy
CREATE OR REPLACE MASKING POLICY email_mask AS (val STRING)
RETURNS STRING ->
CASE
WHEN CURRENT_ROLE() IN ('ADMIN', 'ANALYST') THEN val
ELSE '***MASKED***'
END;
ALTER TABLE staging_database.staging_schema.users
MODIFY COLUMN email SET MASKING POLICY email_mask;
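Masking policies are column-level; row-level restrictions use row access policies instead. A hedged sketch (the users table, region column, and role names are illustrative):
-- Limit which rows each role can see, based on a region column
CREATE OR REPLACE ROW ACCESS POLICY region_policy AS (region STRING)
RETURNS BOOLEAN ->
CURRENT_ROLE() IN ('ADMIN', 'ETL_SERVICE')            -- pipeline and admin roles see everything
OR (CURRENT_ROLE() = 'EU_ANALYST' AND region = 'EU');  -- analysts see only their region
ALTER TABLE staging_database.staging_schema.users
ADD ROW ACCESS POLICY region_policy ON (region);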