Redis Streaming
首页
快速开始
  • 核心 API
  • Runtime
  • Config
  • State
  • Checkpoint
  • Watermark
  • Window
  • Source & Sink
  • Reliability
  • Registry
  • MQ
  • 架构设计
  • Exactly-Once
  • MQ 设计
  • Registry 设计
  • 部署指南
  • 性能调优
  • 故障排查
GitHub
首页
快速开始
  • 核心 API
  • Runtime
  • Config
  • State
  • Checkpoint
  • Watermark
  • Window
  • Source & Sink
  • Reliability
  • Registry
  • MQ
  • 架构设计
  • Exactly-Once
  • MQ 设计
  • Registry 设计
  • 部署指南
  • 性能调优
  • 故障排查
GitHub
  • 快速开始

    • 5分钟上手
    • Spring Boot 集成
  • 核心概念

    • 架构概述
    • 核心 API
    • 运行时环境
  • 基础设施模块

    • Config 配置中心
    • Registry 服务注册
    • MQ 消息队列
  • 流处理核心

    • State 状态管理
    • Checkpoint 检查点
    • Watermark 水位线
    • Window 窗口
  • 数据集成

    • Source & Sink
    • CDC 变更捕获
    • Aggregation 聚合
    • Table 表操作
    • Join 流连接
  • 可靠性

    • Reliability 组件
    • Metrics 监控
  • 设计文档

    • Exactly-Once 语义
    • MQ 设计
    • Registry 设计
  • 运维

    • 部署指南
    • 性能调优
    • 故障排查
    • CI/CD
  • 开发

    • 开发指南
    • 测试指南
    • 发布流程

Source & Sink 模块

概述

Source 和 Sink 模块提供数据源连接器和数据汇连接器,用于与外部系统集成。

Source 模块

功能

从外部系统读取数据,创建数据流。

内置 Source

1. RedisListSource

从 Redis List 读取数据。

RedisListSource<String> source = new RedisListSource<>(
    redissonClient,
    "my-list",      // List 键
    "LPOP",         // 弹出方式 (LPOP/RPOP/LPOP/RPOP)
    new StringCodec()
);

DataStream<String> stream = env.fromSource(source);

2. KafkaSource

从 Kafka Topic 读取数据。

KafkaSource<String> source = new KafkaSource<>(
    "localhost:9092",
    "my-topic",
    "my-group",
    new StringDeserializer()
);

DataStream<String> stream = env.fromSource(source);

3. HttpSource

从 HTTP 端点轮询数据。

HttpSource<String> source = new HttpSource<>(
    "http://api.example.com/data",
    Duration.ofSeconds(10),  // 轮询间隔
    new StringCodec()
);

DataStream<String> stream = env.fromSource(source);

自定义 Source

实现 StreamSource 接口创建自定义数据源。

public class MySource implements StreamSource<String> {
    private final Iterator<String> iterator;

    public MySource(Iterator<String> iterator) {
        this.iterator = iterator;
    }

    @Override
    public void open() {
        // 初始化资源
    }

    @Override
    public boolean hasNext() {
        return iterator.hasNext();
    }

    @Override
    public String next() {
        return iterator.next();
    }

    @Override
    public void close() {
        // 释放资源
    }
}

Sink 模块

功能

将处理结果写入外部系统。

内置 Sink

1. RedisHashSink

写入 Redis Hash。

stream.sinkToRedisHash(
    "results",           // Hash 键
    (key, value) -> Map.of(
        "key", key,
        "value", value.toString(),
        "timestamp", String.valueOf(System.currentTimeMillis())
    )
);

2. RedisStreamSink

写入 Redis Stream。

stream.sinkToRedisStream(
    "output-stream",
    message -> new Message(
        message.getKey(),
        message.getValue(),
        Map.of("timestamp", String.valueOf(System.currentTimeMillis()))
    )
);

3. RedisListSink

写入 Redis List。

stream.sinkToRedisList(
    "output-list",
    value -> value.toString(),
    "RPUSH"  // LPUSH/RPUSH
);

4. KafkaSink

写入 Kafka Topic。

stream.sinkToKafka(
    "localhost:9092",
    "output-topic",
    new StringSerializer()
);

自定义 Sink

实现 StreamSink 接口创建自定义数据汇。

public class MySink implements StreamSink<Result> {
    private final Connection connection;

    public MySink(Connection connection) {
        this.connection = connection;
    }

    @Override
    public void open() {
        // 初始化资源
    }

    @Override
    public void write(Result value) {
        // 写入数据
        try (Statement stmt = connection.createStatement()) {
            stmt.execute("INSERT INTO results VALUES ("
                + "'" + value.getKey() + "', "
                + value.getValue() + ")");
        }
    }

    @Override
    public void close() {
        // 释放资源
    }
}

端到端示例

Kafka 到 Redis

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.fromKafka("localhost:9092", "input-topic", "group", new StringDeserializer())
    .map(String::toUpperCase)
    .keyBy(s -> s.substring(0, 1))
    .sum("value")
    .sinkToRedisStream("output-stream");

env.execute("KafkaToRedis");

HTTP 到 Redis

env.fromHttp("http://api.example.com/data", Duration.ofMinutes(1))
    .filter(data -> data.isValid())
    .map(Data::transform)
    .sinkToRedisHash("processed-data");

事务支持

支持 Exactly-Once 语义的 Sink。

IdempotentSink

幂等 Sink,重复写入不产生副作用。

stream.sinkTo(new IdempotentRedisListSink<>(
    redissonClient,
    "output",
    record -> record.getId(),  // 幂等键
    record::toString
));

CheckpointAwareSink

与 Checkpoint 集成的 Sink。

stream.sinkTo(new CheckpointAwareSink<Result>() {
    @Override
    public void write(Result value) {
        // 写入数据
    }

    @Override
    public void onCheckpointRestore(long checkpointId) {
        // 从 Checkpoint 恢复
    }
});

并行处理

分区写入

Sink 支持并行写入多个分区。

stream.setParallelism(4)
    .keyBy(Result::getKey)
    .sinkToRedisHash("results");

负载均衡

使用分区键实现负载均衡。

stream.keyBy(record -> record.getKey().hashCode() % 10)
    .sinkTo(new MultiPartitionSink(partitionSinks));

相关文档

  • MQ 模块 - 消息队列
  • Runtime 模块 - 运行时环境
  • Checkpoint 模块 - Checkpoint 机制
Edit this page
最近更新: 2026/1/1 13:26
Contributors: cuihairu
Next
CDC 变更捕获