Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

3.5 消息模式与事件分发

游戏服务器需要处理多种类型的消息:请求-响应、单向通知、发布订阅。选择合适的消息模式对系统架构至关重要。

消息模式

模式1:请求-响应(Request-Response)

// RequestResponse 请求-响应模式
type RequestResponse struct {
    RequestID  uint64
    Request    *Message
    Response   *Message
    Timeout    time.Duration
    Callback   func(*Message)
}

// 发送请求并等待响应
func (c *Connection) SendRequest(req *Message, timeout time.Duration) (*Message, error) {
    // 1. 生成请求ID
    requestID := c.generateRequestID()
    req.RequestID = requestID

    // 2. 注册回调
    c.pendingRequests[requestID] = &RequestResponse{
        RequestID: requestID,
        Request:   req,
        Timeout:   timeout,
    }

    // 3. 发送请求
    if err := c.Send(req); err != nil {
        delete(c.pendingRequests, requestID)
        return nil, err
    }

    // 4. 等待响应
    select {
    case resp := <-c.responseChan:
        return resp, nil
    case <-time.After(timeout):
        delete(c.pendingRequests, requestID)
        return nil, errors.New("timeout")
    }
}

// 处理响应
func (c *Connection) HandleResponse(resp *Message) {
    requestID := resp.RequestID

    // 查找待处理的请求
    req, ok := c.pendingRequests[requestID]
    if !ok {
        log.Printf("Unknown request ID: %d", requestID)
        return
    }

    // 发送响应到channel
    c.responseChan <- resp

    // 清理
    delete(c.pendingRequests, requestID)
}

// 使用示例
func requestResponseExample() {
    conn := NewConnection()

    // 发送登录请求
    req := &Message{
        Type:   MsgType_Login,
        Body:   []byte(`{"username":"player1","password":"123456"}`),
    }

    resp, err := conn.SendRequest(req, 5*time.Second)
    if err != nil {
        log.Printf("Login failed: %v", err)
        return
    }

    log.Printf("Login success: %+v", resp)
}

模式2:单向通知(Fire-and-Forget)

// FireAndForget 单向通知模式
type FireAndForget struct {
    Message *Message
}

// 发送单向通知(不等待响应)
func (c *Connection) SendNotification(msg *Message) error {
    // 无需RequestID
    // 无需等待响应
    // 直接发送
    return c.Send(msg)
}

// 使用示例
func notificationExample() {
    conn := NewConnection()

    // 发送聊天消息(无需响应)
    chatMsg := &Message{
        Type: MsgType_Chat,
        Body: []byte(`{"content":"Hello World"}`),
    }

    if err := conn.SendNotification(chatMsg); err != nil {
        log.Printf("Send chat failed: %v", err)
    }
}

模式3:发布-订阅(Pub-Sub)

// PubSub 发布-订阅模式
type PubSub struct {
    // 主题订阅
    subscriptions map[string][]*Subscriber

    // 消息队列
    messageQueues map[*Subscriber]chan *Message
}

type Subscriber struct {
    ID       string
    Topics   []string
    Callback func(*Message)
}

// Subscribe 订阅主题
func (ps *PubSub) Subscribe(subscriber *Subscriber, topics ...string) {
    for _, topic := range topics {
        ps.subscriptions[topic] = append(ps.subscriptions[topic], subscriber)
        subscriber.Topics = append(subscriber.Topics, topic)
    }

    // 创建消息队列
    ps.messageQueues[subscriber] = make(chan *Message, 1000)

    // 启动消费协程
    go ps.consume(subscriber)
}

// Publish 发布消息
func (ps *PubSub) Publish(topic string, msg *Message) {
    subscribers, ok := ps.subscriptions[topic]
    if !ok {
        return  // 没有订阅者
    }

    for _, subscriber := range subscribers {
        select {
        case ps.messageQueues[subscriber] <- msg:
            // 成功发送
        default:
            // 队列满,丢弃消息
            log.Printf("Subscriber %s queue full", subscriber.ID)
        }
    }
}

// 消费消息
func (ps *PubSub) consume(subscriber *Subscriber) {
    for msg := range ps.messageQueues[subscriber] {
        subscriber.Callback(msg)
    }
}

// 使用示例
func pubsubExample() {
    ps := NewPubSub()

    // 订阅"世界聊天"
    subscriber := &Subscriber{
        ID: "player1",
        Callback: func(msg *Message) {
            log.Printf("Received: %s", string(msg.Body))
        },
    }
    ps.Subscribe(subscriber, "world_chat")

    // 发布消息
    ps.Publish("world_chat", &Message{
        Type: MsgType_Chat,
        Body: []byte(`{"content":"Hello World"}`),
    })
}

消息路由

路由器设计

// MessageRouter 消息路由器
type MessageRouter struct {
    // 路由表
    routes map[uint8]RouteHandler

    // 中间件
    middlewares []Middleware
}

type RouteHandler func(*Session, *Message) error
type Middleware func(*Session, *Message, RouteHandler) error

// AddRoute 添加路由
func (mr *MessageRouter) AddRoute(msgType uint8, handler RouteHandler) {
    mr.routes[msgType] = handler
}

// Use 添加中间件
func (mr *MessageRouter) Use(middleware Middleware) {
    mr.middlewares = append(mr.middlewares, middleware)
}

// Route 路由消息
func (mr *MessageRouter) Route(session *Session, msg *Message) error {
    // 查找路由
    handler, ok := mr.routes[msg.Type]
    if !ok {
        return fmt.Errorf("unknown message type: %d", msg.Type)
    }

    // 执行中间件链
    var finalHandler RouteHandler = handler
    for i := len(mr.middlewares) - 1; i >= 0; i-- {
        middleware := mr.middlewares[i]
        h := finalHandler
        finalHandler = func(s *Session, m *Message) error {
            return middleware(s, m, h)
        }
    }

    // 执行处理函数
    return finalHandler(session, msg)
}

// 使用示例
func routerExample() {
    router := &MessageRouter{
        routes: make(map[uint8]RouteHandler),
    }

    // 添加日志中间件
    router.Use(func(s *Session, m *Message, next RouteHandler) error {
        log.Printf("Received message: type=%d", m.Type)
        err := next(s, m)
        log.Printf("Message processed: err=%v", err)
        return err
    })

    // 添加认证中间件
    router.Use(func(s *Session, m *Message, next RouteHandler) error {
        if !s.IsAuthenticated() {
            return errors.New("not authenticated")
        }
        return next(s, m)
    })

    // 添加路由
    router.AddRoute(MsgType_Login, handleLogin)
    router.AddRoute(MsgType_Chat, handleChat)
    router.AddRoute(MsgType_Move, handleMove)

    // 路由消息
    session := &Session{}
    msg := &Message{Type: MsgType_Login}
    router.Route(session, msg)
}

分布式路由

// DistributedRouter 分布式路由器
type DistributedRouter struct {
    // 本地路由
    localRouter *MessageRouter

    // 远程路由
    remoteRoutes map[string]*RemoteRouter  // serverID → router

    // 路由策略
    strategy RoutingStrategy
}

type RoutingStrategy int

const (
    RoundRobin RoutingStrategy = iota
    LeastConnections
    ConsistentHash
)

type RemoteRouter struct {
    ServerID string
    Address  string
    Client   *RPCClient
}

// Route 分布式路由
func (dr *DistributedRouter) Route(session *Session, msg *Message) error {
    // 1. 检查是否需要远程路由
    targetServer := dr.selectServer(session, msg)
    if targetServer == "" {
        // 本地处理
        return dr.localRouter.Route(session, msg)
    }

    // 2. 远程路由
    remoteRouter, ok := dr.remoteRoutes[targetServer]
    if !ok {
        return fmt.Errorf("remote server not found: %s", targetServer)
    }

    // 3. 转发消息
    return remoteRouter.Forward(msg)
}

// selectServer 选择目标服务器
func (dr *DistributedRouter) selectServer(session *Session, msg *Message) string {
    switch dr.strategy {
    case RoundRobin:
        return dr.roundRobinSelect()
    case LeastConnections:
        return dr.leastConnectionsSelect()
    case ConsistentHash:
        return dr.consistentHashSelect(session.PlayerID)
    default:
        return ""
    }
}

// 一致性哈希选择
func (dr *DistributedRouter) consistentistentHashSelect(playerID uint64) string {
    // 使用一致性哈希算法
    hash := crc32.ChecksumIEEE([]byte(fmt.Sprintf("%d", playerID)))

    servers := make([]string, 0, len(dr.remoteRoutes))
    for serverID := range dr.remoteRoutes {
        servers = append(servers, serverID)
    }

    idx := int(hash) % len(servers)
    return servers[idx]
}

Actor模型

Actor实现

// Actor Actor模型
type Actor struct {
    ID        string
    mailbox   chan *Message  // 邮箱
    handler   ActorHandler
    context   *ActorContext
}

type ActorHandler func(*ActorContext, *Message)
type ActorContext struct {
    Sender    *Actor
    Self      *Actor
}

// NewActor 创建Actor
func NewActor(id string, handler ActorHandler) *Actor {
    return &Actor{
        ID:      id,
        mailbox: make(chan *Message, 1000),
        handler: handler,
    }
}

// Start 启动Actor
func (a *Actor) Start() {
    go a.loop()
}

// loop Actor主循环
func (a *Actor) loop() {
    for msg := range a.mailbox {
        ctx := &ActorContext{
            Self: a,
        }
        a.handler(ctx, msg)
    }
}

// Send 发送消息给Actor
func (a *Actor) Send(msg *Message) {
    select {
    case a.mailbox <- msg:
        // 成功发送
    default:
        // 邮箱满,丢弃或等待
        log.Printf("Actor %s mailbox full", a.ID)
    }
}

// Tell 异步发送消息
func (a *Actor) Tell(target *Actor, msg *Message) {
    target.Send(msg)
}

// Ask 同步发送消息(等待响应)
func (a *Actor) Ask(target *Actor, msg *Message, timeout time.Duration) (*Message, error) {
    // 创建响应channel
    responseChan := make(chan *Message, 1)
    msg.ResponseChan = responseChan

    // 发送消息
    target.Send(msg)

    // 等待响应
    select {
    case resp := <-responseChan:
        return resp, nil
    case <-time.After(timeout):
        return nil, errors.New("timeout")
    }
}

// 使用示例
func actorExample() {
    // 创建玩家Actor
    player := NewActor("player1", func(ctx *ActorContext, msg *Message) {
        switch msg.Type {
        case MsgType_Move:
            log.Printf("Player moving")
            // 处理移动...
        case MsgType_Attack:
            log.Printf("Player attacking")
            // 处理攻击...
        }
    })

    player.Start()

    // 发送消息
    player.Send(&Message{Type: MsgType_Move})
}

Actor系统

// ActorSystem Actor系统
type ActorSystem struct {
    actors map[string]*Actor
}

// Register 注册Actor
func (as *ActorSystem) Register(actor *Actor) {
    as.actors[actor.ID] = actor
    actor.Start()
}

// Find 查找Actor
func (as *ActorSystem) Find(id string) (*Actor, error) {
    actor, ok := as.actors[id]
    if !ok {
        return nil, fmt.Errorf("actor not found: %s", id)
    }
    return actor, nil
}

// Broadcast 广播消息给所有Actor
func (as *ActorSystem) Broadcast(msg *Message) {
    for _, actor := range as.actors {
        actor.Send(msg)
    }
}

// 使用示例
func actorSystemExample() {
    system := &ActorSystem{
        actors: make(map[string]*Actor),
    }

    // 创建多个玩家Actor
    for i := 1; i <= 100; i++ {
        player := NewActor(fmt.Sprintf("player%d", i), playerHandler)
        system.Register(player)
    }

    // 广播消息
    system.Broadcast(&Message{
        Type: MsgType_WorldSave,
    })
}

func playerHandler(ctx *ActorContext, msg *Message) {
    log.Printf("Player %s received message: %d", ctx.Self.ID, msg.Type)
}

真实案例:大型MMO的消息路由

背景

  • 10万玩家在线
  • 每秒100万条消息
  • 需要分布式处理

技术方案

1. 分层消息路由

// LayeredRouter 分层路由器
type LayeredRouter struct {
    // 第一层:按玩家ID路由
    playerRouter *ConsistentHashRouter

    // 第二层:按消息类型路由
    typeRouter *MessageTypeRouter

    // 第三层:按功能模块路由
    moduleRouter *ModuleRouter
}

// Route 三层路由
func (lr *LayeredRouter) Route(msg *Message) error {
    // 1. 第一层:选择服务器
    server := lr.playerRouter.SelectServer(msg.PlayerID)

    // 2. 第二层:选择处理线程
    worker := lr.typeRouter.SelectWorker(msg.Type)

    // 3. 第三层:选择处理模块
    handler := lr.moduleRouter.SelectHandler(msg.Type)

    // 4. 处理消息
    return handler(msg)
}

2. 消息优先级

// PriorityQueue 优先级队列
type PriorityQueue struct {
    queues [3][]*Message  // 高、中、低优先级
}

// Enqueue 入队
func (pq *PriorityQueue) Enqueue(msg *Message) {
    priority := msg.Priority
    if priority < 0 || priority > 2 {
        priority = 1  // 默认中优先级
    }

    pq.queues[priority] = append(pq.queues[priority], msg)
}

// Dequeue 出队
func (pq *PriorityQueue) Dequeue() *Message {
    // 优先处理高优先级
    for i := 0; i < 3; i++ {
        if len(pq.queues[i]) > 0 {
            msg := pq.queues[i][0]
            pq.queues[i] = pq.queues[i][1:]
            return msg
        }
    }
    return nil
}

效果

  • 消息吞吐量:从50万/秒提升到100万/秒
  • 平均延迟:从30ms降到15ms
  • 服务器CPU:从85%降到60%

小结

消息模式与事件分发的核心要点:

  1. 请求-响应:适合需要确认的操作(登录、购买)
  2. 单向通知:适合无需响应的操作(移动、聊天)
  3. 发布-订阅:适合广播消息(世界公告、系统消息)
  4. Actor模型:适合并发处理(玩家Actor、NPC Actor)

真实案例

  • 大型MMO:三层消息路由
  • 《王者荣耀》:Actor模型 + 优先级队列

踩坑经验

  • ❌ 不要用请求-响应处理高频消息
  • ❌ 不要忘记消息优先级
  • ✅ 使用Actor模型提升并发

下一节(3.6)我们将学习:可靠性、顺序与兼容性,深入保证消息可靠传输。