⏱️ タンブリングウィンドウ (Tumbling Window)

固定サイズで重複しないウィンドウによるストリーム処理

📖 タンブリングウィンドウとは

🎯 概要

タンブリングウィンドウ(Tumbling Window)は、固定サイズで重複しないウィンドウを使ってストリームデータを区切る方法です。 各ウィンドウは連続しており、データポイントは必ず1つのウィンドウにのみ属します。

タンブリングウィンドウの視覚化

例: 5分間のタンブリングウィンドウ
10:00
10:05
10:10
10:15
10:20
10:25
Window 1
[10:00-10:05)
Window 2
[10:05-10:10)
Window 3
[10:10-10:15)
Window 4
[10:15-10:20)
Window 5
[10:20-10:25)
✅ 特徴:
  • 各ウィンドウは正確に5分間
  • ウィンドウ間に隙間やオーバーラップなし
  • イベントは1つのウィンドウにのみ属する
  • 10:05のイベントはWindow 2に属する(左閉右開区間: [10:05, 10:10))

🔍 タンブリングウィンドウの特性

📏

固定サイズ

すべてのウィンドウが同じ長さ。
例: 1分、5分、1時間、1日など。
ウィンドウサイズは変わらない。
🚫

重複なし

各データポイントは1つのウィンドウにのみ属する。
ウィンドウ間でデータの重複なし。
効率的なメモリ使用。
⏱️

時間ベース

イベント時間またはプロセッシング時間で区切る。
タイムスタンプに基づいてウィンドウ割り当て。
遅延データの扱いに注意。
🔗

連続性

前のウィンドウ終了 = 次のウィンドウ開始。
隙間なく連続。
データの欠損なし。

🆚 他のウィンドウタイプとの比較

タンブリング
特徴:
• 固定サイズ
• 重複なし
• 連続

メモリ:
計算量:
遅延:
図解:
[---][---][---][---]
ホッピング
特徴:
• 固定サイズ
• 重複あり
• ホップ間隔で移動

メモリ:
計算量:
遅延:
図解:
[-----]
  [-----]
    [-----]
スライディング
特徴:
• 固定サイズ
• 連続的にスライド
• イベント毎に更新

メモリ:
計算量:
遅延:
図解:
[-----]
 [-----]
  [-----]
セッション
特徴:
• 可変サイズ
• アクティビティギャップで区切る
• タイムアウトベース

メモリ: 中〜高
計算量:
遅延: 可変
図解:
[--]   [----]  [-]
↑ギャップ↑ギャップ

💻 実装例

1. Apache Beam (Python)

import apache_beam as beam from apache_beam import window def process_with_tumbling_window(): with beam.Pipeline() as pipeline: ( pipeline | 'ReadFromPubSub' >> beam.io.ReadFromPubSub( subscription='projects/my-project/subscriptions/my-sub' ) | 'ParseJSON' >> beam.Map(lambda x: json.loads(x)) | 'ExtractTimestamp' >> beam.Map( lambda x: beam.window.TimestampedValue( x, x['timestamp'] ) ) # 5分間のタンブリングウィンドウ | 'WindowInto' >> beam.WindowInto( window.FixedWindows(5 * 60) # 5分 = 300秒 ) | 'CountPerWindow' >> beam.combiners.Count.PerElement() | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( table='project:dataset.windowed_counts' ) )

2. Apache Flink (Java)

import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; DataStream stream = ...; // 10分間のタンブリングウィンドウ stream .keyBy(event -> event.getUserId()) .window(TumblingEventTimeWindows.of(Time.minutes(10))) .sum("value") .print(); // または処理時間ベース stream .keyBy(event -> event.getUserId()) .window(TumblingProcessingTimeWindows.of(Time.minutes(10))) .aggregate(new MyAggregateFunction()) .addSink(new MySink());

3. Apache Spark Structured Streaming

from pyspark.sql import SparkSession from pyspark.sql.functions import window, col spark = SparkSession.builder.appName("TumblingWindowExample").getOrCreate() # ストリーム読み込み df = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("subscribe", "events") \ .load() # 10分間のタンブリングウィンドウで集計 windowed_counts = df \ .groupBy( window(col("timestamp"), "10 minutes"), col("user_id") ) \ .count() # 結果を出力 query = windowed_counts \ .writeStream \ .outputMode("complete") \ .format("console") \ .start() query.awaitTermination()

4. Google Cloud Dataflow (SQL)

-- 1時間のタンブリングウィンドウで集計 SELECT TUMBLE_START(event_timestamp, INTERVAL 1 HOUR) AS window_start, TUMBLE_END(event_timestamp, INTERVAL 1 HOUR) AS window_end, user_id, COUNT(*) AS event_count, SUM(amount) AS total_amount FROM pubsub.project.topic.events GROUP BY TUMBLE(event_timestamp, INTERVAL 1 HOUR), user_id

5. Kafka Streams (Java)

import org.apache.kafka.streams.kstream.TimeWindows; import java.time.Duration; KStream stream = builder.stream("events"); // 5分間のタンブリングウィンドウ stream .groupByKey() .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))) .count() .toStream() .to("windowed-counts");

📊 具体例: Webサイトのアクセスログ分析

5分間のタンブリングウィンドウでページビュー数を集計

シナリオ: リアルタイムでページビュー数を5分ごとに集計してダッシュボードに表示
入力データ(イベントストリーム): 10:01:30 - user_123 - /home - PageView 10:02:15 - user_456 - /products - PageView 10:03:45 - user_789 - /home - PageView 10:04:20 - user_123 - /cart - PageView 10:05:10 - user_456 - /checkout - PageView 10:06:30 - user_789 - /products - PageView 10:07:45 - user_123 - /home - PageView 10:09:20 - user_456 - /about - PageView 10:10:05 - user_789 - /contact - PageView

ウィンドウ 1: [10:00:00 - 10:05:00)

含まれるイベント:
• 10:01:30 - /home
• 10:02:15 - /products
• 10:03:45 - /home
• 10:04:20 - /cart

集計結果:
• 総ページビュー: 4
• ユニークユーザー: 3
• 人気ページ: /home (2回)

ウィンドウ 2: [10:05:00 - 10:10:00)

含まれるイベント:
• 10:05:10 - /checkout
• 10:06:30 - /products
• 10:07:45 - /home
• 10:09:20 - /about

集計結果:
• 総ページビュー: 4
• ユニークユーザー: 3
• 人気ページ: すべて1回ずつ

ウィンドウ 3: [10:10:00 - 10:15:00)

含まれるイベント:
• 10:10:05 - /contact

集計結果:
• 総ページビュー: 1
• ユニークユーザー: 1
• 人気ページ: /contact (1回)

🎯 ユースケース

1️⃣ リアルタイムダッシュボード

シナリオ: 5分ごとのWebサイトトラフィックをリアルタイム表示

実装:
  • 5分間のタンブリングウィンドウでページビュー集計
  • ウィンドウ終了時にダッシュボードを更新
  • 過去24時間分のウィンドウを表示
メリット: シンプル、予測可能、オーバーヘッド低

2️⃣ 時間別売上レポート

シナリオ: ECサイトの1時間ごとの売上集計

実装:
  • 1時間のタンブリングウィンドウで注文を集計
  • 売上金額、注文数、平均単価を計算
  • BigQueryに保存してLookerで可視化
メリット: 正確、重複なし、時系列分析が容易

3️⃣ IoTセンサーデータ集計

シナリオ: 温度センサーの10分ごとの平均値を記録

実装:
  • 10分間のタンブリングウィンドウで温度データを集計
  • 平均、最小、最大を計算
  • 異常値検知のベースライン作成
メリット: データ量削減、トレンド分析、ストレージ効率化

4️⃣ アプリケーションメトリクス

シナリオ: アプリケーションの1分ごとのエラー率監視

実装:
  • 1分間のタンブリングウィンドウでログを集計
  • エラー数 / 総リクエスト数でエラー率計算
  • 閾値超過時にアラート送信
メリット: リアルタイム監視、誤検知削減、トレンド把握

5️⃣ ログ集約とバッチ処理

シナリオ: アプリケーションログを15分ごとにCloud Storageに保存

実装:
  • 15分間のタンブリングウィンドウでログを収集
  • ウィンドウ終了時にParquet形式で保存
  • 日次バッチでBigQueryにロード
メリット: ファイル数削減、コスト削減、効率的な保存

⚖️ メリットとデメリット

✅ メリット

⚠️ デメリット

💡 ベストプラクティス

項目 推奨事項
ウィンドウサイズ選択 ユースケースに応じて適切なサイズを選択(1分〜1時間が一般的)
遅延データ対応 Allowed LatenessやWatermarkを設定して遅延データを処理
Event Time vs Processing Time 正確性重視ならEvent Time、低レイテンシならProcessing Time
アラート設計 単一ウィンドウではなく複数ウィンドウのトレンドで判断
パーティショニング ウィンドウ開始時刻でパーティション分割してクエリ最適化
モニタリング ウィンドウごとのデータ量、処理時間、遅延を監視

📚 まとめ

🎓 タンブリングウィンドウの重要ポイント

💡 使い分けの基準:
  • タンブリング: 固定間隔の集計、シンプルさ重視
  • ホッピング: 移動平均、重複データ分析
  • スライディング: リアルタイム更新、低レイテンシ重視
  • セッション: ユーザーセッション、可変長アクティビティ
タンブリングウィンドウで、
効率的なストリームデータ処理を実現しましょう!