Redis 流(Streams)

Redis 流(Streams)是 Redis 5.0 引入的一种新的数据结构,用于处理实时数据流。流数据结构结合了消息队列的特性和日志的特性,提供了高效的消息传递和处理能力,适合用于实时数据处理和分析。它是 Redis 中一种高效的、可持久化的数据结构,支持数据的持久化、排序、以及消费者组等功能。

场景示例

  1. 实时日志收集

    • 示例:将应用程序日志实时推送到 Redis 流中,然后进行分析和监控。
      XADD logs * level "info" message "Application started"
      XADD logs * level "error" message "Something went wrong"
      
    • 说明:XADD 将日志条目添加到 logs 流中。
  2. 消息队列

    • 示例:使用 Redis 流作为消息队列,生产者将消息推送到流中,消费者从流中读取消息进行处理。
      XADD message_queue * sender "user1" content "Hello, World!"
      
    • 说明:XADD 将消息添加到 message_queue 流中,消费者可以使用 XREADXREADGROUP 命令来读取消息。
  3. 任务调度

    • 示例:将定时任务加入 Redis 流,工作进程从流中读取任务并执行。
      XADD tasks * task_id "1" action "process_data" timestamp "2024-07-29T10:00:00Z"
      
    • 说明:XADD 将任务条目添加到 tasks 流中,工作进程可以定期检查流中的任务。
  4. 实时数据处理

    • 示例:实时处理传感器数据流,进行分析和存储。
      XADD sensor_data * sensor_id "123" temperature "22.5" humidity "55"
      
    • 说明:XADD 将传感器数据添加到 sensor_data 流中,后续的分析工具可以实时处理这些数据。

底层实现

Redis 流(Streams)的底层实现是基于一个链表结构和散列结构的混合体,以实现高效的写入和读取操作。主要特性包括:

  1. 数据结构

    • 消息条目:每个消息条目包含一个唯一的 ID 和一个包含多个字段的散列。ID 是基于时间戳和序列号生成的,确保了条目的唯一性和顺序性。
    • 内部存储:Redis 流使用链表结构来存储消息条目,每个条目由一个散列数据结构表示。链表确保了消息的顺序,而散列提供了高效的数据存储。
  2. 主要命令

    • XADD:向流中添加新消息。
      XADD key ID field value [field value ...]
      
    • XRANGE:读取流中的消息。
      XRANGE key start end [COUNT count]
      
    • XREVRANGE:反向读取流中的消息。
      XREVRANGE key end start [COUNT count]
      
    • XREAD:从流中读取消息,可以设置阻塞模式。
      XREAD [BLOCK timeout] [COUNT count] STREAMS key [key ...] ID [ID ...]
      
    • XREADGROUP:使用消费者组从流中读取消息。
      XREADGROUP GROUP group consumer [BLOCK timeout] [COUNT count] STREAMS key [key ...] ID [ID ...]
      
    • XGROUP:创建和管理消费者组。
      XGROUP CREATE key group name ID [MKSTREAM]
      XGROUP DELCONSUMER key group name consumer
      XGROUP SETID key group name ID
      
  3. 消息 ID

    • 消息 ID 是由时间戳和序列号组成的,例如 "1627574400000-0",其中 "1627574400000" 是时间戳(毫秒级别),"-0" 是序列号。这种结构确保了消息的唯一性和顺序。
  4. 消费者组

    • Redis 流支持消费者组功能,使得多个消费者可以从同一个流中读取消息,而不会重复消费。消费者组使得消息可以被分配给不同的消费者,提高了处理能力和效率。

Go 中使用 Redis 流

在 Go 语言中,可以使用 go-redis 库来操作 Redis 流。以下是一些基本操作示例。

  1. 连接 Redis

    package main
    
    import (
        "github.com/go-redis/redis/v8"
        "context"
        "fmt"
    )
    
    var ctx = context.Background()
    
    func main() {
        rdb := redis.NewClient(&redis.Options{
            Addr:     "localhost:6379",
            Password: "", // no password set
            DB:       0,  // use default DB
        })
    
        // 示例代码调用
        example(rdb)
    }
    
    func example(rdb *redis.Client) {
        // 示例操作
    }
    
  2. 添加消息到流

    func example(rdb *redis.Client) {
        _, err := rdb.XAdd(ctx, &redis.XAddArgs{
            Stream: "messages",
            Values: map[string]interface{}{
                "sender":  "user1",
                "content": "Hello, World!",
            },
        }).Result()
        if err != nil {
            panic(err)
        }
    }
    
  3. 读取流中的消息

    func example(rdb *redis.Client) {
        messages, err := rdb.XRange(ctx, "messages", "-", "+").Result()
        if err != nil {
            panic(err)
        }
        for _, message := range messages {
            fmt.Println("Message ID:", message.ID)
            fmt.Println("Fields:", message.Values)
        }
    }
    
  4. 使用消费者组读取消息

    func example(rdb *redis.Client) {
        _, err := rdb.XGroupCreateMkStream(ctx, "messages", "group1", "$").Result()
        if err != nil {
            panic(err)
        }
        
        messages, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    "group1",
            Consumer: "consumer1",
            Block:    0,
            Count:    10,
            Streams:  []string{"messages", ">"}, // ">" indicates read new messages only
        }).Result()
        if err != nil {
            panic(err)
        }
        for _, stream := range messages {
            for _, message := range stream.Messages {
                fmt.Println("Message ID:", message.ID)
                fmt.Println("Fields:", message.Values)
            }
        }
    }
    

Redis 流提供了强大的实时数据处理功能,通过 Go 客户端库可以轻松实现各种数据流操作和处理,适用于需要高效处理实时数据流的应用场景。