📦 GroupBy変換
GroupBy(GroupByKey)は、同じキーを持つ要素をグループ化する変換です。MapReduceのShuffle処理に相当します。
🎯 核心概念: キーバリューペア (key, value) を受け取り、(key, [value1, value2, ...]) に変換します。
GroupByの動作イメージ
変換前: 個別の要素
('apple', 5)
('banana', 3)
('apple', 2)
('orange', 7)
('banana', 4)
('apple', 1)
→
import apache_beam as beam
with beam.Pipeline() as p:
word_counts = (
p
| 'Create' >> beam.Create([
'apple banana apple',
'banana orange apple',
'orange orange'
])
| 'Split' >> beam.FlatMap(lambda line: line.split())
| 'PairWithOne' >> beam.Map(lambda word: (word, 1))
| 'GroupByKey' >> beam.GroupByKey()
| 'Sum' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
)
⚠️ 重要な注意点:
- GroupByKeyはシャッフル処理を伴うため、コストが高い
- 入力データは必ず(key, value)のペアである必要がある
- キーごとに値がメモリに集約されるため、大量の値を持つキーはホットスポットになる可能性
🎯 典型的なユースケース
- ログ集計: ユーザーIDごとにアクセスログを集約
- WordCount: 単語ごとに出現回数をカウント
- セッション分析: セッションIDごとにイベントをまとめる
- 売上集計: 商品IDごとに売上データを集約
⚙️ ParDo変換
ParDo (Parallel Do) は、各要素に対して任意の処理を並列実行する最も基本的で柔軟な変換です。MapReduceのMapに相当しますが、より強力です。
🎯 核心概念: DoFnクラスを使って、1つの入力要素から0個、1個、または複数の出力要素を生成できます。
ParDoの動作イメージ
→
ParDo
各要素を処理
→
出力
"hello", "world"
"apache", "beam"
"data", "flow"
1対1の変換 (Map的な使い方)
class MultiplyByTwoFn(beam.DoFn):
def process(self, element):
yield element * 2
numbers = p | beam.Create([1, 2, 3, 4])
doubled = numbers | beam.ParDo(MultiplyByTwoFn())
1対多の変換 (FlatMap的な使い方)
class SplitWordsFn(beam.DoFn):
def process(self, element):
for word in element.split():
yield word.lower()
lines = p | beam.Create([
'Hello World',
'Apache Beam'
])
words = lines | beam.ParDo(SplitWordsFn())
🔧 高度な使い方: Setup/Teardown、状態管理
class EnrichWithDatabaseFn(beam.DoFn):
def setup(self):
self.db_connection = connect_to_database()
def teardown(self):
self.db_connection.close()
def process(self, element):
enriched_data = self.db_connection.query(element)
yield enriched_data
class StatefulCountFn(beam.DoFn):
COUNT_STATE = beam.DoFn.StateParam(
beam.coders.VarIntCoder()
)
def process(self, element, count_state=COUNT_STATE):
count = count_state.read() or 0
count += 1
count_state.write(count)
yield (element, count)
💡 ParDoの強力な機能:
- 柔軟な出力: 1要素から0個、1個、複数個の出力が可能
- 副作用: ログ出力、外部API呼び出しなどが可能
- ライフサイクル: setup()とteardown()でリソース管理
- 状態管理: State APIで要素間の状態を保持可能
- タイマー: Timer APIで時間ベースの処理が可能
🎯 典型的なユースケース
- データクレンジング: 不正なデータのフィルタリングや正規化
- データ変換: フォーマット変換、フィールド追加・削除
- 外部API呼び出し: APIからデータを取得してエンリッチメント
- 複雑なパース処理: JSON、XML、ログファイルのパース
- データ検証: バリデーションルールの適用
📥 SideInput(サイド入力)
SideInputは、メインのデータパイプラインに追加のデータを提供する仕組みです。ParDo処理中に参照データやメタデータにアクセスできます。
🎯 核心概念: メインの入力とは別に、補助的なデータ(辞書、リスト、設定値など)を全ての要素処理で共有します。
import apache_beam as beam
from apache_beam.pvalue import AsDict, AsList, AsSingleton
class EnrichWithExchangeRateFn(beam.DoFn):
def process(self, element, rates_dict):
currency, amount = element
rate = rates_dict[currency]
jpy_amount = amount * rate
yield (currency, amount, jpy_amount)
with beam.Pipeline() as p:
transactions = p | 'CreateTransactions' >> beam.Create([
('USD', 100),
('EUR', 50),
('USD', 200),
])
exchange_rates = p | 'CreateRates' >> beam.Create([
('USD', 110),
('EUR', 130),
])
rates_dict = beam.pvalue.AsDict(exchange_rates)
enriched = transactions | beam.ParDo(
EnrichWithExchangeRateFn(),
rates_dict=rates_dict
)
1
Side Inputの作成
別のPCollectionとして補助データを準備
2
ビュー変換
AsDict, AsList, AsSingletonで適切な形式に変換
3
ParDoで参照
process()メソッドの引数として受け取り、各要素処理で使用
Side Inputのビュー形式
| ビュー形式 |
説明 |
使用例 |
| AsDict |
PCollectionを辞書に変換 (key, value)ペアが必要 |
マスタデータの参照、設定値の取得 |
| AsList |
PCollectionをリストに変換 全要素を順序付きで保持 |
ホワイトリスト、ブラックリストの参照 |
| AsSingleton |
PCollectionを単一の値に変換 1要素のみを含む必要がある |
グローバル設定値、集計結果の参照 |
| AsIter |
PCollectionをイテレータに変換 大量データの効率的な走査 |
大きなデータセットの逐次処理 |
例1: マスタデータとのJOIN
class JoinWithMasterFn(beam.DoFn):
def process(self, element, master_dict):
user_id, value = element
user_info = master_dict.get(user_id)
if user_info:
yield {
'user_id': user_id,
'value': value,
'name': user_info['name'],
'email': user_info['email']
}
user_master = (p
| beam.io.ReadFromBigQuery(table='master.users')
| beam.Map(lambda x: (x['user_id'], x))
)
master_dict = beam.pvalue.AsDict(user_master)
transactions | beam.ParDo(
JoinWithMasterFn(),
master_dict=master_dict
)
例2: ブラックリストフィルタリング
class FilterBlacklistFn(beam.DoFn):
def process(self, element, blacklist):
user_id = element['user_id']
if user_id not in blacklist:
yield element
blacklist_users = (p
| beam.io.ReadFromText('blacklist.txt')
| beam.Map(lambda x: x.strip())
)
blacklist_set = beam.pvalue.AsList(blacklist_users)
events | beam.ParDo(
FilterBlacklistFn(),
blacklist=blacklist_set
)
⚠️ Side Inputの制約と注意点:
- メモリに収まる必要: Side Inputは全ワーカーのメモリに展開されるため、大きすぎるデータは使用不可
- 推奨サイズ: 通常は数MB~数百MB程度まで。数GB以上は避けるべき
- 更新頻度: ストリーミングでは定期的に更新される(ウィンドウごと)
- 代替手段: 大きなデータはCoGroupByKeyやBigQueryの直接参照を検討
🎯 典型的なユースケース
- マスタデータ参照: ユーザー情報、商品情報、カテゴリマスタとの結合
- 設定値の適用: グローバル設定、閾値、パラメータの参照
- フィルタリング: ホワイトリスト/ブラックリストによるフィルタ
- データエンリッチメント: 追加属性の付与、分類ラベルの追加
- 正規化・変換: 辞書ベースの値変換、コード変換
⚖️ 3つの変換の比較
GroupBy
目的: データの集約
入力: (K, V)ペア
出力: (K, [V])
特徴: シャッフル処理
コスト: 高い
ParDo
目的: 要素単位の処理
入力: 任意の要素
出力: 0個以上の要素
特徴: 並列実行
コスト: 低い
SideInput
目的: 補助データ参照
入力: 別PCollection
出力: ビュー形式
特徴: 全要素で共有
コスト: 中程度
組み合わせパターン
パターン1: GroupBy → ParDo
(events
| beam.Map(lambda e: (e['user_id'], e))
| beam.GroupByKey()
| beam.ParDo(CalculateUserStatsFn())
)
パターン2: ParDo with SideInput
master_dict = beam.pvalue.AsDict(master_data)
blacklist = beam.pvalue.AsList(blacklist_data)
(events
| beam.ParDo(EnrichFn(), master_dict=master_dict)
| beam.ParDo(FilterFn(), blacklist=blacklist)
)
パターン3: ParDo → GroupBy → ParDo
(raw_data
| beam.ParDo(ParseAndCleanFn())
| beam.Map(lambda x: (x['key'], x))
| beam.GroupByKey()
| beam.ParDo(AggregateFn())
| beam.io.WriteToBigQuery(...)
)