🔀 Dataflow のシャッフル(Shuffle)とは?

📌 シャッフルの基本概念

シャッフル(Shuffle)とは、分散処理においてデータをキーごとに再分配(再グループ化)する操作のことです。

🎯 簡単に言うと:

複数のワーカー(処理マシン)に分散していたデータを、特定のキーに基づいて再配置し、同じキーを持つデータを同じワーカーに集める処理です。

🔍 シャッフルが発生する典型的な操作

GroupByKey

キーごとにデータをグループ化

# シャッフルが発生 data | beam.GroupByKey()

CoGroupByKey

複数のデータセットをキーで結合

# シャッフルが発生 {'orders': orders, 'users': users} \ | beam.CoGroupByKey()

Combine(グローバル)

全データを集約

# シャッフルが発生 data | beam.CombineGlobally(sum)

Distinct

重複を除去

# シャッフルが発生 data | beam.Distinct()

🎬 シャッフルの動作イメージ

GroupByKeyの例:顧客IDごとに購入履歴を集約

▼ シャッフル前(入力データが分散)

Worker 1
A: りんご
B: みかん
C: バナナ
Worker 2
A: ぶどう
C: いちご
B: メロン
Worker 3
A: もも
B: なし
C: キウイ
⬇️ シャッフル(データ再配置)⬇️
🌐 ネットワーク転送

▼ シャッフル後(キーごとにグループ化)

Worker 1
A: りんご
A: ぶどう
A: もも

A: [りんご, ぶどう, もも]
Worker 2
B: みかん
B: メロン
B: なし

B: [みかん, メロン, なし]
Worker 3
C: バナナ
C: いちご
C: キウイ

C: [バナナ, いちご, キウイ]

⚙️ シャッフルのプロセス詳細

1 データのパーティショニング(分割)

各ワーカーが保持しているデータを、キーのハッシュ値に基づいて送信先ワーカーを決定します。

2 中間データの書き込み

送信するデータを一時的にディスク(または Shuffle Service)に書き込みます。

3 ネットワーク転送

ワーカー間でデータを転送します。これが最もコストがかかる部分です。

4 データの受信とソート

受信側ワーカーがデータを受け取り、キーごとにソートして処理可能な状態にします。

5 後続処理の実行

グループ化されたデータに対して、次の変換処理を実行します。

🚀 Dataflow Shuffle vs Batch Shuffle

項目 Batch Shuffle(従来型) Dataflow Shuffle(最適化版)
データの保存先 各ワーカーのローカルディスク Dataflow Shuffle Service(専用サービス)
ワーカー間通信 ワーカー同士が直接通信 Shuffle Service経由で集中管理
耐障害性 ワーカー障害時に再計算が必要 Shuffle Service にデータが残るため再計算不要
パフォーマンス ワーカー数が増えると効率低下 大規模でも高効率
コスト ワーカーのCPU/メモリリソースを消費 Shuffle Service利用料が発生(効率的)
スケーラビリティ ワーカー数に限界あり 数千ワーカーでもスケール可能

⚠️ シャッフルのコストとパフォーマンス影響

🔥 シャッフルが高コストな理由

📊 パフォーマンスへの影響例

例:1TB のデータを GroupByKey で処理する場合

※ データの分散状況やキーの偏りによって大きく変動します

💡 シャッフルを最適化するベストプラクティス

✅ やるべきこと

❌ 避けるべきこと

📝 コード例:最適化前 vs 最適化後

❌ 非効率なコード

import apache_beam as beam # 各ユーザーの購入総額を計算 result = ( p | beam.io.ReadFromText('purchases.csv') | beam.Map(lambda x: x.split(',')) | beam.Map(lambda x: (x[0], float(x[1]))) | beam.GroupByKey() # シャッフル発生 | beam.Map(lambda x: (x[0], sum(x[1]))) )

問題点: 全データをシャッフルしてから集約している

✅ 最適化されたコード

import apache_beam as beam # CombinePerKey で効率的に集約 result = ( p | beam.io.ReadFromText('purchases.csv') | beam.Map(lambda x: x.split(',')) | beam.Map(lambda x: (x[0], float(x[1]))) | beam.CombinePerKey(sum) # 効率的 )

改善点: 事前集約により転送データ量を大幅削減

🎯 まとめ

シャッフルの重要ポイント

💡 実務での心構え:

大崎のオフィスでDataflowパイプラインを設計する際は、シャッフルが発生する箇所を意識し、データ量が大きい場合は必ず最適化を検討しましょう!特にGCP Data Engineer試験でも頻出のテーマです。