Redis Streaming
首页
快速开始
  • 核心 API
  • Runtime
  • Config
  • State
  • Checkpoint
  • Watermark
  • Window
  • Source & Sink
  • Reliability
  • Registry
  • MQ
  • 架构设计
  • Exactly-Once
  • MQ 设计
  • Registry 设计
  • 部署指南
  • 性能调优
  • 故障排查
GitHub
首页
快速开始
  • 核心 API
  • Runtime
  • Config
  • State
  • Checkpoint
  • Watermark
  • Window
  • Source & Sink
  • Reliability
  • Registry
  • MQ
  • 架构设计
  • Exactly-Once
  • MQ 设计
  • Registry 设计
  • 部署指南
  • 性能调优
  • 故障排查
GitHub
  • 快速开始

    • 5分钟上手
    • Spring Boot 集成
  • 核心概念

    • 架构概述
    • 核心 API
    • 运行时环境
  • 基础设施模块

    • Config 配置中心
    • Registry 服务注册
    • MQ 消息队列
  • 流处理核心

    • State 状态管理
    • Checkpoint 检查点
    • Watermark 水位线
    • Window 窗口
  • 数据集成

    • Source & Sink
    • CDC 变更捕获
    • Aggregation 聚合
    • Table 表操作
    • Join 流连接
  • 可靠性

    • Reliability 组件
    • Metrics 监控
  • 设计文档

    • Exactly-Once 语义
    • MQ 设计
    • Registry 设计
  • 运维

    • 部署指南
    • 性能调优
    • 故障排查
    • CI/CD
  • 开发

    • 开发指南
    • 测试指南
    • 发布流程

Window 模块

概述

Window 模块提供窗口操作功能,支持将无限流分割成有限的窗口进行聚合计算。支持滚动窗口、滑动窗口和会话窗口。

窗口类型

1. TumblingWindow (滚动窗口)

窗口不重叠,每个窗口独立处理。

DataStream<Event> stream = env.fromElements(...);

// 创建滚动窗口(1分钟)
stream.window(TumblingWindow.of(Duration.ofMinutes(1)))
      .aggregate(new MyAggregateFunction());

2. SlidingWindow (滑动窗口)

窗口有重叠,支持更平滑的数据分析。

// 创建滑动窗口(窗口1分钟,每30秒滑动一次)
stream.window(SlidingWindow.of(
        Duration.ofMinutes(1),   // 窗口大小
        Duration.ofSeconds(30)   // 滑动步长
    ))
   .sum("value");

3. SessionWindow (会话窗口)

基于活动间隔动态划分窗口。

// 创建会话窗口(30秒无活动则窗口结束)
stream.window(SessionWindow.withGap(Duration.ofSeconds(30)))
      .process(new MyProcessFunction());

核心组件

WindowAssigner

窗口分配器,决定元素属于哪些窗口。

public interface WindowAssigner<T, W extends Window> {
    // 分配窗口
    Iterable<W> assignWindows(T element, long timestamp);

    // 获取默认触发器
    Trigger<T> getDefaultTrigger();
}

Trigger

触发器,决定何时触发窗口计算。

public interface Trigger<T> {
    // 检查是否可以触发
    boolean canTrigger(Watermark watermark);

    // 处理元素
    void onElement(T element) throws Exception;
}

使用方式

1. 基本窗口聚合

DataStream<Event> stream = env.fromElements(...);

stream.keyBy(Event::getKey)
      .window(TumblingWindow.of(Duration.ofMinutes(1)))
      .aggregate(Aggregates.sum("value"));

2. 窗口处理函数

stream.window(SlidingWindow.of(Duration.ofHours(1), Duration.ofMinutes(30)))
      .process(new ProcessWindowFunction<String, Event, Result>() {
          @Override
          public void process(String key,
                            Context context,
                            Iterable<Event> elements,
                            Collector<Result> out) {
              // 处理窗口内所有元素
              long count = 0;
              for (Event e : elements) {
                  count++;
              }
              out.collect(new Result(key, count));
          }
      });

3. 迟到数据处理

stream.window(TumblingWindow.of(Duration.ofMinutes(1)))
      .allowedLateness(Duration.ofSeconds(30))  // 允许30秒迟到
      .sideOutputLateData(lateDataTag)         // 输出到侧输出流
      .sum("value");

时间语义

Event Time (事件时间)

基于事件本身的时间戳,处理乱序事件。

stream.assignTimestamps((event) -> event.getTimestamp())
      .window(TumblingWindow.of(Duration.ofMinutes(1)))
      .trigger(EventTimeTrigger.create());

Processing Time (处理时间)

基于系统时间,不关心事件时间。

stream.window(TumblingWindow.of(Duration.ofMinutes(1)))
      .trigger(ProcessingTimeTrigger.create());

相关文档

  • Watermark 模块 - 水位线机制
  • Aggregation 模块 - 聚合函数
Edit this page
最近更新: 2026/1/1 13:26
Contributors: cuihairu
Prev
Watermark 水位线