3.5 消息模式与事件分发
游戏服务器需要处理多种类型的消息:请求-响应、单向通知、发布订阅。选择合适的消息模式对系统架构至关重要。
消息模式
模式1:请求-响应(Request-Response)
// RequestResponse 请求-响应模式
type RequestResponse struct {
RequestID uint64
Request *Message
Response *Message
Timeout time.Duration
Callback func(*Message)
}
// 发送请求并等待响应
func (c *Connection) SendRequest(req *Message, timeout time.Duration) (*Message, error) {
// 1. 生成请求ID
requestID := c.generateRequestID()
req.RequestID = requestID
// 2. 注册回调
c.pendingRequests[requestID] = &RequestResponse{
RequestID: requestID,
Request: req,
Timeout: timeout,
}
// 3. 发送请求
if err := c.Send(req); err != nil {
delete(c.pendingRequests, requestID)
return nil, err
}
// 4. 等待响应
select {
case resp := <-c.responseChan:
return resp, nil
case <-time.After(timeout):
delete(c.pendingRequests, requestID)
return nil, errors.New("timeout")
}
}
// 处理响应
func (c *Connection) HandleResponse(resp *Message) {
requestID := resp.RequestID
// 查找待处理的请求
req, ok := c.pendingRequests[requestID]
if !ok {
log.Printf("Unknown request ID: %d", requestID)
return
}
// 发送响应到channel
c.responseChan <- resp
// 清理
delete(c.pendingRequests, requestID)
}
// 使用示例
func requestResponseExample() {
conn := NewConnection()
// 发送登录请求
req := &Message{
Type: MsgType_Login,
Body: []byte(`{"username":"player1","password":"123456"}`),
}
resp, err := conn.SendRequest(req, 5*time.Second)
if err != nil {
log.Printf("Login failed: %v", err)
return
}
log.Printf("Login success: %+v", resp)
}
模式2:单向通知(Fire-and-Forget)
// FireAndForget 单向通知模式
type FireAndForget struct {
Message *Message
}
// 发送单向通知(不等待响应)
func (c *Connection) SendNotification(msg *Message) error {
// 无需RequestID
// 无需等待响应
// 直接发送
return c.Send(msg)
}
// 使用示例
func notificationExample() {
conn := NewConnection()
// 发送聊天消息(无需响应)
chatMsg := &Message{
Type: MsgType_Chat,
Body: []byte(`{"content":"Hello World"}`),
}
if err := conn.SendNotification(chatMsg); err != nil {
log.Printf("Send chat failed: %v", err)
}
}
模式3:发布-订阅(Pub-Sub)
// PubSub 发布-订阅模式
type PubSub struct {
// 主题订阅
subscriptions map[string][]*Subscriber
// 消息队列
messageQueues map[*Subscriber]chan *Message
}
type Subscriber struct {
ID string
Topics []string
Callback func(*Message)
}
// Subscribe 订阅主题
func (ps *PubSub) Subscribe(subscriber *Subscriber, topics ...string) {
for _, topic := range topics {
ps.subscriptions[topic] = append(ps.subscriptions[topic], subscriber)
subscriber.Topics = append(subscriber.Topics, topic)
}
// 创建消息队列
ps.messageQueues[subscriber] = make(chan *Message, 1000)
// 启动消费协程
go ps.consume(subscriber)
}
// Publish 发布消息
func (ps *PubSub) Publish(topic string, msg *Message) {
subscribers, ok := ps.subscriptions[topic]
if !ok {
return // 没有订阅者
}
for _, subscriber := range subscribers {
select {
case ps.messageQueues[subscriber] <- msg:
// 成功发送
default:
// 队列满,丢弃消息
log.Printf("Subscriber %s queue full", subscriber.ID)
}
}
}
// 消费消息
func (ps *PubSub) consume(subscriber *Subscriber) {
for msg := range ps.messageQueues[subscriber] {
subscriber.Callback(msg)
}
}
// 使用示例
func pubsubExample() {
ps := NewPubSub()
// 订阅"世界聊天"
subscriber := &Subscriber{
ID: "player1",
Callback: func(msg *Message) {
log.Printf("Received: %s", string(msg.Body))
},
}
ps.Subscribe(subscriber, "world_chat")
// 发布消息
ps.Publish("world_chat", &Message{
Type: MsgType_Chat,
Body: []byte(`{"content":"Hello World"}`),
})
}
消息路由
路由器设计
// MessageRouter 消息路由器
type MessageRouter struct {
// 路由表
routes map[uint8]RouteHandler
// 中间件
middlewares []Middleware
}
type RouteHandler func(*Session, *Message) error
type Middleware func(*Session, *Message, RouteHandler) error
// AddRoute 添加路由
func (mr *MessageRouter) AddRoute(msgType uint8, handler RouteHandler) {
mr.routes[msgType] = handler
}
// Use 添加中间件
func (mr *MessageRouter) Use(middleware Middleware) {
mr.middlewares = append(mr.middlewares, middleware)
}
// Route 路由消息
func (mr *MessageRouter) Route(session *Session, msg *Message) error {
// 查找路由
handler, ok := mr.routes[msg.Type]
if !ok {
return fmt.Errorf("unknown message type: %d", msg.Type)
}
// 执行中间件链
var finalHandler RouteHandler = handler
for i := len(mr.middlewares) - 1; i >= 0; i-- {
middleware := mr.middlewares[i]
h := finalHandler
finalHandler = func(s *Session, m *Message) error {
return middleware(s, m, h)
}
}
// 执行处理函数
return finalHandler(session, msg)
}
// 使用示例
func routerExample() {
router := &MessageRouter{
routes: make(map[uint8]RouteHandler),
}
// 添加日志中间件
router.Use(func(s *Session, m *Message, next RouteHandler) error {
log.Printf("Received message: type=%d", m.Type)
err := next(s, m)
log.Printf("Message processed: err=%v", err)
return err
})
// 添加认证中间件
router.Use(func(s *Session, m *Message, next RouteHandler) error {
if !s.IsAuthenticated() {
return errors.New("not authenticated")
}
return next(s, m)
})
// 添加路由
router.AddRoute(MsgType_Login, handleLogin)
router.AddRoute(MsgType_Chat, handleChat)
router.AddRoute(MsgType_Move, handleMove)
// 路由消息
session := &Session{}
msg := &Message{Type: MsgType_Login}
router.Route(session, msg)
}
分布式路由
// DistributedRouter 分布式路由器
type DistributedRouter struct {
// 本地路由
localRouter *MessageRouter
// 远程路由
remoteRoutes map[string]*RemoteRouter // serverID → router
// 路由策略
strategy RoutingStrategy
}
type RoutingStrategy int
const (
RoundRobin RoutingStrategy = iota
LeastConnections
ConsistentHash
)
type RemoteRouter struct {
ServerID string
Address string
Client *RPCClient
}
// Route 分布式路由
func (dr *DistributedRouter) Route(session *Session, msg *Message) error {
// 1. 检查是否需要远程路由
targetServer := dr.selectServer(session, msg)
if targetServer == "" {
// 本地处理
return dr.localRouter.Route(session, msg)
}
// 2. 远程路由
remoteRouter, ok := dr.remoteRoutes[targetServer]
if !ok {
return fmt.Errorf("remote server not found: %s", targetServer)
}
// 3. 转发消息
return remoteRouter.Forward(msg)
}
// selectServer 选择目标服务器
func (dr *DistributedRouter) selectServer(session *Session, msg *Message) string {
switch dr.strategy {
case RoundRobin:
return dr.roundRobinSelect()
case LeastConnections:
return dr.leastConnectionsSelect()
case ConsistentHash:
return dr.consistentHashSelect(session.PlayerID)
default:
return ""
}
}
// 一致性哈希选择
func (dr *DistributedRouter) consistentistentHashSelect(playerID uint64) string {
// 使用一致性哈希算法
hash := crc32.ChecksumIEEE([]byte(fmt.Sprintf("%d", playerID)))
servers := make([]string, 0, len(dr.remoteRoutes))
for serverID := range dr.remoteRoutes {
servers = append(servers, serverID)
}
idx := int(hash) % len(servers)
return servers[idx]
}
Actor模型
Actor实现
// Actor Actor模型
type Actor struct {
ID string
mailbox chan *Message // 邮箱
handler ActorHandler
context *ActorContext
}
type ActorHandler func(*ActorContext, *Message)
type ActorContext struct {
Sender *Actor
Self *Actor
}
// NewActor 创建Actor
func NewActor(id string, handler ActorHandler) *Actor {
return &Actor{
ID: id,
mailbox: make(chan *Message, 1000),
handler: handler,
}
}
// Start 启动Actor
func (a *Actor) Start() {
go a.loop()
}
// loop Actor主循环
func (a *Actor) loop() {
for msg := range a.mailbox {
ctx := &ActorContext{
Self: a,
}
a.handler(ctx, msg)
}
}
// Send 发送消息给Actor
func (a *Actor) Send(msg *Message) {
select {
case a.mailbox <- msg:
// 成功发送
default:
// 邮箱满,丢弃或等待
log.Printf("Actor %s mailbox full", a.ID)
}
}
// Tell 异步发送消息
func (a *Actor) Tell(target *Actor, msg *Message) {
target.Send(msg)
}
// Ask 同步发送消息(等待响应)
func (a *Actor) Ask(target *Actor, msg *Message, timeout time.Duration) (*Message, error) {
// 创建响应channel
responseChan := make(chan *Message, 1)
msg.ResponseChan = responseChan
// 发送消息
target.Send(msg)
// 等待响应
select {
case resp := <-responseChan:
return resp, nil
case <-time.After(timeout):
return nil, errors.New("timeout")
}
}
// 使用示例
func actorExample() {
// 创建玩家Actor
player := NewActor("player1", func(ctx *ActorContext, msg *Message) {
switch msg.Type {
case MsgType_Move:
log.Printf("Player moving")
// 处理移动...
case MsgType_Attack:
log.Printf("Player attacking")
// 处理攻击...
}
})
player.Start()
// 发送消息
player.Send(&Message{Type: MsgType_Move})
}
Actor系统
// ActorSystem Actor系统
type ActorSystem struct {
actors map[string]*Actor
}
// Register 注册Actor
func (as *ActorSystem) Register(actor *Actor) {
as.actors[actor.ID] = actor
actor.Start()
}
// Find 查找Actor
func (as *ActorSystem) Find(id string) (*Actor, error) {
actor, ok := as.actors[id]
if !ok {
return nil, fmt.Errorf("actor not found: %s", id)
}
return actor, nil
}
// Broadcast 广播消息给所有Actor
func (as *ActorSystem) Broadcast(msg *Message) {
for _, actor := range as.actors {
actor.Send(msg)
}
}
// 使用示例
func actorSystemExample() {
system := &ActorSystem{
actors: make(map[string]*Actor),
}
// 创建多个玩家Actor
for i := 1; i <= 100; i++ {
player := NewActor(fmt.Sprintf("player%d", i), playerHandler)
system.Register(player)
}
// 广播消息
system.Broadcast(&Message{
Type: MsgType_WorldSave,
})
}
func playerHandler(ctx *ActorContext, msg *Message) {
log.Printf("Player %s received message: %d", ctx.Self.ID, msg.Type)
}
真实案例:大型MMO的消息路由
背景:
- 10万玩家在线
- 每秒100万条消息
- 需要分布式处理
技术方案:
1. 分层消息路由
// LayeredRouter 分层路由器
type LayeredRouter struct {
// 第一层:按玩家ID路由
playerRouter *ConsistentHashRouter
// 第二层:按消息类型路由
typeRouter *MessageTypeRouter
// 第三层:按功能模块路由
moduleRouter *ModuleRouter
}
// Route 三层路由
func (lr *LayeredRouter) Route(msg *Message) error {
// 1. 第一层:选择服务器
server := lr.playerRouter.SelectServer(msg.PlayerID)
// 2. 第二层:选择处理线程
worker := lr.typeRouter.SelectWorker(msg.Type)
// 3. 第三层:选择处理模块
handler := lr.moduleRouter.SelectHandler(msg.Type)
// 4. 处理消息
return handler(msg)
}
2. 消息优先级
// PriorityQueue 优先级队列
type PriorityQueue struct {
queues [3][]*Message // 高、中、低优先级
}
// Enqueue 入队
func (pq *PriorityQueue) Enqueue(msg *Message) {
priority := msg.Priority
if priority < 0 || priority > 2 {
priority = 1 // 默认中优先级
}
pq.queues[priority] = append(pq.queues[priority], msg)
}
// Dequeue 出队
func (pq *PriorityQueue) Dequeue() *Message {
// 优先处理高优先级
for i := 0; i < 3; i++ {
if len(pq.queues[i]) > 0 {
msg := pq.queues[i][0]
pq.queues[i] = pq.queues[i][1:]
return msg
}
}
return nil
}
效果:
- 消息吞吐量:从50万/秒提升到100万/秒
- 平均延迟:从30ms降到15ms
- 服务器CPU:从85%降到60%
小结
消息模式与事件分发的核心要点:
- 请求-响应:适合需要确认的操作(登录、购买)
- 单向通知:适合无需响应的操作(移动、聊天)
- 发布-订阅:适合广播消息(世界公告、系统消息)
- Actor模型:适合并发处理(玩家Actor、NPC Actor)
真实案例:
- 大型MMO:三层消息路由
- 《王者荣耀》:Actor模型 + 优先级队列
踩坑经验:
- ❌ 不要用请求-响应处理高频消息
- ❌ 不要忘记消息优先级
- ✅ 使用Actor模型提升并发
下一节(3.6)我们将学习:可靠性、顺序与兼容性,深入保证消息可靠传输。