💻 実装例
1. Apache Beam (Python)
import json

import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions
def process_with_tumbling_window(
    subscription='projects/my-project/subscriptions/my-sub',
    table='project:dataset.windowed_counts',
    window_size_seconds=5 * 60,
):
    """Count Pub/Sub events per fixed (tumbling) window and write the counts to BigQuery.

    Args:
        subscription: Full Pub/Sub subscription path to read from.
        table: BigQuery table spec (``project:dataset.table``) to write to.
        window_size_seconds: Tumbling window length in seconds (default 5 minutes).

    Each output row holds the window bounds and the number of events in that window.
    """
    # Pub/Sub is an unbounded source, so the pipeline must run in streaming mode.
    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=subscription)
            # json.loads accepts the raw bytes payload directly.
            | 'ParseJSON' >> beam.Map(json.loads)
            # NOTE(review): assumes each event carries 'timestamp' as Unix epoch
            # seconds (int/float), which is what TimestampedValue expects — confirm.
            | 'ExtractTimestamp' >> beam.Map(
                lambda event: window.TimestampedValue(event, event['timestamp'])
            )
            | 'WindowInto' >> beam.WindowInto(window.FixedWindows(window_size_seconds))
            # Count all events in each window. without_defaults() is required for
            # CombineGlobally outside the global window (no output for empty windows).
            | 'CountPerWindow' >> beam.CombineGlobally(
                beam.combiners.CountCombineFn()
            ).without_defaults()
            # BigQuery expects dict rows, so attach the window bounds to each count.
            | 'FormatRow' >> beam.Map(
                lambda count, win=beam.DoFn.WindowParam: {
                    'window_start': win.start.to_rfc3339(),
                    'window_end': win.end.to_rfc3339(),
                    'event_count': count,
                }
            )
            | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                table=table,
                # A schema is needed for the default CREATE_IF_NEEDED disposition.
                schema='window_start:TIMESTAMP,window_end:TIMESTAMP,event_count:INTEGER',
            )
        )
2. Apache Flink (Java)
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// NOTE(review): Event is a placeholder POJO assumed to expose getUserId(),
// getTimestamp() (epoch millis) and a numeric "value" field — adapt to your type.
DataStream<Event> stream = ...;

// Event-time windows only fire once the watermark passes the window end, so the
// stream must carry event timestamps and watermarks (5 s out-of-orderness allowed).
DataStream<Event> withWatermarks = stream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp()));

// 10-minute tumbling windows on event time: sum "value" per user.
withWatermarks
        .keyBy(event -> event.getUserId())
        .window(TumblingEventTimeWindows.of(Time.minutes(10)))
        .sum("value")
        .print();

// 10-minute tumbling windows on processing (wall-clock) time: no watermarks needed.
stream
        .keyBy(event -> event.getUserId())
        .window(TumblingProcessingTimeWindows.of(Time.minutes(10)))
        .aggregate(new MyAggregateFunction())
        .addSink(new MySink());
3. Apache Spark Structured Streaming (PySpark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.appName("TumblingWindowExample").getOrCreate()

# The Kafka source only exposes key, value, topic, partition, offset, timestamp
# and timestampType columns; business fields such as user_id live inside the
# binary `value` and must be parsed out before they can be grouped on.
# NOTE(review): assumes `value` is JSON with a string "user_id" field — adapt
# the schema to the real payload.
event_schema = StructType([StructField("user_id", StringType())])

events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .load()
    .select(
        col("timestamp"),  # Kafka record timestamp, used as event time below
        from_json(col("value").cast("string"), event_schema)
        .getField("user_id")
        .alias("user_id"),
    )
)

# The watermark lets Spark finalize windows that are more than 10 minutes behind
# the latest seen timestamp and drop their state; without it (or in "complete"
# mode) the aggregation state grows without bound.
windowed_counts = (
    events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window(col("timestamp"), "10 minutes"), col("user_id"))
    .count()
)

# "update" emits only the window/user rows whose counts changed in each micro-batch.
query = (
    windowed_counts.writeStream
    .outputMode("update")
    .format("console")
    .start()
)
query.awaitTermination()
4. Google Cloud Dataflow (SQL)
-- Hourly tumbling-window aggregation per user in Dataflow SQL (ZetaSQL dialect).
-- Dataflow SQL passes the window size as a STRING literal; TUMBLE_START/TUMBLE_END
-- take only that string and refer to the TUMBLE(...) in the GROUP BY.
-- event_timestamp is the Pub/Sub message timestamp column of a Pub/Sub source.
-- Pub/Sub topics are addressed as pubsub.topic.`project-id`.`topic-name`.
-- NOTE(review): user_id and amount must be declared in the topic's schema — confirm.
SELECT
  TUMBLE_START('INTERVAL 1 HOUR') AS window_start,
  TUMBLE_END('INTERVAL 1 HOUR') AS window_end,
  user_id,
  COUNT(*) AS event_count,
  SUM(amount) AS total_amount
FROM
  pubsub.topic.`my-project`.`events`
GROUP BY
  TUMBLE(event_timestamp, 'INTERVAL 1 HOUR'),
  user_id
5. Kafka Streams (Java)
import org.apache.kafka.streams.kstream.TimeWindows;
import java.time.Duration;
KStream stream = builder.stream("events");
stream
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count()
.toStream()
.to("windowed-counts");