Redis Streaming
首页
快速开始
  • 核心 API
  • Runtime
  • Config
  • State
  • Checkpoint
  • Watermark
  • Window
  • Source & Sink
  • Reliability
  • Registry
  • MQ
  • 架构设计
  • Exactly-Once
  • MQ 设计
  • Registry 设计
  • 部署指南
  • 性能调优
  • 故障排查
GitHub
首页
快速开始
  • 核心 API
  • Runtime
  • Config
  • State
  • Checkpoint
  • Watermark
  • Window
  • Source & Sink
  • Reliability
  • Registry
  • MQ
  • 架构设计
  • Exactly-Once
  • MQ 设计
  • Registry 设计
  • 部署指南
  • 性能调优
  • 故障排查
GitHub
  • 快速开始

    • 5分钟上手
    • Spring Boot 集成
  • 核心概念

    • 架构概述
    • 核心 API
    • 运行时环境
  • 基础设施模块

    • Config 配置中心
    • Registry 服务注册
    • MQ 消息队列
  • 流处理核心

    • State 状态管理
    • Checkpoint 检查点
    • Watermark 水位线
    • Window 窗口
  • 数据集成

    • Source & Sink
    • CDC 变更捕获
    • Aggregation 聚合
    • Table 表操作
    • Join 流连接
  • 可靠性

    • Reliability 组件
    • Metrics 监控
  • 设计文档

    • Exactly-Once 语义
    • MQ 设计
    • Registry 设计
  • 运维

    • 部署指南
    • 性能调优
    • 故障排查
    • CI/CD
  • 开发

    • 开发指南
    • 测试指南
    • 发布流程

Checkpoint 模块

概述

Checkpoint 模块提供检查点机制,用于实现流处理应用的容错和状态恢复。支持定期创建检查点、保存状态快照,并在故障发生时从最近的检查点恢复。

核心功能

  • 检查点协调: 协调整个作业的检查点创建
  • 状态快照: 保存所有算子的状态到持久化存储
  • 故障恢复: 从最近的检查点恢复作业状态
  • 分布式协调: 支持分布式环境下的检查点协调

核心接口

CheckpointCoordinator

public interface CheckpointCoordinator {
    // 触发检查点
    long triggerCheckpoint() throws Exception;

    // 恢复检查点
    void restoreFromCheckpoint(long checkpointId) throws Exception;

    // 获取最新检查点ID
    long getLatestCheckpointId();
}

CheckpointStorage

public interface CheckpointStorage {
    // 保存检查点
    void saveCheckpoint(Checkpoint checkpoint) throws IOException;

    // 获取检查点
    Checkpoint getCheckpoint(long checkpointId) throws IOException;

    // 获取最新检查点
    Checkpoint getLatestCheckpoint() throws IOException;
}

使用方式

1. 创建检查点协调器

CheckpointStorage storage = new RedisCheckpointStorage(redissonClient);
CheckpointCoordinator coordinator = new RedisCheckpointCoordinator(
    redissonClient,
    storage,
    "my-job"
);

2. 触发检查点

long checkpointId = coordinator.triggerCheckpoint();
System.out.println("Checkpoint created: " + checkpointId);

3. 恢复检查点

long latestId = coordinator.getLatestCheckpointId();
coordinator.restoreFromCheckpoint(latestId);

Redis 数据结构

  • 检查点数据: Hash

    • Key: checkpoint:{jobId}:{checkpointId}
    • Fields: timestamp, status, stateSnapshots
  • 检查点索引: Sorted Set

    • Key: checkpoint:{jobId}:index
    • Score: checkpointId
    • Member: checkpointId

相关文档

  • State 模块 - 状态管理
  • Runtime 模块 - 运行时环境
Edit this page
最近更新: 2026/1/1 13:26
Contributors: cuihairu
Prev
State 状态管理
Next
Watermark 水位线