🔄 Dataflow Update & Drain

パイプラインの更新と正常停止

📖 概要

🎯 Update と Drain とは

Google Cloud Dataflowには、実行中のパイプラインを管理するための2つの重要な機能があります:

❌ Cancel(キャンセル)
動作:
即座にパイプラインを停止
⚠️ 結果:
  • 処理中データが失われる
  • 状態が保存されない
  • 即座に停止
  • データ損失リスク
✅ Update(更新)
動作:
ダウンタイムなしで
新バージョンに移行
✅ 結果:
  • データ損失なし
  • 状態を引き継ぎ
  • 継続的に実行
  • ダウンタイムなし
🚰 Drain(ドレイン)
動作:
処理中データを完了後
正常に停止
⏱️ 結果:
  • 処理中データを完了
  • 新規データは拒否
  • 正常に終了
  • 再起動可能

🔄 Update(更新)

💡 Updateとは

Updateは、実行中のストリーミングパイプラインを停止せずに、新しいバージョンのパイプラインに置き換える機能です。 既存のパイプラインの状態(ウィンドウ、集計、タイマーなど)を新しいパイプラインに引き継ぎます。

Updateの処理フロー

1
新しいパイプラインコードをデプロイ
--updateフラグを指定して新しいパイプラインを起動。
既存のJob IDを指定して更新を実行。
2
互換性チェック
Dataflowが新旧パイプラインの互換性を検証。
ステップ名、状態、変換の一貫性を確認。
3
状態の移行
既存パイプラインの状態(ウィンドウ状態、タイマー、バッファ等)を新しいパイプラインにコピー。
データ損失なしで引き継ぎ。
4
新しいパイプラインで処理継続
新しいパイプラインが既存の状態を使って処理を継続。
古いパイプラインは自動的に停止。
5
更新完了
新しいバージョンで継続的に実行。
ダウンタイムなし、データ損失なし。

Updateの実行方法

gcloudコマンド:
# 既存のジョブを更新 gcloud dataflow jobs update JOB_ID \ --update \ --region=us-central1 \ --gcs-location=gs://my-bucket/templates/my-template # 既存ジョブIDの確認 gcloud dataflow jobs list \ --region=us-central1 \ --filter="state=Running"
Python SDK:
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions # パイプラインオプションにupdateフラグを設定 pipeline_options = PipelineOptions([ '--runner=DataflowRunner', '--project=my-project', '--region=us-central1', '--temp_location=gs://my-bucket/temp', '--staging_location=gs://my-bucket/staging', '--update', # 更新フラグ '--job_name=my-streaming-job' # 既存ジョブと同じ名前 ]) with beam.Pipeline(options=pipeline_options) as pipeline: # 新しいパイプライン定義 ...
Java SDK:
// パイプラインオプションの設定 DataflowPipelineOptions options = PipelineOptionsFactory .as(DataflowPipelineOptions.class); options.setRunner(DataflowRunner.class); options.setProject("my-project"); options.setRegion("us-central1"); options.setUpdate(true); // 更新フラグ options.setJobName("my-streaming-job"); // 既存ジョブ名 Pipeline pipeline = Pipeline.create(options); // 新しいパイプライン定義 ... pipeline.run();

✅ Updateが可能な変更

❌ Updateができない変更:

これらの変更が必要な場合は、Drainして新しいパイプラインを起動する必要があります。

🚰 Drain(ドレイン)

💡 Drainとは

Drainは、実行中のパイプラインを正常に停止する機能です。 新しいデータの受け入れを停止し、処理中のデータをすべて完了してからパイプラインを終了します。

Drainのタイムライン

T+0
Drain開始
新規データの受け入れを停止。
処理中のデータは継続。
T+数秒
パイプライン処理継続
バッファ内のデータを処理。
ウィンドウがクローズされる。
T+数分
最終データの書き込み
すべての処理が完了。
最終結果をシンクに書き込み。
T+完了
パイプライン停止
Drained状態に遷移。
リソースが解放される。

Drainの実行方法

gcloudコマンド:
# Drainを実行 gcloud dataflow jobs drain JOB_ID \ --region=us-central1 # 複数のジョブをまとめてDrain gcloud dataflow jobs list \ --filter="state=Running" \ --format="value(id)" \ | xargs -I {} gcloud dataflow jobs drain {} --region=us-central1
REST API:
# Drain状態に更新 curl -X PUT \ https://dataflow.googleapis.com/v1b3/projects/{PROJECT_ID}/locations/{REGION}/jobs/{JOB_ID} \ -H 'Authorization: Bearer $(gcloud auth print-access-token)' \ -H 'Content-Type: application/json' \ -d '{ "requestedState": "JOB_STATE_DRAINING" }'
Python SDK(プログラムから):
from google.cloud import dataflow_v1beta3 client = dataflow_v1beta3.JobsV1Beta3Client() # Drain状態に更新 request = dataflow_v1beta3.UpdateJobRequest( project_id="my-project", location="us-central1", job_id="my-job-id", job={ "id": "my-job-id", "requestedState": dataflow_v1beta3.JobState.JOB_STATE_DRAINING } ) response = client.update_job(request=request) print(f"Job is now draining: {response.id}")

✅ Drainのメリット

⚠️ Drainの注意点:

🎯 使い分けガイド

どの操作を使うべきか?

パイプラインコードを変更したい
ダウンタイムは許容できる?
YES
🚰 Drain
1. 既存ジョブをDrain
2. 完了を待つ
3. 新しいジョブを起動
NO
変更は互換性がある?
YES
🔄 Update
即座に更新
NO
🚰 Drain
+ 新規起動
シナリオ 推奨操作 理由
DoFnのバグ修正 Update ロジック変更のみ、状態は維持、ダウンタイムなし
新しい変換ステップを追加 Update 互換性あり、既存パイプラインを拡張
ステップ名を変更したい Drain + 新規 状態の互換性なし、再起動が必要
パイプライン全体の再設計 Drain + 新規 大規模変更、状態を引き継げない
一時的な停止(メンテナンス) Drain データ損失なく停止、後で再起動
緊急停止(コスト削減) Cancel 即座に停止が必要、データ損失OK
Pub/SubサブスクリプションURLの変更 Drain + 新規 ソース変更、再起動が必要
BigQuery出力テーブルの変更 Update シンク変更は互換性あり

📊 実際の運用例

1️⃣ バグ修正のデプロイ

シナリオ: 本番環境のストリーミングパイプラインでバグを発見

手順:
  1. 修正したコードをテスト環境で検証
  2. --updateフラグで本番にデプロイ
  3. 既存ジョブが自動的に新バージョンに置き換わる
  4. データ損失なし、ダウンタイムなし
メリット: 24時間365日稼働のパイプラインでも安全に修正可能

2️⃣ 新機能の追加

シナリオ: 既存パイプラインに新しい集計ロジックを追加

手順:
  1. 新しいブランチ(GroupByKey、Combine等)を追加
  2. --updateで段階的にロールアウト
  3. 既存の処理は継続、新しい処理が開始
  4. 問題があれば即座にロールバック(再度Update)
メリット: カナリアデプロイメント、A/Bテスト可能

3️⃣ メンテナンス時の一時停止

シナリオ: BigQueryの大規模なスキーマ変更を実施

手順:
  1. パイプラインをDrain(処理中データを完了)
  2. BigQueryテーブルのスキーマを変更
  3. 新しいスキーマに対応したパイプラインを起動
  4. Pub/Subバックログから処理再開
メリット: データ損失なし、整合性維持

4️⃣ パイプラインの大規模リファクタリング

シナリオ: パイプライン全体を新しいアーキテクチャに移行

手順:
  1. 新しいPub/Subサブスクリプションを作成
  2. 旧パイプラインと並行して新パイプラインを起動
  3. 新パイプラインの動作を検証
  4. 旧パイプラインをDrain
  5. 旧サブスクリプションを削除
メリット: ブルーグリーンデプロイメント、リスク最小化

5️⃣ コスト最適化のための一時停止

シナリオ: 夜間はデータ量が少ないため一時停止してコスト削減

手順:
  1. Cloud Schedulerで定期実行ジョブを設定
  2. 23:00にDrainを実行(処理完了まで待機)
  3. 夜間はPub/Subにデータが蓄積
  4. 07:00に新しいパイプラインを起動
  5. バックログから処理再開
メリット: 夜間のコンピュートコストをゼロに

⚙️ ベストプラクティス

項目 推奨事項
ステップ名の一貫性 Updateを使う場合、ステップ名を変更しない。状態が失われる。
段階的ロールアウト まずステージング環境でUpdateをテスト、問題なければ本番に適用。
バックログ管理 Drain中もPub/Subバックログが蓄積。再起動時に処理される。
モニタリング Update/Drain中のメトリクスを監視。異常があればロールバック。
ドキュメント化 互換性のある変更/不可能な変更をドキュメント化。
ロールバック計画 Update失敗時のロールバック手順を事前に準備。
状態の互換性 カスタム状態を使う場合、前方互換性を維持。
テスト Updateの互換性テストを自動化(CI/CD統合)。
🚨 よくある落とし穴:

📚 まとめ

🎓 Update & Drainの重要ポイント

🔄 Update(更新)

  • ダウンタイムなし
  • 状態を引き継ぎ
  • データ損失なし
  • 互換性のある変更のみ

🚰 Drain(ドレイン)

  • 正常に停止
  • 処理中データを完了
  • 再起動可能
  • 大規模変更に対応
💡 使い分けの原則:
  • Update優先: ダウンタイムを避けたい場合
  • Drain使用: 互換性のない変更、大規模リファクタリング
  • Cancel使用: 緊急停止のみ(データ損失OK)
UpdateとDrainを適切に使い分けて、
安全で効率的なパイプライン運用を実現しましょう!