⚡ dbt Incremental Model

大量データを効率的に処理する増分更新モデル

📖 Incremental Modelとは

🎯 概要

Incremental Model(増分モデル)は、既存のテーブルに対して新規データや変更データのみを追加・更新する、dbtの効率的なマテリアライゼーション方式です。 毎回全データを再構築するTABLEモデルと異なり、差分データだけを処理することで:

🔄 TABLEモデル(フルリフレッシュ)
毎回全データを再構築
Day 1: 100万行を処理 → テーブル作成
Day 2: 100万行を処理 → テーブル再作成
Day 3: 100万行を処理 → テーブル再作成
...
Day 30: 100万行を処理 → テーブル再作成
⚠️ 課題:
  • データ量が増えると処理時間が線形増加
  • 毎回フルスキャン → 高コスト
  • 大量データでは実行不可能に
  • 頻繁な更新が困難
⚡ Incrementalモデル(差分更新)
新規・変更データのみ処理
Day 1: 100万行を処理 → テーブル作成
Day 2: 新規1万行のみ処理 → 追加
Day 3: 新規1万行のみ処理 → 追加
...
Day 30: 新規1万行のみ処理 → 追加
✅ メリット:
  • 処理時間がほぼ一定
  • 差分のみスキャン → 低コスト
  • 大量データでも高速
  • 毎時・毎分の更新も可能

🔄 Incrementalの処理フロー

初回実行と2回目以降の違い

🆕 初回実行(is_incremental() = False)
テーブルが存在しないため、フルリフレッシュと同じ動作
1
全データを取得
ソーステーブルから全レコードをSELECT。
WHERE句の増分フィルタは適用されない。
2
テーブルを作成
CREATE TABLEで新規テーブルを作成し、全データを挿入。
パーティション設定やクラスタリングも適用。
⚡ 2回目以降の実行(is_incremental() = True)
テーブルが既に存在するため、差分データのみ処理
1
差分データのみ取得
WHERE句で新規データだけフィルタリング。
例: WHERE created_at > (SELECT MAX(created_at) FROM existing_table)
2
既存テーブルとマージ
選択した戦略に応じて処理:
append: 単純に追加
merge: unique_keyで重複を更新
delete+insert: 古いデータを削除して挿入
3
テーブルを更新
差分データが既存テーブルに統合される。
全データの再構築は行わない。

🎯 Incremental戦略

📌 戦略とは:

既存データと新規データをどのように統合するかを定義する方法。
materialized='incremental'incremental_strategyで指定。

1. append(追加)

動作: 新規データを単純に追加(INSERT)
SQL: INSERT INTO table SELECT ...
適用: イベントログ、タイムスタンプテーブル
注意: 重複データが発生する可能性
{{ config( materialized='incremental', incremental_strategy='append' ) }}
🔀

2. merge(マージ)

動作: unique_keyで重複チェック、更新 or 挿入
SQL: MERGE文(UPSERT)
適用: ディメンションテーブル、CDC
注意: unique_keyが必須
{{ config( materialized='incremental', incremental_strategy='merge', unique_key='id' ) }}
🗑️

3. delete+insert

動作: 該当期間のデータを削除して挿入
SQL: DELETE WHERE ... INSERT ...
適用: 日次集計、期間ベース更新
注意: パーティション列が必要
{{ config( materialized='incremental', incremental_strategy='delete+insert', partition_by={'field': 'date'} ) }}
📊

4. insert_overwrite

動作: パーティション単位で上書き
SQL: パーティション置換
適用: BigQuery、Spark、Hive
注意: パーティションテーブル専用
{{ config( materialized='incremental', incremental_strategy='insert_overwrite', partition_by={'field': 'date'} ) }}

💻 実装例

1. 基本的なIncremental Model(append戦略)

-- models/events_incremental.sql {{ config( materialized='incremental', incremental_strategy='append', partition_by={ 'field': 'event_date', 'data_type': 'date', 'granularity': 'day' }, cluster_by=['user_id', 'event_type'] ) }} SELECT event_id, user_id, event_type, event_timestamp, DATE(event_timestamp) AS event_date, properties FROM {{ source('raw', 'events') }} -- 増分フィルタ: 2回目以降のみ適用 {% if is_incremental() %} WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }}) {% endif %}

2. merge戦略(ディメンションテーブル)

-- models/dim_customers_incremental.sql {{ config( materialized='incremental', incremental_strategy='merge', unique_key='customer_id', merge_update_columns=['name', 'email', 'updated_at'] ) }} SELECT customer_id, name, email, created_at, updated_at FROM {{ source('raw', 'customers') }} {% if is_incremental() %} -- 更新されたレコードのみ取得 WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }}) {% endif %}

3. delete+insert戦略(日次集計)

-- models/fct_daily_sales.sql {{ config( materialized='incremental', incremental_strategy='delete+insert', partition_by={ 'field': 'sale_date', 'data_type': 'date' } ) }} SELECT DATE(sale_timestamp) AS sale_date, product_id, SUM(quantity) AS total_quantity, SUM(amount) AS total_amount, COUNT(DISTINCT customer_id) AS unique_customers FROM {{ ref('stg_sales') }} {% if is_incremental() %} -- 過去7日分を再計算(遅延データ対応) WHERE DATE(sale_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) {% endif %} GROUP BY 1, 2

4. 複合キーでのmerge

-- models/fct_order_items.sql {{ config( materialized='incremental', incremental_strategy='merge', unique_key=['order_id', 'item_id'], -- 複合キー partition_by={'field': 'order_date', 'data_type': 'date'} ) }} SELECT order_id, item_id, order_date, quantity, price, discount, updated_at FROM {{ source('raw', 'order_items') }} {% if is_incremental() %} WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }}) {% endif %}

5. insert_overwrite戦略(BigQuery)

-- models/fct_hourly_metrics.sql {{ config( materialized='incremental', incremental_strategy='insert_overwrite', partition_by={ 'field': 'metric_hour', 'data_type': 'timestamp', 'granularity': 'hour' } ) }} SELECT TIMESTAMP_TRUNC(event_timestamp, HOUR) AS metric_hour, metric_name, AVG(metric_value) AS avg_value, MAX(metric_value) AS max_value, COUNT(*) AS count_events FROM {{ ref('stg_metrics') }} {% if is_incremental() %} -- 過去24時間分のパーティションを上書き WHERE metric_hour >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR) {% endif %} GROUP BY 1, 2

⚙️ 重要な設定オプション

主要な設定パラメータ

materialized

'incremental' を指定して増分モデルを有効化

incremental_strategy

データ統合方法を指定:
'append': 追加のみ
'merge': UPSERT(更新 or 挿入)
'delete+insert': 削除してから挿入
'insert_overwrite': パーティション上書き

unique_key

merge戦略で重複判定に使用するキー。
単一カラム: 'user_id'
複合キー: ['order_id', 'item_id']

partition_by

パーティション設定(BigQuery):
{'field': 'date', 'data_type': 'date', 'granularity': 'day'}

cluster_by

クラスタリング設定(BigQuery):
['user_id', 'product_id']

on_schema_change

スキーマ変更時の動作:
'ignore': 変更を無視(デフォルト)
'fail': エラーで停止
'append_new_columns': 新カラムを追加
'sync_all_columns': 全カラムを同期

full_refresh

フルリフレッシュを強制:
コマンド: dbt run --full-refresh
テーブルを削除して再作成

📊 パフォーマンス比較

実行時間とコストの比較

シナリオ: 10億行のイベントテーブル、日次で100万行の新規データ追加
TABLEモデル(フルリフレッシュ)
処理時間: 45分 | コスト: $50 | スキャン: 500 GB
Incrementalモデル(差分更新)
処理時間: 2分 | コスト: $0.5 | スキャン: 5 GB
指標 TABLEモデル Incrementalモデル 改善率
初回実行 45分 45分 同じ
2回目以降 45分 2分 95%短縮
スキャン量 500 GB 5 GB 99%削減
コスト/日 $50 $0.5 99%削減
月間コスト $1,500 $15 $1,485節約

🎯 ユースケース

1️⃣ イベントログの蓄積

シナリオ: Webアプリのクリックストリームデータを蓄積

実装:
  • 戦略: append
  • パーティション: event_date
  • フィルタ: WHERE event_timestamp > MAX(event_timestamp)
  • 頻度: 毎時実行
効果: 毎時数百万件のイベントを高速処理、低コスト

2️⃣ 顧客マスタの更新

シナリオ: CRMからの顧客データをデータウェアハウスに同期

実装:
  • 戦略: merge
  • unique_key: customer_id
  • フィルタ: WHERE updated_at > MAX(updated_at)
  • 頻度: 毎15分実行
効果: 変更された顧客データのみ更新、リアルタイムに近い同期

3️⃣ 日次売上集計

シナリオ: 日別・商品別の売上サマリを計算

実装:
  • 戦略: delete+insert
  • パーティション: sale_date
  • フィルタ: 過去7日分を再計算(遅延データ対応)
  • 頻度: 毎日1回実行
効果: 遅延到着データも正確に反映、過去分も再計算可能

4️⃣ CDC(Change Data Capture)

シナリオ: データベースの変更ログをキャプチャ

実装:
  • 戦略: merge
  • unique_key: プライマリキー
  • フィルタ: WHERE log_timestamp > MAX(log_timestamp)
  • 削除フラグ: is_incremental() and is_deletedで削除処理
効果: データベースの変更をリアルタイムに反映

5️⃣ 時系列メトリクス

シナリオ: 1時間ごとのシステムメトリクスを集計

実装:
  • 戦略: insert_overwrite
  • パーティション: metric_hour(hourly granularity)
  • フィルタ: 過去24時間分を再計算
  • 頻度: 毎時実行
効果: 時間単位でパーティションを上書き、効率的な更新

⚠️ 注意点とベストプラクティス

項目 推奨事項
フィルタ条件 必ず{% if is_incremental() %}内にWHERE句を記述。初回実行では全データ取得が必要
パーティション設定 日付カラムでパーティション化すると、delete+insertやinsert_overwriteが効率的
unique_key選択 merge戦略では適切なキーを選択。複合キーも可能
遅延データ対応 過去数日分をlookback_windowで再処理することで遅延データをキャッチ
フルリフレッシュ 定期的に--full-refreshでテーブルを再構築し、データ品質を維持
モニタリング 処理行数、実行時間、エラー率を監視。異常があればフルリフレッシュ
スキーマ変更 on_schema_changeを適切に設定し、カラム追加・削除に対応
🚨 Incrementalが適さないケース:

✅ Incrementalを使うべきケース

🔧 高度なテクニック

1. Lookback Window(遅延データ対応)

-- 過去7日分を常に再計算して遅延データをキャッチ {% if is_incremental() %} WHERE event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) {% endif %}

2. 条件付きロジック

-- 初回実行時のみ履歴データ全体を取得、2回目以降は差分のみ {% if is_incremental() %} WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }}) {% else %} -- 初回は過去1年分のみ取得(全データは不要な場合) WHERE updated_at >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 YEAR) {% endif %}

3. 削除レコードの処理

-- 論理削除フラグを使用 {{ config( materialized='incremental', incremental_strategy='merge', unique_key='id' ) }} SELECT id, name, is_deleted, updated_at FROM {{ source('raw', 'users') }} {% if is_incremental() %} WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }}) -- 削除されたレコードも含める OR is_deleted = TRUE {% endif %}

4. カスタムマクロでの共通化

-- macros/incremental_filter.sql {% macro incremental_filter(column_name, lookback_days=0) %} {% if is_incremental() %} WHERE {{ column_name }} > ( SELECT DATE_SUB(MAX({{ column_name }}), INTERVAL {{ lookback_days }} DAY) FROM {{ this }} ) {% endif %} {% endmacro %} -- models/my_model.sql SELECT * FROM {{ source('raw', 'events') }} {{ incremental_filter('event_date', lookback_days=7) }}

📚 まとめ

🎓 dbt Incremental Modelの重要ポイント

💡 使い分けの基準:
  • TABLE: 小規模データ(〜数十万行)、開発初期、複雑なロジック
  • INCREMENTAL: 大規模データ(数百万行以上)、頻繁な更新、コスト重視
  • VIEW: 軽量な変換のみ、常に最新データが必要
Incremental Modelにより、
大規模データパイプラインを高速・低コストで実現できます!