🔄 Dataflow 主要変換処理

GroupBy、ParDo、SideInputの仕組みと使い方

📦 GroupBy変換

GroupBy(GroupByKey)は、同じキーを持つ要素をグループ化する変換です。MapReduceのShuffle処理に相当します。

🎯 核心概念: キーバリューペア (key, value) を受け取り、(key, [value1, value2, ...]) に変換します。

GroupByの動作イメージ

変換前: 個別の要素
('apple', 5)
('banana', 3)
('apple', 2)
('orange', 7)
('banana', 4)
('apple', 1)
変換後: キーごとにグループ化
🍎 'apple'
[5, 2, 1]
🍌 'banana'
[3, 4]
🍊 'orange'
[7]
# Apache Beam (Dataflow) での実装 import apache_beam as beam with beam.Pipeline() as p: # 単語のカウント例 word_counts = ( p | 'Create' >> beam.Create([ 'apple banana apple', 'banana orange apple', 'orange orange' ]) | 'Split' >> beam.FlatMap(lambda line: line.split()) | 'PairWithOne' >> beam.Map(lambda word: (word, 1)) | 'GroupByKey' >> beam.GroupByKey() # ← ここでグループ化 | 'Sum' >> beam.Map(lambda kv: (kv[0], sum(kv[1]))) ) # GroupByKey後のデータ形式: # ('apple', [1, 1, 1]) → ('apple', 3) # ('banana', [1, 1]) → ('banana', 2) # ('orange', [1, 1]) → ('orange', 2)
⚠️ 重要な注意点:

🎯 典型的なユースケース

⚙️ ParDo変換

ParDo (Parallel Do) は、各要素に対して任意の処理を並列実行する最も基本的で柔軟な変換です。MapReduceのMapに相当しますが、より強力です。

🎯 核心概念: DoFnクラスを使って、1つの入力要素から0個、1個、または複数の出力要素を生成できます。

ParDoの動作イメージ

入力
"Hello World"
"Apache Beam"
"Data Flow"
ParDo
各要素を処理
出力
"hello", "world"
"apache", "beam"
"data", "flow"

1対1の変換 (Map的な使い方)

class MultiplyByTwoFn(beam.DoFn): def process(self, element): yield element * 2 # 使用例 numbers = p | beam.Create([1, 2, 3, 4]) doubled = numbers | beam.ParDo(MultiplyByTwoFn()) # 結果: [2, 4, 6, 8]

1対多の変換 (FlatMap的な使い方)

class SplitWordsFn(beam.DoFn): def process(self, element): for word in element.split(): yield word.lower() # 使用例 lines = p | beam.Create([ 'Hello World', 'Apache Beam' ]) words = lines | beam.ParDo(SplitWordsFn()) # 結果: ['hello', 'world', 'apache', 'beam']

🔧 高度な使い方: Setup/Teardown、状態管理

class EnrichWithDatabaseFn(beam.DoFn): # 初期化処理(ワーカーごとに1回実行) def setup(self): self.db_connection = connect_to_database() # 終了処理(ワーカーごとに1回実行) def teardown(self): self.db_connection.close() # 各要素の処理 def process(self, element): # データベースから追加情報を取得 enriched_data = self.db_connection.query(element) yield enriched_data # Stateを使った集計 class StatefulCountFn(beam.DoFn): COUNT_STATE = beam.DoFn.StateParam( beam.coders.VarIntCoder() ) def process(self, element, count_state=COUNT_STATE): count = count_state.read() or 0 count += 1 count_state.write(count) yield (element, count)
💡 ParDoの強力な機能:

🎯 典型的なユースケース

📥 SideInput(サイド入力)

SideInputは、メインのデータパイプラインに追加のデータを提供する仕組みです。ParDo処理中に参照データやメタデータにアクセスできます。

🎯 核心概念: メインの入力とは別に、補助的なデータ(辞書、リスト、設定値など)を全ての要素処理で共有します。

SideInputの動作イメージ

📋 Side Input (補助データ)
{'USD': 110, 'EUR': 130}
為替レート辞書
Main Input
('USD', 100)
('EUR', 50)
('USD', 200)
ParDo with
SideInput
Output
11,000円
6,500円
22,000円
# Side Inputの実装例 import apache_beam as beam from apache_beam.pvalue import AsDict, AsList, AsSingleton class EnrichWithExchangeRateFn(beam.DoFn): def process(self, element, rates_dict): currency, amount = element # Side Inputから為替レートを取得 rate = rates_dict[currency] jpy_amount = amount * rate yield (currency, amount, jpy_amount) with beam.Pipeline() as p: # メインデータ: 外貨金額 transactions = p | 'CreateTransactions' >> beam.Create([ ('USD', 100), ('EUR', 50), ('USD', 200), ]) # Side Input: 為替レート exchange_rates = p | 'CreateRates' >> beam.Create([ ('USD', 110), ('EUR', 130), ]) # Side Inputを辞書として変換 rates_dict = beam.pvalue.AsDict(exchange_rates) # メインデータにSide Inputを適用 enriched = transactions | beam.ParDo( EnrichWithExchangeRateFn(), rates_dict=rates_dict # ← Side Inputを渡す )
1
Side Inputの作成
別のPCollectionとして補助データを準備
2
ビュー変換
AsDict, AsList, AsSingletonで適切な形式に変換
3
ParDoで参照
process()メソッドの引数として受け取り、各要素処理で使用

Side Inputのビュー形式

ビュー形式 説明 使用例
AsDict PCollectionを辞書に変換
(key, value)ペアが必要
マスタデータの参照、設定値の取得
AsList PCollectionをリストに変換
全要素を順序付きで保持
ホワイトリスト、ブラックリストの参照
AsSingleton PCollectionを単一の値に変換
1要素のみを含む必要がある
グローバル設定値、集計結果の参照
AsIter PCollectionをイテレータに変換
大量データの効率的な走査
大きなデータセットの逐次処理

例1: マスタデータとのJOIN

class JoinWithMasterFn(beam.DoFn): def process(self, element, master_dict): user_id, value = element user_info = master_dict.get(user_id) if user_info: yield { 'user_id': user_id, 'value': value, 'name': user_info['name'], 'email': user_info['email'] } # ユーザーマスタをSide Inputとして使用 user_master = (p | beam.io.ReadFromBigQuery(table='master.users') | beam.Map(lambda x: (x['user_id'], x)) ) master_dict = beam.pvalue.AsDict(user_master) transactions | beam.ParDo( JoinWithMasterFn(), master_dict=master_dict )

例2: ブラックリストフィルタリング

class FilterBlacklistFn(beam.DoFn): def process(self, element, blacklist): user_id = element['user_id'] if user_id not in blacklist: yield element # ブラックリストをSide Inputとして使用 blacklist_users = (p | beam.io.ReadFromText('blacklist.txt') | beam.Map(lambda x: x.strip()) ) blacklist_set = beam.pvalue.AsList(blacklist_users) events | beam.ParDo( FilterBlacklistFn(), blacklist=blacklist_set )
⚠️ Side Inputの制約と注意点:

🎯 典型的なユースケース

⚖️ 3つの変換の比較

GroupBy

目的: データの集約
入力: (K, V)ペア
出力: (K, [V])
特徴: シャッフル処理
コスト: 高い

ParDo

目的: 要素単位の処理
入力: 任意の要素
出力: 0個以上の要素
特徴: 並列実行
コスト: 低い

SideInput

目的: 補助データ参照
入力: 別PCollection
出力: ビュー形式
特徴: 全要素で共有
コスト: 中程度

組み合わせパターン

パターン1: GroupBy → ParDo

# ユーザーごとにイベントを集約し、統計を計算 (events | beam.Map(lambda e: (e['user_id'], e)) | beam.GroupByKey() # ユーザーごとにグループ化 | beam.ParDo(CalculateUserStatsFn()) # 各ユーザーの統計を計算 )

パターン2: ParDo with SideInput

# イベントをマスタデータでエンリッチしてからフィルタリング master_dict = beam.pvalue.AsDict(master_data) blacklist = beam.pvalue.AsList(blacklist_data) (events | beam.ParDo(EnrichFn(), master_dict=master_dict) | beam.ParDo(FilterFn(), blacklist=blacklist) )

パターン3: ParDo → GroupBy → ParDo

# 完全なETLパイプライン (raw_data | beam.ParDo(ParseAndCleanFn()) # データクレンジング | beam.Map(lambda x: (x['key'], x)) | beam.GroupByKey() # キーごとに集約 | beam.ParDo(AggregateFn()) # 集計処理 | beam.io.WriteToBigQuery(...) # 出力 )

💡 ベストプラクティス

✅ やるべきこと

  • GroupByKeyの前にCombinePerKeyで事前集約
  • ParDoでバッチ処理(複数要素をまとめて処理)
  • Side Inputは小さいデータに限定
  • setup/teardownで接続を再利用
  • 適切なウィンドウ設定でデータ量を制御

❌ 避けるべきこと

  • 不要なGroupByKey(シャッフルコストが高い)
  • ParDoで外部APIを同期呼び出し(遅い)
  • 巨大なSide Input(メモリ不足の原因)
  • 状態を持つグローバル変数の使用
  • setup/teardownの省略(リソースリーク)

🚀 パフォーマンス最適化のヒント

📚 まとめ

🎓 3つの変換の使い分け

これら3つを組み合わせることで、複雑なデータパイプラインを効率的に構築できます。 特に、ParDoとSideInputの組み合わせは、データエンリッチメントやフィルタリングで非常に強力です。