3.6 可靠性、顺序与兼容性
游戏网络中,有些消息必须可靠送达(如购买物品),有些必须按顺序到达(如技能连招),有些需要版本兼容(如客户端更新)。
可靠性保证
ACK机制
// ACKSystem ACK确认系统
type ACKSystem struct {
// 发送窗口
sendWindow map[uint16]*Packet
// 接收窗口
recvWindow map[uint16]*Packet
// 序列号
sendSeq uint16
recvSeq uint16
// ACK确认
ackedSeq uint16
// 重传定时器
resendTimer *time.Timer
}
// SendReliable 发送可靠消息
func (as *ACKSystem) SendReliable(data []byte) error {
// 1. 分配序列号
as.sendSeq++
seq := as.sendSeq
// 2. 创建数据包
pkt := &Packet{
Sequence: seq,
Data: data,
Timestamp: time.Now(),
}
// 3. 保存到发送窗口
as.sendWindow[seq] = pkt
// 4. 发送数据
if err := as.sendPacket(pkt); err != nil {
return err
}
// 5. 启动重传定时器
as.startResendTimer()
return nil
}
// HandleACK 处理ACK确认
func (as *ACKSystem) HandleACK(ackSeq uint16) {
// 清理已确认的数据包
for seq := as.ackedSeq + 1; seq <= ackSeq; seq++ {
delete(as.sendWindow, seq)
}
as.ackedSeq = ackSeq
// 如果发送窗口为空,停止重传定时器
if len(as.sendWindow) == 0 {
as.resendTimer.Stop()
}
}
// ResendTimeout 重传超时
func (as *ACKSystem) ResendTimeout() {
now := time.Now()
// 重传未确认的数据包
for seq, pkt := range as.sendWindow {
// 超过100ms未确认,重传
if now.Sub(pkt.Timestamp) > 100*time.Millisecond {
log.Printf("Resending packet: %d", seq)
// 重传
as.sendPacket(pkt)
// 更新时间戳
pkt.Timestamp = now
}
}
// 重新启动定时器
as.startResendTimer()
}
func (as *ACKSystem) startResendTimer() {
if as.resendTimer != nil {
as.resendTimer.Stop()
}
as.resendTimer = time.AfterFunc(50*time.Millisecond, func() {
as.ResendTimeout()
})
}
选择性重传
// SelectiveRepeat 选择性重传
type SelectiveRepeat struct {
// 发送窗口
sendWindow map[uint16]*Packet
// 窗口大小
windowSize int
// 最早的未确认序列号
sendUnacked uint16
}
// SendWithWindow 发送消息(滑动窗口)
func (sr *SelectiveRepeat) SendWithWindow(data []byte) error {
// 检查窗口是否已满
if len(sr.sendWindow) >= sr.windowSize {
return errors.New("send window full")
}
// 分配序列号
seq := sr.sendUnacked + uint16(len(sr.sendWindow))
if seq < sr.sendUnacked { // 序列号回绕
seq = sr.sendUnacked
}
// 发送数据包
pkt := &Packet{
Sequence: seq,
Data: data,
}
sr.sendWindow[seq] = pkt
return sr.sendPacket(pkt)
}
// HandleACK 处理ACK(支持累积ACK和选择性ACK)
func (sr *SelectiveRepeat) HandleACK(ack uint16, ackBits uint16) error {
// ackBits表示哪些包被接收(SACK机制)
// 例如:ack=10, ackBits=0b1100 表示10、11、13已接收,12未接收
// 1. 处理累积ACK
if ack >= sr.sendUnacked {
// 清理已确认的数据包
for seq := sr.sendUnacked; seq <= ack; seq++ {
delete(sr.sendWindow, seq)
}
sr.sendUnacked = ack + 1
}
// 2. 处理选择性ACK(SACK)
for i := 0; i < 16; i++ {
if ackBits&(1<<i) != 0 {
seq := ack + uint16(i) + 1
delete(sr.sendWindow, seq)
}
}
return nil
}
顺序保证
序列号机制
// SequenceManager 序列号管理器
type SequenceManager struct {
// 发送序列号
sendSeq uint16
// 接收序列号
recvSeq uint16
// 接收缓冲区(用于排序)
recvBuffer map[uint16]*Packet
// 缓冲区大小
bufferSize int
}
// SendWithSeq 发送带序列号的消息
func (sm *SequenceManager) SendWithSeq(data []byte) []byte {
// 1. 分配序列号
sm.sendSeq++
seq := sm.sendSeq
// 2. 序列化数据包
pkt := &Packet{
Sequence: seq,
Data: data,
}
return sm.serialize(pkt)
}
// RecvWithSeq 接收带序列号的消息
func (sm *SequenceManager) RecvWithSeq(data []byte) ([]byte, error) {
// 1. 反序列化数据包
pkt, err := sm.deserialize(data)
if err != nil {
return nil, err
}
// 2. 检查是否是期望的序列号
if pkt.Sequence == sm.recvSeq+1 {
// 正常顺序
sm.recvSeq++
return pkt.Data, nil
}
// 3. 乱序到达,保存到缓冲区
if pkt.Sequence > sm.recvSeq+1 && pkt.Sequence <= sm.recvSeq+uint16(sm.bufferSize) {
sm.recvBuffer[pkt.Sequence] = pkt
return nil, errors.New("waiting for earlier packets")
}
// 4. 重复包,丢弃
if pkt.Sequence <= sm.recvSeq {
return nil, errors.New("duplicate packet")
}
// 5. 缓冲区溢出,丢弃
return nil, errors.New("buffer overflow")
}
// FlushBuffer 从缓冲区提取已就绪的消息
func (sm *SequenceManager) FlushBuffer() [][]byte {
ready := make([][]byte, 0)
// 检查缓冲区是否有下一条消息
for {
pkt, ok := sm.recvBuffer[sm.recvSeq+1]
if !ok {
break
}
// 提取消息
ready = append(ready, pkt.Data)
// 删除缓冲区中的消息
delete(sm.recvBuffer, pkt.Sequence)
// 更新接收序列号
sm.recvSeq++
}
return ready
}
排序缓冲区
// ReorderingBuffer 排序缓冲区
type ReorderingBuffer struct {
// 缓冲区(环形队列)
buffer []*Packet
// 头指针
head uint16
// 缓冲区大小
size int
}
// NewReorderingBuffer 创建排序缓冲区
func NewReorderingBuffer(size int) *ReorderingBuffer {
return &ReorderingBuffer{
buffer: make([]*Packet, size),
size: size,
}
}
// Insert 插入消息到缓冲区
func (rb *ReorderingBuffer) Insert(pkt *Packet) error {
// 计算索引
idx := (pkt.Sequence - rb.head) % uint16(rb.size)
// 检查是否超出缓冲区范围
if pkt.Sequence < rb.head || pkt.Sequence >= rb.head+uint16(rb.size) {
return errors.New("packet out of buffer range")
}
// 插入到缓冲区
rb.buffer[idx] = pkt
return nil
}
// Pop 提取已排序的消息
func (rb *ReorderingBuffer) Pop() []*Packet {
ready := make([]*Packet, 0)
// 从头开始提取连续的消息
for i := 0; i < rb.size; i++ {
idx := (int(rb.head) + i) % rb.size
if rb.buffer[idx] == nil {
break // 缺失消息,停止提取
}
ready = append(ready, rb.buffer[idx])
rb.buffer[idx] = nil
rb.head++
}
return ready
}
兼容性保证
协议版本管理
// ProtocolVersionManager 协议版本管理器
type ProtocolVersionManager struct {
// 当前版本
currentVersion ProtocolVersion
// 支持的版本
supportedVersions []ProtocolVersion
// 版本特定的编解码器
codecs map[ProtocolVersion]MessageCodec
}
// NegotiateVersion 协商版本
func (pvm *ProtocolVersionManager) NegotiateVersion(clientVersions []ProtocolVersion) (ProtocolVersion, error) {
// 找到最高版本的共同版本
for _, clientVer := range clientVersions {
for _, supportedVer := range pvm.supportedVersions {
if clientVer == supportedVer {
return clientVer, nil
}
}
}
return ProtocolVersion{}, errors.New("no compatible version")
}
// Encode 编码消息(使用指定版本)
func (pvm *ProtocolVersionManager) Encode(version ProtocolVersion, msg interface{}) ([]byte, error) {
codec, ok := pvm.codecs[version]
if !ok {
return nil, fmt.Errorf("unsupported version: %v", version)
}
return codec.Encode(msg)
}
// Decode 解码消息(自动检测版本)
func (pvm *ProtocolVersionManager) Decode(data []byte) (interface{}, ProtocolVersion, error) {
// 从数据中提取版本号
version := pvm.extractVersion(data)
// 使用对应版本的解码器
codec, ok := pvm.codecs[version]
if !ok {
return nil, version, fmt.Errorf("unsupported version: %v", version)
}
msg, err := codec.Decode(data)
return msg, version, err
}
向后兼容
// BackwardCompatManager 向后兼容管理器
type BackwardCompatManager struct {
// 版本差异映射
versionDiff map[ProtocolVersion]*VersionDiff
}
type VersionDiff struct {
// 新增字段
AddedFields []FieldInfo
// 删除字段
RemovedFields []FieldInfo
// 修改字段
ModifiedFields []FieldInfo
}
type FieldInfo struct {
Message string
Field string
Type string
}
// Convert 转换消息(旧版本 → 新版本)
func (bcm *BackwardCompatManager) Convert(msg interface{}, fromVer, toVer ProtocolVersion) (interface{}, error) {
// 1. 检查是否需要转换
if fromVer == toVer {
return msg, nil
}
// 2. 查找版本差异
diff, ok := bcm.versionDiff[fromVer]
if !ok {
return nil, fmt.Errorf("unknown version: %v", fromVer)
}
// 3. 应用转换
return bcm.applyTransform(msg, diff)
}
// applyTransform 应用转换
func (bcm *BackwardCompatManager) applyTransform(msg interface{}, diff *VersionDiff) (interface{}, error) {
// 使用反射修改消息
v := reflect.ValueOf(msg).Elem()
// 添加新字段(使用默认值)
for _, field := range diff.AddedFields {
f := v.FieldByName(field.Field)
if f.IsValid() {
f.Set(reflect.Zero(f.Type()))
}
}
// 删除旧字段(忽略)
return msg, nil
}
真实案例:KCP的可靠性保证
KCP的可靠性机制:
// KCP的可靠性特点
type KCPFeature struct {
Name string
Description string
}
var kcpReliabilityFeatures = []KCPFeature{
{
Name: "快速重传",
Description: "收到3个重复ACK立即重传(不用等RTO超时)",
},
{
Name: "选择性重传",
Description: "只重传丢失的包,不重传已接收的包",
},
{
Name: "UNA模式",
Description: "使用UNA(未确认)字段快速重传",
},
{
Name: "减少RTT",
Description: "不延迟发送ACK,减少往返时间",
},
}
// KCP配置优化
func optimizeKCP(sess *kcp.UDPSession) {
// 1. 启用无延迟模式
sess.SetNoDelay(1, 10, 2, 1)
// 参数:nodelay, interval(ms), resend, nc
// 2. 调整窗口大小
sess.SetWndSize(256, 256) // 发送/接收窗口
// 3. 设置最大传输单元
sess.SetMtu(1200)
// 4. 设置流模式
sess.SetStreamMode(false) // 消息模式
}
性能优化
批量ACK
// BatchACK 批量ACK
type BatchACK struct {
// ACK列表
acks []uint16
// ACK累积位图
ackBitmap uint32
// 定时器
timer *time.Timer
}
// AddACK 添加ACK
func (ba *BatchACK) AddACK(seq uint16) {
ba.acks = append(ba.acks, seq)
// 如果累积到一定数量,立即发送
if len(ba.acks) >= 10 {
ba.SendACK()
return
}
// 否则,启动定时器(延迟发送)
if ba.timer != nil {
ba.timer.Stop()
}
ba.timer = time.AfterFunc(10*time.Millisecond, func() {
ba.SendACK()
})
}
// SendACK 发送ACK
func (ba *BatchACK) SendACK() {
if len(ba.acks) == 0 {
return
}
// 批量发送ACK
for _, seq := range ba.acks {
// 发送ACK...
}
// 清空ACK列表
ba.acks = ba.acks[:0]
}
小结
可靠性、顺序与兼容性的核心要点:
- 可靠性:ACK + 选择性重传
- 顺序:序列号 + 排序缓冲区
- 兼容性:版本管理 + 向后转换
- 优化:批量ACK + 快速重传
真实案例:
- KCP:快速重传 + 选择性重传
- TCP:累积ACK + 超时重传
踩坑经验:
- ❌ 不要每条消息都发送ACK
- ❌ 不要忽略版本兼容
- ✅ 使用选择性重传减少带宽
下一节(3.7)我们将学习:房间广播、弱网与国内网络环境,深入了解游戏网络优化实战。