Using Snowflake in ETL Workflows

Tags: Snowflake, ETL, Data Engineering, Data Warehouse, SQL

Snowflake's cloud-native architecture makes it an excellent choice for modern ETL (Extract, Transform, Load) workflows. This guide covers practical patterns for integrating Snowflake into data pipelines.

Architecture Overview

Basic ETL Pipeline with Snowflake

┌─────────────────┐
│  Data Sources   │
│                 │
│  • APIs         │
│  • Databases    │
│  • Files (S3)   │
│  • Streaming    │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Extract Layer  │
│                 │
│  • Python/Spark │
│  • Fivetran     │
│  • Airbyte      │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Staging Area   │
│                 │
│  • S3/Azure/GCS │
│  • Raw Files    │
└────────┬────────┘
         │
         ▼
┌─────────────────────────────────────────┐
│           SNOWFLAKE                      │
│                                          │
│  ┌────────────────────────────────────┐ │
│  │  Raw Database (Landing Zone)       │ │
│  │  • COPY INTO statements            │ │
│  │  • VARIANT columns for JSON        │ │
│  │  • File format definitions         │ │
│  └──────────────┬─────────────────────┘ │
│                 │                        │
│                 ▼                        │
│  ┌────────────────────────────────────┐ │
│  │  Staging Database (Transformation) │ │
│  │  • dbt models                      │ │
│  │  • Stored procedures               │ │
│  │  • Streams & Tasks                 │ │
│  └──────────────┬─────────────────────┘ │
│                 │                        │
│                 ▼                        │
│  ┌────────────────────────────────────┐ │
│  │  Production Database (Marts)       │ │
│  │  • Aggregated tables               │ │
│  │  • Materialized views              │ │
│  │  • Business logic applied          │ │
│  └──────────────┬─────────────────────┘ │
│                 │                        │
└──────────────────┬───────────────────────┘
                   │
                   ▼
         ┌─────────────────┐
         │  Consumption    │
         │                 │
         │  • Tableau      │
         │  • Power BI     │
         │  • Custom Apps  │
         └─────────────────┘

Core Components

1. Data Ingestion Patterns

Using COPY INTO from S3

-- Create file format
CREATE OR REPLACE FILE FORMAT json_format
  TYPE = 'JSON'
  STRIP_OUTER_ARRAY = TRUE
  COMPRESSION = 'AUTO';

-- Create stage pointing to S3
CREATE OR REPLACE STAGE raw_data_stage
  URL = 's3://my-bucket/raw-data/'
  CREDENTIALS = (AWS_KEY_ID = 'xxx' AWS_SECRET_KEY = 'xxx')  -- a storage integration is preferable to inline credentials in production
  FILE_FORMAT = (FORMAT_NAME = 'json_format');

-- Load data into raw table
COPY INTO raw_database.raw_schema.events
FROM @raw_data_stage
PATTERN = '.*events.*[.]json'
ON_ERROR = 'CONTINUE'
PURGE = TRUE;
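
Because ON_ERROR = 'CONTINUE' silently skips records it cannot parse, it is worth checking what each load actually did. A quick check using the COPY_HISTORY table function, with the table name from the example above:

-- Review recent load results, including errors from skipped records
SELECT file_name, status, row_count, row_parsed, first_error_message
FROM TABLE(raw_database.information_schema.copy_history(
  TABLE_NAME => 'raw_schema.events',
  START_TIME => DATEADD(hour, -1, CURRENT_TIMESTAMP())
));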

Snowpipe for Streaming Ingestion

-- Create pipe for automatic ingestion
CREATE OR REPLACE PIPE raw_database.raw_schema.events_pipe
  AUTO_INGEST = TRUE
  AS
  COPY INTO raw_database.raw_schema.events
  FROM @raw_data_stage
  FILE_FORMAT = (FORMAT_NAME = 'json_format');

-- Get the SQS queue ARN (notification_channel column) to configure the S3 event notification
SHOW PIPES LIKE 'events_pipe';
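
After the S3 bucket's event notification is pointed at that queue, the pipe's health and backlog can be checked at any time. A quick status check for the pipe defined above:

-- Returns a JSON document with executionState, pendingFileCount, etc.
SELECT SYSTEM$PIPE_STATUS('raw_database.raw_schema.events_pipe');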

2. Data Transformation with Streams and Tasks

Setting Up Change Data Capture

-- Create stream to track changes
CREATE OR REPLACE STREAM raw_events_stream
ON TABLE raw_database.raw_schema.events;

-- Create transformation task
CREATE OR REPLACE TASK transform_events_task
  WAREHOUSE = transform_wh
  SCHEDULE = '5 MINUTE'
WHEN
  SYSTEM$STREAM_HAS_DATA('raw_events_stream')
AS
  INSERT INTO staging_database.staging_schema.events_cleaned
  SELECT
    event_data:id::STRING AS event_id,
    event_data:user_id::STRING AS user_id,
    event_data:event_type::STRING AS event_type,
    event_data:timestamp::TIMESTAMP AS event_timestamp,
    event_data:properties AS properties_json,
    CURRENT_TIMESTAMP() AS processed_at
  FROM raw_events_stream
  WHERE METADATA$ACTION = 'INSERT';

-- Resume task
ALTER TASK transform_events_task RESUME;
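
Once the task is resumed, its runs can be audited with the TASK_HISTORY table function; a simple check on the task defined above:

-- Recent runs of the transformation task (state, error, timing)
SELECT name, state, error_message, scheduled_time, completed_time
FROM TABLE(information_schema.task_history(
  TASK_NAME => 'TRANSFORM_EVENTS_TASK',
  RESULT_LIMIT => 20
))
ORDER BY scheduled_time DESC;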

3. Multi-Layer Architecture

┌─────────────────────────────────────────────────────┐
│                    SNOWFLAKE                         │
│                                                      │
│  ┌──────────────────────────────────────────────┐  │
│  │  RAW Layer (Bronze)                          │  │
│  │  • Schema-on-read with VARIANT              │  │
│  │  • Minimal transformations                   │  │
│  │  • Audit columns (loaded_at, source)        │  │
│  └──────────────────┬───────────────────────────┘  │
│                     │                                │
│                     ▼                                │
│  ┌──────────────────────────────────────────────┐  │
│  │  STAGING Layer (Silver)                      │  │
│  │  • Type casting & validation                 │  │
│  │  • Deduplication                             │  │
│  │  • Basic business logic                      │  │
│  │  • Surrogate keys added                      │  │
│  └──────────────────┬───────────────────────────┘  │
│                     │                                │
│                     ▼                                │
│  ┌──────────────────────────────────────────────┐  │
│  │  PRODUCTION Layer (Gold)                     │  │
│  │  • Star/snowflake schemas                    │  │
│  │  • Aggregations & metrics                    │  │
│  │  • Ready for BI consumption                  │  │
│  └──────────────────────────────────────────────┘  │
│                                                      │
└─────────────────────────────────────────────────────┘
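
Most of the cleanup listed for the STAGING (Silver) layer can be expressed as plain SQL. A minimal sketch of a Silver-layer load, assuming the raw events table and events_cleaned target from the earlier examples, with type casting and deduplication on the event id:

-- Cast types and keep only the most recently loaded copy of each event
INSERT INTO staging_database.staging_schema.events_cleaned
SELECT
  event_data:id::STRING AS event_id,
  event_data:user_id::STRING AS user_id,
  event_data:event_type::STRING AS event_type,
  event_data:timestamp::TIMESTAMP AS event_timestamp,
  event_data:properties AS properties_json,
  CURRENT_TIMESTAMP() AS processed_at
FROM raw_database.raw_schema.events
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY event_data:id::STRING
  ORDER BY loaded_at DESC
) = 1;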

Best Practices

Warehouse Sizing Strategy

-- Separate warehouses by workload
CREATE WAREHOUSE ingestion_wh
  WAREHOUSE_SIZE = 'SMALL'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE
  INITIALLY_SUSPENDED = TRUE;

CREATE WAREHOUSE transform_wh
  WAREHOUSE_SIZE = 'MEDIUM'
  AUTO_SUSPEND = 300
  AUTO_RESUME = TRUE
  SCALING_POLICY = 'STANDARD';

CREATE WAREHOUSE reporting_wh
  WAREHOUSE_SIZE = 'LARGE'
  AUTO_SUSPEND = 600
  AUTO_RESUME = TRUE
  MAX_CLUSTER_COUNT = 3;

Incremental Loading Pattern

-- Track last processed timestamp
CREATE OR REPLACE TABLE etl_control.metadata.load_timestamps (
  table_name VARCHAR,
  last_loaded_at TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

-- Incremental load procedure
CREATE OR REPLACE PROCEDURE load_incremental_events()
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
  last_load TIMESTAMP;
  rows_loaded INTEGER;
BEGIN
  -- Get last load timestamp
  SELECT last_loaded_at INTO :last_load
  FROM etl_control.metadata.load_timestamps
  WHERE table_name = 'events';
  
  -- Load only new records (extracting from the raw VARIANT column, as in the task above)
  INSERT INTO staging_database.staging_schema.events_cleaned
  SELECT
    event_data:id::STRING AS event_id,
    event_data:user_id::STRING AS user_id,
    event_data:event_type::STRING AS event_type,
    event_data:timestamp::TIMESTAMP AS event_timestamp,
    event_data:properties AS properties_json,
    CURRENT_TIMESTAMP() AS processed_at
  FROM raw_database.raw_schema.events
  WHERE loaded_at > :last_load;
  
  rows_loaded := SQLROWCOUNT;
  
  -- Update control table
  UPDATE etl_control.metadata.load_timestamps
  SET last_loaded_at = CURRENT_TIMESTAMP(),
      updated_at = CURRENT_TIMESTAMP()
  WHERE table_name = 'events';
  
  RETURN 'Loaded ' || rows_loaded || ' rows';
END;
$$;
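
The procedure can be invoked on demand or wrapped in a task, reusing the warehouse from the earlier examples (the schedule below is illustrative):

-- Run the incremental load on demand
CALL load_incremental_events();

-- ...or on a schedule
CREATE OR REPLACE TASK incremental_load_task
  WAREHOUSE = transform_wh
  SCHEDULE = '60 MINUTE'
AS
  CALL load_incremental_events();

ALTER TASK incremental_load_task RESUME;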

Advanced Patterns

ELT with dbt

-- models/staging/stg_events.sql
{{
  config(
    materialized='incremental',
    unique_key='event_id',
    cluster_by=['event_date']
  )
}}

SELECT
  event_data:id::STRING AS event_id,
  event_data:user_id::STRING AS user_id,
  event_data:event_type::STRING AS event_type,
  TO_DATE(event_data:timestamp::TIMESTAMP) AS event_date,
  event_data:timestamp::TIMESTAMP AS event_timestamp,
  event_data:properties AS properties,
  loaded_at
FROM {{ source('raw', 'events') }}

{% if is_incremental() %}
  WHERE loaded_at > (SELECT MAX(loaded_at) FROM {{ this }})
{% endif %}

Zero-Copy Cloning for Testing

-- Clone production for testing transformations
CREATE DATABASE staging_test_db
CLONE staging_database;

-- Test transformations
USE DATABASE staging_test_db;
-- Run your ETL logic here

-- Drop when done
DROP DATABASE staging_test_db;
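
Once the transformations check out against the clone, a rebuilt table can be promoted with a metadata-only swap. A sketch, assuming the table exists in both the clone and the original database:

-- Promote the tested table by swapping it with the current one (instant, metadata-only)
ALTER TABLE staging_database.staging_schema.events_cleaned
  SWAP WITH staging_test_db.staging_schema.events_cleaned;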

Time Travel for Data Recovery

-- Query table as it was 1 hour ago
SELECT * FROM production_database.marts.daily_metrics
AT(OFFSET => -3600);

-- Restore accidentally deleted rows (only those missing from the current table)
INSERT INTO production_database.marts.daily_metrics
SELECT * FROM production_database.marts.daily_metrics
BEFORE(STATEMENT => '<query_id>')
MINUS
SELECT * FROM production_database.marts.daily_metrics;
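
Time Travel only reaches back through the table's data retention window (1 day by default), so tables that matter for recovery may warrant a longer setting:

-- Extend the Time Travel window (up to 90 days on Enterprise edition and above)
ALTER TABLE production_database.marts.daily_metrics
  SET DATA_RETENTION_TIME_IN_DAYS = 7;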

Orchestration Architecture

Airflow + Snowflake Integration

┌──────────────────────────────────────────┐
│           Apache Airflow                  │
│                                           │
│  ┌────────────────────────────────────┐  │
│  │  DAG: daily_etl_pipeline           │  │
│  │                                    │  │
│  │  [Extract] → [Stage] → [Load]     │  │
│  │      ↓          ↓         ↓        │  │
│  │  [Transform] → [Test] → [Publish] │  │
│  └────────────────┬───────────────────┘  │
│                   │                       │
└───────────────────┼───────────────────────┘
                    │
                    ▼
        ┌───────────────────────┐
        │  Snowflake Connector  │
        └───────────┬───────────┘
                    │
                    ▼
        ┌───────────────────────┐
        │      SNOWFLAKE        │
        │                       │
        │  • Execute SQL        │
        │  • Call Procedures    │
        │  • Check Status       │
        └───────────────────────┘

Sample Airflow DAG

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator, SnowflakeCheckOperator
from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'snowflake_etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline to Snowflake',
    schedule_interval='0 2 * * *',  # 2 AM daily
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['etl', 'snowflake']
) as dag:

    # Load raw data from S3
    load_raw_data = S3ToSnowflakeOperator(
        task_id='load_raw_data',
        snowflake_conn_id='snowflake_default',
        s3_keys=['raw-data/{{ ds }}/*.json'],
        table='raw_database.raw_schema.events',
        stage='raw_data_stage',
        file_format='json_format'
    )

    # Transform data
    transform_data = SnowflakeOperator(
        task_id='transform_data',
        snowflake_conn_id='snowflake_default',
        sql='CALL load_incremental_events();',
        warehouse='transform_wh'
    )

    # Build aggregations
    build_metrics = SnowflakeOperator(
        task_id='build_metrics',
        snowflake_conn_id='snowflake_default',
        sql='''
            INSERT INTO production_database.marts.daily_metrics
            SELECT
                event_date,
                event_type,
                COUNT(DISTINCT user_id) AS unique_users,
                COUNT(*) AS event_count
            FROM staging_database.staging_schema.events_cleaned
            WHERE event_date = '{{ ds }}'
            GROUP BY event_date, event_type;
        ''',
        warehouse='transform_wh'
    )

    # Data quality check (SnowflakeCheckOperator fails the task if any value in the first row is falsy)
    quality_check = SnowflakeCheckOperator(
        task_id='quality_check',
        snowflake_conn_id='snowflake_default',
        sql='''
            SELECT
                COUNT(*) > 0 AS has_rows,
                COUNT(DISTINCT event_date) <= 1 AS single_date
            FROM staging_database.staging_schema.events_cleaned
            WHERE event_date = '{{ ds }}';
        ''',
        warehouse='transform_wh'
    )

    load_raw_data >> transform_data >> quality_check >> build_metrics

Performance Optimization

Clustering Strategy

-- Define clustering keys for large tables
ALTER TABLE staging_database.staging_schema.events_cleaned
CLUSTER BY (event_date, event_type);

-- Monitor clustering effectiveness
SELECT SYSTEM$CLUSTERING_INFORMATION('staging_database.staging_schema.events_cleaned');

Materialized Views for Aggregations

-- Create materialized view for common queries
-- (COUNT(DISTINCT ...) is not supported in materialized views; use APPROX_COUNT_DISTINCT instead)
CREATE OR REPLACE MATERIALIZED VIEW production_database.marts.hourly_event_counts
AS
SELECT
    DATE_TRUNC('HOUR', event_timestamp) AS event_hour,
    event_type,
    COUNT(*) AS event_count,
    APPROX_COUNT_DISTINCT(user_id) AS approx_unique_users
FROM staging_database.staging_schema.events_cleaned
GROUP BY DATE_TRUNC('HOUR', event_timestamp), event_type;
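
Materialized views are refreshed automatically in the background, and those refreshes consume credits, so their maintenance cost is worth tracking (assuming access to the ACCOUNT_USAGE share):

-- Credits spent maintaining materialized views over the last 30 days
SELECT
    materialized_view_name,
    SUM(credits_used) AS refresh_credits
FROM snowflake.account_usage.materialized_view_refresh_history
WHERE start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
GROUP BY materialized_view_name
ORDER BY refresh_credits DESC;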

Monitoring and Observability

Query History Analysis

-- Monitor long-running queries
SELECT
    query_id,
    query_text,
    database_name,
    schema_name,
    user_name,
    warehouse_name,
    start_time,
    end_time,
    total_elapsed_time / 1000 AS execution_seconds,
    bytes_scanned,
    rows_produced
FROM snowflake.account_usage.query_history
WHERE start_time >= DATEADD(hour, -24, CURRENT_TIMESTAMP())
  AND total_elapsed_time > 60000  -- Over 1 minute
ORDER BY total_elapsed_time DESC
LIMIT 20;

Cost Tracking

-- Warehouse credit usage by day
SELECT
    DATE_TRUNC('day', start_time) AS usage_date,
    warehouse_name,
    SUM(credits_used) AS total_credits,
    SUM(credits_used) * 3 AS estimated_cost_usd  -- Assuming $3/credit
FROM snowflake.account_usage.warehouse_metering_history
WHERE start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
GROUP BY DATE_TRUNC('day', start_time), warehouse_name
ORDER BY usage_date DESC, total_credits DESC;

Key Takeaways

When to Use Snowflake for ETL

Good Fit:

  • High volume batch processing (millions+ rows)
  • Complex SQL transformations
  • Semi-structured data (JSON, Parquet)
  • Need for separation of compute and storage
  • Multiple concurrent workloads

Consider Alternatives:

  • Real-time streaming (< 1 second latency)
  • Small datasets (< 100k rows)
  • Heavy Python/Java transformations
  • Ultra-low budget constraints

Cost Optimization Tips

  1. Right-size warehouses - Start small and scale up
  2. Use auto-suspend - Set to 60-300 seconds
  3. Cluster tables - Reduce scanning for large tables
  4. Partition by date - Enable efficient pruning
  5. Use result caching - Identical queries return cached results free
  6. Schedule tasks wisely - Batch jobs during off-peak hours

Security Best Practices

-- Column-level masking policy (dynamic data masking)
CREATE OR REPLACE MASKING POLICY email_mask AS (val STRING)
  RETURNS STRING ->
    CASE
      WHEN CURRENT_ROLE() IN ('ADMIN', 'ANALYST') THEN val
      ELSE '***MASKED***'
    END;

ALTER TABLE staging_database.staging_schema.users
  MODIFY COLUMN email SET MASKING POLICY email_mask;
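
Masking policies protect individual columns; row-level security proper is handled by row access policies. A minimal sketch, assuming a hypothetical region column on the same users table:

-- Row-level security: non-admin roles only see US rows (the region column is illustrative)
CREATE OR REPLACE ROW ACCESS POLICY us_only_policy AS (region STRING)
  RETURNS BOOLEAN ->
    CURRENT_ROLE() = 'ADMIN' OR region = 'US';

ALTER TABLE staging_database.staging_schema.users
  ADD ROW ACCESS POLICY us_only_policy ON (region);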

Resources