Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

3.6 可靠性、顺序与兼容性

游戏网络中,有些消息必须可靠送达(如购买物品),有些必须按顺序到达(如技能连招),有些需要版本兼容(如客户端更新)。

可靠性保证

ACK机制

// ACKSystem ACK确认系统
type ACKSystem struct {
    // 发送窗口
    sendWindow   map[uint16]*Packet

    // 接收窗口
    recvWindow   map[uint16]*Packet

    // 序列号
    sendSeq      uint16
    recvSeq      uint16

    // ACK确认
    ackedSeq     uint16

    // 重传定时器
    resendTimer  *time.Timer
}

// SendReliable 发送可靠消息
func (as *ACKSystem) SendReliable(data []byte) error {
    // 1. 分配序列号
    as.sendSeq++
    seq := as.sendSeq

    // 2. 创建数据包
    pkt := &Packet{
        Sequence: seq,
        Data:     data,
        Timestamp: time.Now(),
    }

    // 3. 保存到发送窗口
    as.sendWindow[seq] = pkt

    // 4. 发送数据
    if err := as.sendPacket(pkt); err != nil {
        return err
    }

    // 5. 启动重传定时器
    as.startResendTimer()

    return nil
}

// HandleACK 处理ACK确认
func (as *ACKSystem) HandleACK(ackSeq uint16) {
    // 清理已确认的数据包
    for seq := as.ackedSeq + 1; seq <= ackSeq; seq++ {
        delete(as.sendWindow, seq)
    }

    as.ackedSeq = ackSeq

    // 如果发送窗口为空,停止重传定时器
    if len(as.sendWindow) == 0 {
        as.resendTimer.Stop()
    }
}

// ResendTimeout 重传超时
func (as *ACKSystem) ResendTimeout() {
    now := time.Now()

    // 重传未确认的数据包
    for seq, pkt := range as.sendWindow {
        // 超过100ms未确认,重传
        if now.Sub(pkt.Timestamp) > 100*time.Millisecond {
            log.Printf("Resending packet: %d", seq)

            // 重传
            as.sendPacket(pkt)

            // 更新时间戳
            pkt.Timestamp = now
        }
    }

    // 重新启动定时器
    as.startResendTimer()
}

func (as *ACKSystem) startResendTimer() {
    if as.resendTimer != nil {
        as.resendTimer.Stop()
    }

    as.resendTimer = time.AfterFunc(50*time.Millisecond, func() {
        as.ResendTimeout()
    })
}

选择性重传

// SelectiveRepeat 选择性重传
type SelectiveRepeat struct {
    // 发送窗口
    sendWindow   map[uint16]*Packet

    // 窗口大小
    windowSize   int

    // 最早的未确认序列号
    sendUnacked  uint16
}

// SendWithWindow 发送消息(滑动窗口)
func (sr *SelectiveRepeat) SendWithWindow(data []byte) error {
    // 检查窗口是否已满
    if len(sr.sendWindow) >= sr.windowSize {
        return errors.New("send window full")
    }

    // 分配序列号
    seq := sr.sendUnacked + uint16(len(sr.sendWindow))
    if seq < sr.sendUnacked {  // 序列号回绕
        seq = sr.sendUnacked
    }

    // 发送数据包
    pkt := &Packet{
        Sequence: seq,
        Data:     data,
    }

    sr.sendWindow[seq] = pkt

    return sr.sendPacket(pkt)
}

// HandleACK 处理ACK(支持累积ACK和选择性ACK)
func (sr *SelectiveRepeat) HandleACK(ack uint16, ackBits uint16) error {
    // ackBits表示哪些包被接收(SACK机制)
    // 例如:ack=10, ackBits=0b1100 表示10、11、13已接收,12未接收

    // 1. 处理累积ACK
    if ack >= sr.sendUnacked {
        // 清理已确认的数据包
        for seq := sr.sendUnacked; seq <= ack; seq++ {
            delete(sr.sendWindow, seq)
        }
        sr.sendUnacked = ack + 1
    }

    // 2. 处理选择性ACK(SACK)
    for i := 0; i < 16; i++ {
        if ackBits&(1<<i) != 0 {
            seq := ack + uint16(i) + 1
            delete(sr.sendWindow, seq)
        }
    }

    return nil
}

顺序保证

序列号机制

// SequenceManager 序列号管理器
type SequenceManager struct {
    // 发送序列号
    sendSeq      uint16

    // 接收序列号
    recvSeq      uint16

    // 接收缓冲区(用于排序)
    recvBuffer   map[uint16]*Packet

    // 缓冲区大小
    bufferSize   int
}

// SendWithSeq 发送带序列号的消息
func (sm *SequenceManager) SendWithSeq(data []byte) []byte {
    // 1. 分配序列号
    sm.sendSeq++
    seq := sm.sendSeq

    // 2. 序列化数据包
    pkt := &Packet{
        Sequence: seq,
        Data:     data,
    }

    return sm.serialize(pkt)
}

// RecvWithSeq 接收带序列号的消息
func (sm *SequenceManager) RecvWithSeq(data []byte) ([]byte, error) {
    // 1. 反序列化数据包
    pkt, err := sm.deserialize(data)
    if err != nil {
        return nil, err
    }

    // 2. 检查是否是期望的序列号
    if pkt.Sequence == sm.recvSeq+1 {
        // 正常顺序
        sm.recvSeq++
        return pkt.Data, nil
    }

    // 3. 乱序到达,保存到缓冲区
    if pkt.Sequence > sm.recvSeq+1 && pkt.Sequence <= sm.recvSeq+uint16(sm.bufferSize) {
        sm.recvBuffer[pkt.Sequence] = pkt
        return nil, errors.New("waiting for earlier packets")
    }

    // 4. 重复包,丢弃
    if pkt.Sequence <= sm.recvSeq {
        return nil, errors.New("duplicate packet")
    }

    // 5. 缓冲区溢出,丢弃
    return nil, errors.New("buffer overflow")
}

// FlushBuffer 从缓冲区提取已就绪的消息
func (sm *SequenceManager) FlushBuffer() [][]byte {
    ready := make([][]byte, 0)

    // 检查缓冲区是否有下一条消息
    for {
        pkt, ok := sm.recvBuffer[sm.recvSeq+1]
        if !ok {
            break
        }

        // 提取消息
        ready = append(ready, pkt.Data)

        // 删除缓冲区中的消息
        delete(sm.recvBuffer, pkt.Sequence)

        // 更新接收序列号
        sm.recvSeq++
    }

    return ready
}

排序缓冲区

// ReorderingBuffer 排序缓冲区
type ReorderingBuffer struct {
    // 缓冲区(环形队列)
    buffer      []*Packet

    // 头指针
    head        uint16

    // 缓冲区大小
    size        int
}

// NewReorderingBuffer 创建排序缓冲区
func NewReorderingBuffer(size int) *ReorderingBuffer {
    return &ReorderingBuffer{
        buffer: make([]*Packet, size),
        size:   size,
    }
}

// Insert 插入消息到缓冲区
func (rb *ReorderingBuffer) Insert(pkt *Packet) error {
    // 计算索引
    idx := (pkt.Sequence - rb.head) % uint16(rb.size)

    // 检查是否超出缓冲区范围
    if pkt.Sequence < rb.head || pkt.Sequence >= rb.head+uint16(rb.size) {
        return errors.New("packet out of buffer range")
    }

    // 插入到缓冲区
    rb.buffer[idx] = pkt

    return nil
}

// Pop 提取已排序的消息
func (rb *ReorderingBuffer) Pop() []*Packet {
    ready := make([]*Packet, 0)

    // 从头开始提取连续的消息
    for i := 0; i < rb.size; i++ {
        idx := (int(rb.head) + i) % rb.size

        if rb.buffer[idx] == nil {
            break  // 缺失消息,停止提取
        }

        ready = append(ready, rb.buffer[idx])
        rb.buffer[idx] = nil
        rb.head++
    }

    return ready
}

兼容性保证

协议版本管理

// ProtocolVersionManager 协议版本管理器
type ProtocolVersionManager struct {
    // 当前版本
    currentVersion ProtocolVersion

    // 支持的版本
    supportedVersions []ProtocolVersion

    // 版本特定的编解码器
    codecs map[ProtocolVersion]MessageCodec
}

// NegotiateVersion 协商版本
func (pvm *ProtocolVersionManager) NegotiateVersion(clientVersions []ProtocolVersion) (ProtocolVersion, error) {
    // 找到最高版本的共同版本
    for _, clientVer := range clientVersions {
        for _, supportedVer := range pvm.supportedVersions {
            if clientVer == supportedVer {
                return clientVer, nil
            }
        }
    }

    return ProtocolVersion{}, errors.New("no compatible version")
}

// Encode 编码消息(使用指定版本)
func (pvm *ProtocolVersionManager) Encode(version ProtocolVersion, msg interface{}) ([]byte, error) {
    codec, ok := pvm.codecs[version]
    if !ok {
        return nil, fmt.Errorf("unsupported version: %v", version)
    }

    return codec.Encode(msg)
}

// Decode 解码消息(自动检测版本)
func (pvm *ProtocolVersionManager) Decode(data []byte) (interface{}, ProtocolVersion, error) {
    // 从数据中提取版本号
    version := pvm.extractVersion(data)

    // 使用对应版本的解码器
    codec, ok := pvm.codecs[version]
    if !ok {
        return nil, version, fmt.Errorf("unsupported version: %v", version)
    }

    msg, err := codec.Decode(data)
    return msg, version, err
}

向后兼容

// BackwardCompatManager 向后兼容管理器
type BackwardCompatManager struct {
    // 版本差异映射
    versionDiff map[ProtocolVersion]*VersionDiff
}

type VersionDiff struct {
    // 新增字段
    AddedFields []FieldInfo

    // 删除字段
    RemovedFields []FieldInfo

    // 修改字段
    ModifiedFields []FieldInfo
}

type FieldInfo struct {
    Message string
    Field   string
    Type    string
}

// Convert 转换消息(旧版本 → 新版本)
func (bcm *BackwardCompatManager) Convert(msg interface{}, fromVer, toVer ProtocolVersion) (interface{}, error) {
    // 1. 检查是否需要转换
    if fromVer == toVer {
        return msg, nil
    }

    // 2. 查找版本差异
    diff, ok := bcm.versionDiff[fromVer]
    if !ok {
        return nil, fmt.Errorf("unknown version: %v", fromVer)
    }

    // 3. 应用转换
    return bcm.applyTransform(msg, diff)
}

// applyTransform 应用转换
func (bcm *BackwardCompatManager) applyTransform(msg interface{}, diff *VersionDiff) (interface{}, error) {
    // 使用反射修改消息
    v := reflect.ValueOf(msg).Elem()

    // 添加新字段(使用默认值)
    for _, field := range diff.AddedFields {
        f := v.FieldByName(field.Field)
        if f.IsValid() {
            f.Set(reflect.Zero(f.Type()))
        }
    }

    // 删除旧字段(忽略)

    return msg, nil
}

真实案例:KCP的可靠性保证

KCP的可靠性机制

// KCP的可靠性特点
type KCPFeature struct {
    Name        string
    Description string
}

var kcpReliabilityFeatures = []KCPFeature{
    {
        Name:        "快速重传",
        Description: "收到3个重复ACK立即重传(不用等RTO超时)",
    },
    {
        Name:        "选择性重传",
        Description: "只重传丢失的包,不重传已接收的包",
    },
    {
        Name:        "UNA模式",
        Description: "使用UNA(未确认)字段快速重传",
    },
    {
        Name:        "减少RTT",
        Description: "不延迟发送ACK,减少往返时间",
    },
}

// KCP配置优化
func optimizeKCP(sess *kcp.UDPSession) {
    // 1. 启用无延迟模式
    sess.SetNoDelay(1, 10, 2, 1)
    // 参数:nodelay, interval(ms), resend, nc

    // 2. 调整窗口大小
    sess.SetWndSize(256, 256)  // 发送/接收窗口

    // 3. 设置最大传输单元
    sess.SetMtu(1200)

    // 4. 设置流模式
    sess.SetStreamMode(false)  // 消息模式
}

性能优化

批量ACK

// BatchACK 批量ACK
type BatchACK struct {
    // ACK列表
    acks        []uint16

    // ACK累积位图
    ackBitmap   uint32

    // 定时器
    timer       *time.Timer
}

// AddACK 添加ACK
func (ba *BatchACK) AddACK(seq uint16) {
    ba.acks = append(ba.acks, seq)

    // 如果累积到一定数量,立即发送
    if len(ba.acks) >= 10 {
        ba.SendACK()
        return
    }

    // 否则,启动定时器(延迟发送)
    if ba.timer != nil {
        ba.timer.Stop()
    }

    ba.timer = time.AfterFunc(10*time.Millisecond, func() {
        ba.SendACK()
    })
}

// SendACK 发送ACK
func (ba *BatchACK) SendACK() {
    if len(ba.acks) == 0 {
        return
    }

    // 批量发送ACK
    for _, seq := range ba.acks {
        // 发送ACK...
    }

    // 清空ACK列表
    ba.acks = ba.acks[:0]
}

小结

可靠性、顺序与兼容性的核心要点:

  1. 可靠性:ACK + 选择性重传
  2. 顺序:序列号 + 排序缓冲区
  3. 兼容性:版本管理 + 向后转换
  4. 优化:批量ACK + 快速重传

真实案例

  • KCP:快速重传 + 选择性重传
  • TCP:累积ACK + 超时重传

踩坑经验

  • ❌ 不要每条消息都发送ACK
  • ❌ 不要忽略版本兼容
  • ✅ 使用选择性重传减少带宽

下一节(3.7)我们将学习:房间广播、弱网与国内网络环境,深入了解游戏网络优化实战。