Apollo Technical Documentation

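Q99: How do you manage a server cluster?

Problem Analysis

This question tests your understanding of cluster management:

  • Service discovery
  • Load balancing
  • Failover
  • KBEngine cluster architecture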

1. Cluster Architecture

1.1 KBEngine Cluster Topology

┌────────────────────────────────────────────────────────────────────────────┐
│                       KBEngine Cluster Architecture                        │
├────────────────────────────────────────────────────────────────────────────┤
│                                                                            │
│  [Client]                                                                  │
│       │                                                                    │
│       ▼                                                                    │
│  ┌──────────┐                                                              │
│  │ LoginApp │  Login server (multiple instances)                           │
│  └────┬─────┘                                                              │
│       │                                                                    │
│       ▼                                                                    │
│  ┌────────────┐                                                            │
│  │ BaseAppMgr │  BaseApp manager (single instance, master node)            │
│  └────┬───────┘                                                            │
│       │                                                                    │
│       ├──────────────────────┐                                             │
│       ▼                      ▼                                             │
│  ┌─────────┐            ┌─────────┐                                        │
│  │ BaseApp │  ←─→       │ BaseApp │  BaseApp servers (multiple instances)  │
│  │   #1    │            │   #2    │                                        │
│  └────┬────┘            └────┬────┘                                        │
│       │                      │                                             │
│       └─────────┬────────────┘                                             │
│                 ▼                                                          │
│          ┌──────────────┐                                                  │
│          │ CellAppMgr   │  CellApp manager (single instance, master node)  │
│          └──────┬───────┘                                                  │
│                 │                                                          │
│      ┌──────────┼──────────┐                                               │
│      ▼          ▼          ▼                                               │
│  ┌───────┐  ┌───────┐  ┌───────┐                                           │
│  │CellApp│  │CellApp│  │CellApp│  Game logic servers (multiple instances)  │
│  │  #1   │  │  #2   │  │  #3   │                                           │
│  └───────┘  └───┬───┘  └───────┘                                           │
│                 │                                                          │
│                 ▼                                                          │
│          ┌──────────────┐                                                  │
│          │ DBMgr        │  Database manager (single instance)              │
│          └──────────────┘                                                  │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘
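The instance rules in the topology above can also be written down as data, which makes it easy to check a deployment plan before starting any process. The following is a minimal sketch; the names `TOPOLOGY` and `validate_counts` are hypothetical and are not part of KBEngine.

```python
# Hypothetical encoding of the topology: which components may run as
# multiple instances and which must stay single-instance.
TOPOLOGY = {
    "loginapp":   {"multi_instance": True},
    "baseappmgr": {"multi_instance": False},
    "baseapp":    {"multi_instance": True},
    "cellappmgr": {"multi_instance": False},
    "cellapp":    {"multi_instance": True},
    "dbmgr":      {"multi_instance": False},
}


def validate_counts(counts):
    """Return a list of problems found in a {component: instance_count} plan."""
    problems = []
    for name, rule in TOPOLOGY.items():
        n = counts.get(name, 0)
        if n < 1:
            problems.append(f"{name}: at least one instance is required")
        elif n > 1 and not rule["multi_instance"]:
            problems.append(f"{name}: must be a single instance, got {n}")
    return problems
```

Calling `validate_counts` with a plan that runs two `dbmgr` processes returns one problem string for that component, while a plan that respects the diagram returns an empty list.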

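2. Service Discovery

2.1 Component Registration

# KBEngine component discovery
#
# Note: this is a conceptual sketch. KBEngine.getWatcher().get(...) and the
# module-level KBEngine.addTimer(...) calls are placeholders for however your
# engine version exposes watcher data and timers. Verify both, including the
# timer signature (initial delay, repeat interval), before use.

"""
KBEngine component registration mechanism:

1. Each component registers with the Logger/Mgr when it starts
2. The Mgr maintains the component list
3. Components send heartbeats periodically
4. A component whose heartbeat times out is marked unavailable
"""

import time

import KBEngine
# Assumes the KBDebug helper module (INFO_MSG, WARNING_MSG, ERROR_MSG)
# from the KBEngine demo assets is on the script path.
from KBDebug import *

class ComponentRegistry:
    """Component registry"""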

    def __init__(self):
        self.components = {
            "loginapp": [],
            "baseapp": [],
            "cellapp": [],
            "dbmgr": [],
            "baseappmgr": [],
            "cellappmgr": []
        }
        self.heartbeats = {}

    def register(self, component_type, component_id, address, port):
        """Register a component"""
        component = {
            "id": component_id,
            "type": component_type,
            "address": address,
            "port": port,
            "status": "starting",
            "load": 0,
            "register_time": time.time()
        }

        self.components[component_type].append(component)
        self.heartbeats[component_id] = time.time()

        INFO_MSG(f"Registered {component_type} {component_id} at {address}:{port}")

        return component

    def heartbeat(self, component_id):
        """Refresh the heartbeat timestamp"""
        if component_id in self.heartbeats:
            self.heartbeats[component_id] = time.time()
            return True
        return False

    def get_component(self, component_type, component_id):
        """Look up a component by type and ID"""
        for comp in self.components[component_type]:
            if comp["id"] == component_id:
                return comp
        return None

    def get_components_by_type(self, component_type):
        """Return all components of the given type"""
        return self.components.get(component_type, [])

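    def get_healthy_components(self, component_type):
        """Return components whose heartbeat is still fresh"""
        healthy = []
        current_time = time.time()

        for comp in self.components.get(component_type, []):
            # Skip components that mark_dead() has already flagged
            if comp["status"] == "dead":
                continue
            # Check the heartbeat
            last_heartbeat = self.heartbeats.get(comp["id"], 0)
            if current_time - last_heartbeat < 30:  # 30-second timeout
                healthy.append(comp)

        return healthy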

    def mark_dead(self, component_id):
        """Mark a component as dead"""
        for comp_type, comps in self.components.items():
            for comp in comps:
                if comp["id"] == component_id:
                    comp["status"] = "dead"
                    WARNING_MSG(f"Component {component_id} marked as dead")
                    return True
        return False


# KBEngine component discovery extension
class KBEngineComponentDiscovery:
    """KBEngine component discovery"""

    def __init__(self):
        self.registry = ComponentRegistry()
        self.watcher_update_interval = 5  # seconds

    def start_discovery(self):
        """Start discovery"""
        # Refresh component status periodically
        KBEngine.addTimer(self.watcher_update_interval, 0,
                          self.update_component_status)

    def update_component_status(self):
        """Refresh component status"""
        # Pull component status from the KBEngine Watcher
        self.update_baseapps()
        self.update_cellapps()

    def update_baseapps(self):
        """Refresh BaseApp status"""
        # Fetch all BaseApps
        baseapps = KBEngine.getWatcher().get("components/baseapp")

        for baseapp_id, baseapp_data in baseapps.items():
            # Check whether it is already registered
            comp = self.registry.get_component("baseapp", baseapp_id)

            if comp is None:
                # New component: register it
                self.registry.register(
                    "baseapp",
                    baseapp_id,
                    baseapp_data.get("address", "localhost"),
                    baseapp_data.get("port", 0)
                )
            else:
                # Update its status
                comp["load"] = baseapp_data.get("load", 0)
                comp["entity_count"] = baseapp_data.get("entityCount", 0)
                self.registry.heartbeat(baseapp_id)

    def update_cellapps(self):
        """Refresh CellApp status"""
        cellapps = KBEngine.getWatcher().get("components/cellapp")

        for cellapp_id, cellapp_data in cellapps.items():
            comp = self.registry.get_component("cellapp", cellapp_id)

            if comp is None:
                self.registry.register(
                    "cellapp",
                    cellapp_id,
                    cellapp_data.get("address", "localhost"),
                    cellapp_data.get("port", 0)
                )
            else:
                comp["load"] = cellapp_data.get("load", 0)
                comp["entity_count"] = cellapp_data.get("entityCount", 0)
                comp["space_count"] = cellapp_data.get("spaceCount", 0)
                self.registry.heartbeat(cellapp_id)
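The heartbeat timeout rule described in this section (a component that stays silent for 30 seconds is treated as unavailable) can be exercised without an engine. The sketch below is a standalone illustration; `HeartbeatTracker` and its `clock` parameter are hypothetical names, and the clock is injectable only so the behaviour can be tested without waiting.

```python
import time


class HeartbeatTracker:
    """Track the last heartbeat per component and report the silent ones."""

    def __init__(self, timeout=30.0, clock=time.monotonic):
        self.timeout = timeout    # seconds of silence before a component expires
        self.clock = clock        # injectable time source (monotonic by default)
        self.last_seen = {}       # component_id -> time of last heartbeat

    def beat(self, component_id):
        """Record a heartbeat from a component."""
        self.last_seen[component_id] = self.clock()

    def expired(self):
        """Return the sorted IDs whose last heartbeat is older than the timeout."""
        now = self.clock()
        return sorted(cid for cid, seen in self.last_seen.items()
                      if now - seen >= self.timeout)
```

A monotonic clock is used here rather than `time.time()` so that a wall-clock adjustment on the host cannot make every component look expired at once.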

3. Load Balancing

3.1 BaseApp Load Balancing

# BaseApp load balancing

class BaseAppLoadBalancer:
    """BaseApp load balancer"""

    # Load-balancing strategies
    STRATEGY_ROUND_ROBIN = "round_robin"
    STRATEGY_LEAST_CONNECTIONS = "least_connections"
    STRATEGY_WEIGHTED = "weighted"
    STRATEGY_CONSISTENT_HASH = "consistent_hash"

    def __init__(self, strategy=STRATEGY_LEAST_CONNECTIONS):
        self.strategy = strategy
        self.round_robin_index = 0

    def select_baseapp(self, player_id=None):
        """Select a BaseApp"""
        baseapps = self.get_healthy_baseapps()

        if not baseapps:
            ERROR_MSG("No healthy BaseApps available")
            return None

        if self.strategy == self.STRATEGY_ROUND_ROBIN:
            return self._round_robin_select(baseapps)
        elif self.strategy == self.STRATEGY_LEAST_CONNECTIONS:
            return self._least_connections_select(baseapps)
        elif self.strategy == self.STRATEGY_WEIGHTED:
            return self._weighted_select(baseapps)
        elif self.strategy == self.STRATEGY_CONSISTENT_HASH:
            return self._consistent_hash_select(baseapps, player_id)

        return baseapps[0]

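    def get_healthy_baseapps(self):
        """Return the BaseApps that are currently running"""
        # Read from KBEngine
        baseapps = []
        baseapp_list = KBEngine.getWatcher().get("components/baseapp")

        for baseapp_id, data in baseapp_list.items():
            if data.get("state", "") == "running":
                baseapps.append({
                    "id": baseapp_id,
                    # address/port are read by callers that hand a login
                    # endpoint back to the client
                    "address": data.get("address", "localhost"),
                    "port": data.get("port", 0),
                    "load": data.get("load", 0),
                    "connections": data.get("connections", 0),
                    "weight": data.get("weight", 1)
                })

        return baseapps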

    def _round_robin_select(self, baseapps):
        """Round-robin selection"""
        selected = baseapps[self.round_robin_index % len(baseapps)]
        self.round_robin_index += 1
        return selected

    def _least_connections_select(self, baseapps):
        """Least-connections selection"""
        return min(baseapps, key=lambda x: x["connections"])

    def _weighted_select(self, baseapps):
        """Weighted random selection"""
        # Compute the total weight
        total_weight = sum(b["weight"] for b in baseapps)

        if total_weight == 0:
            return baseapps[0]

        # Pick at random, in proportion to weight
        import random
        rand = random.uniform(0, total_weight)

        current = 0
        for baseapp in baseapps:
            current += baseapp["weight"]
            if rand <= current:
                return baseapp

        return baseapps[-1]

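    def _consistent_hash_select(self, baseapps, player_id):
        """Hash-based selection (simple modulo, not a true consistent-hash ring)"""
        if player_id is None:
            return baseapps[0]

        # Simple hash. zlib.crc32 is stable across processes, whereas the
        # built-in hash() is randomized per process for str keys.
        import zlib
        hash_val = zlib.crc32(str(player_id).encode("utf-8")) % len(baseapps)
        return baseapps[hash_val]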


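# Load balancing in the KBEngine BaseAppMgr
# (conceptual sketch: the manager is modeled as a script entity for illustration)
class BaseAppManager(KBEngine.Entity):
    """BaseApp manager"""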

    def __init__(self):
        KBEngine.Entity.__init__(self)
        self.load_balancer = BaseAppLoadBalancer(
            BaseAppLoadBalancer.STRATEGY_LEAST_CONNECTIONS
        )
        self.baseapps = {}

    def onPlayerLogin(self, player_id):
        """Handle a player login"""
        # Pick a BaseApp
        baseapp = self.load_balancer.select_baseapp(player_id)

        if baseapp is None:
            ERROR_MSG("No BaseApp available for login")
            return None

        INFO_MSG(f"Player {player_id} assigned to BaseApp {baseapp['id']}")

        # Return the BaseApp address
        return {
            "address": baseapp["address"],
            "port": baseapp["port"]
        }
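The `consistent_hash` strategy above maps a player with a plain modulo, so most players move to a different BaseApp whenever the number of BaseApps changes. A consistent-hash ring limits that movement to the players of the node that was added or removed. The following is a minimal sketch of such a ring; `HashRing`, `vnodes`, and `_stable_hash` are hypothetical names, and MD5 is used only as a stable, well-distributed hash, not for security.

```python
import bisect
import hashlib


def _stable_hash(key):
    """Hash a key to a 64-bit integer that is identical in every process."""
    digest = hashlib.md5(str(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


class HashRing:
    """Consistent-hash ring with virtual nodes."""

    def __init__(self, nodes=(), vnodes=100):
        self.vnodes = vnodes   # virtual points per node, for smoother balance
        self._ring = []        # sorted list of (point_hash, node) tuples
        for node in nodes:
            self.add(node)

    def add(self, node):
        """Place the node's virtual points on the ring."""
        for i in range(self.vnodes):
            bisect.insort(self._ring, (_stable_hash(f"{node}#{i}"), node))

    def remove(self, node):
        """Remove all of the node's virtual points."""
        self._ring = [(h, n) for h, n in self._ring if n != node]

    def lookup(self, key):
        """Return the node owning the first point at or after the key's hash."""
        if not self._ring:
            return None
        # (h,) sorts before every (h, node), so this finds the first point >= h.
        idx = bisect.bisect_left(self._ring, (_stable_hash(key),))
        if idx == len(self._ring):
            idx = 0  # wrap around to the start of the ring
        return self._ring[idx][1]
```

With this structure, removing one BaseApp reassigns only the players that were mapped to it; every other player keeps the BaseApp it had before.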

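4. Failover

4.1 Component Failure Detection

# Failure detection and failover
# (helpers such as get_baseapp_players, get_cellapp_spaces and
# mark_component_unavailable are not shown and must be supplied separately)

class FailoverManager:
    """Failover manager"""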

    def __init__(self):
        self.check_interval = 10  # seconds
        self.failure_threshold = 3  # consecutive failures before failover
        self.failure_counts = {}
        self.recovery_callbacks = {}

    def start_monitoring(self):
        """Start monitoring"""
        KBEngine.addTimer(self.check_interval, 0, self.check_all_components)

    def check_all_components(self):
        """Check all components"""
        self.check_baseapps()
        self.check_cellapps()
        self.check_dbmgr()

    def check_baseapps(self):
        """Check the BaseApps"""
        baseapps = KBEngine.getWatcher().get("components/baseapp")

        for baseapp_id, data in baseapps.items():
            # Check the state
            if data.get("state") != "running":
                self.handle_failure("baseapp", baseapp_id)
            else:
                self.handle_recovery("baseapp", baseapp_id)

    def check_cellapps(self):
        """Check the CellApps"""
        cellapps = KBEngine.getWatcher().get("components/cellapp")

        for cellapp_id, data in cellapps.items():
            if data.get("state") != "running":
                self.handle_failure("cellapp", cellapp_id)
            else:
                self.handle_recovery("cellapp", cellapp_id)

    def check_dbmgr(self):
        """Check the DBMgr"""
        dbmgr = KBEngine.getWatcher().get("components/dbmgr")

        if dbmgr.get("state") != "running":
            ERROR_MSG("DBMgr is down!")
            # A DBMgr failure needs special handling
            self.handle_dbmgr_failure()

    def handle_failure(self, component_type, component_id):
        """Handle a component failure"""
        key = f"{component_type}:{component_id}"

        # Increment the consecutive-failure count
        self.failure_counts[key] = self.failure_counts.get(key, 0) + 1

        # Fail over once, on the check that reaches the threshold
        # (comparing with >= would repeat the failover on every later check)
        if self.failure_counts[key] == self.failure_threshold:
            ERROR_MSG(f"Component {component_type} {component_id} failed!")

            # Run the failover
            if component_type == "baseapp":
                self.failover_baseapp(component_id)
            elif component_type == "cellapp":
                self.failover_cellapp(component_id)

    def handle_recovery(self, component_type, component_id):
        """Handle a component recovery"""
        key = f"{component_type}:{component_id}"

        if key in self.failure_counts:
            del self.failure_counts[key]
            INFO_MSG(f"Component {component_type} {component_id} recovered")

            # Run the recovery callback
            if key in self.recovery_callbacks:
                self.recovery_callbacks[key]()
                del self.recovery_callbacks[key]

    def failover_baseapp(self, failed_id):
        """BaseApp failover"""
        ERROR_MSG(f"Failing over BaseApp {failed_id}")

        # 1. Find the affected players
        affected_players = self.get_baseapp_players(failed_id)

        # 2. Migrate the players to other BaseApps
        for player_id in affected_players:
            self.migrate_player(player_id, failed_id)

        # 3. Mark the BaseApp as unavailable
        self.mark_component_unavailable("baseapp", failed_id)

    def failover_cellapp(self, failed_id):
        """CellApp failover"""
        ERROR_MSG(f"Failing over CellApp {failed_id}")

        # 1. Find the affected spaces
        affected_spaces = self.get_cellapp_spaces(failed_id)

        # 2. Migrate the spaces to other CellApps
        for space_id in affected_spaces:
            self.migrate_space(space_id, failed_id)

        # 3. Mark the CellApp as unavailable
        self.mark_component_unavailable("cellapp", failed_id)

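    def handle_dbmgr_failure(self):
        """Handle a DBMgr failure"""
        ERROR_MSG("DBMgr failure detected!")

        # The DBMgr is a single point of failure and needs special handling:
        # 1. Try to restart the DBMgr
        # 2. If the restart fails, switch to a standby database
        # 3. Tell all components to wait for recovery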

    def migrate_player(self, player_id, from_baseapp):
        """Migrate a player"""
        # Pick a new BaseApp
        load_balancer = BaseAppLoadBalancer()
        new_baseapp = load_balancer.select_baseapp(player_id)

        if new_baseapp:
            # Tell the player to reconnect to the new BaseApp
            player = KBEngine.getEntity(player_id)
            if player and hasattr(player, 'client'):
                player.client.onBaseAppChange({
                    "new_address": new_baseapp["address"],
                    "new_port": new_baseapp["port"]
                })

            INFO_MSG(f"Player {player_id} migrated from BaseApp {from_baseapp} "
                    f"to {new_baseapp['id']}")

    def migrate_space(self, space_id, from_cellapp):
        """Migrate a space"""
        # Pick a new CellApp
        cellapps = [c for c in KBEngine.getWatcher().get("components/cellapp").items()
                   if c[0] != from_cellapp and c[1].get("state") == "running"]

        if not cellapps:
            ERROR_MSG(f"No available CellApp for space {space_id} migration")
            return

        new_cellapp_id, new_cellapp = cellapps[0]

        # Migrate the space
        INFO_MSG(f"Migrating space {space_id} from CellApp {from_cellapp} "
                f"to {new_cellapp_id}")

        # Hypothetical call: check whether your engine version exposes a
        # space migration interface before relying on this.
        # KBEngine.migrateSpace(space_id, new_cellapp_id)
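The threshold rule used by `handle_failure` and `handle_recovery` (fail over after three consecutive failed checks, reset on any healthy check) can be isolated and tested on its own. This is a minimal sketch with hypothetical names (`FailureDetector`, `report`); it has no engine dependency.

```python
class FailureDetector:
    """Count consecutive failed checks per component key."""

    def __init__(self, threshold=3):
        self.threshold = threshold  # consecutive failures that trigger failover
        self.counts = {}            # key -> current consecutive-failure count

    def report(self, key, healthy):
        """Record one check result.

        Returns True exactly once per outage: on the check where the
        consecutive-failure count reaches the threshold.
        """
        if healthy:
            # Any healthy check ends the streak.
            self.counts.pop(key, None)
            return False
        self.counts[key] = self.counts.get(key, 0) + 1
        return self.counts[key] == self.threshold
```

Returning True only on the check that reaches the threshold keeps the failover from being started again on every later check while the component is still down.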

5. Cluster Configuration

5.1 Cluster Configuration Management

<!-- KBEngine cluster configuration -->

<root>
    <!-- Machine configuration -->
    <machines>
        <machine>
            <!-- Machine 1 -->
            <ip>192.168.1.10</ip>
            <internal>
                <hostname>server-1</hostname>
            </internal>
            <external>
                <hostname>192.168.1.10</hostname>
            </external>
            <!-- Components running on this machine -->
            <components>
                <baseapp>
                    <count>2</count>
                </baseapp>
                <cellapp>
                    <count>3</count>
                </cellapp>
            </components>
        </machine>

        <machine>
            <!-- Machine 2 -->
            <ip>192.168.1.11</ip>
            <internal>
                <hostname>server-2</hostname>
            </internal>
            <external>
                <hostname>192.168.1.11</hostname>
            </external>
            <components>
                <baseapp>
                    <count>2</count>
                </baseapp>
                <cellapp>
                    <count>3</count>
                </cellapp>
            </components>
        </machine>
    </machines>

    <!-- Global components -->
    <globalComponents>
        <dbmgr>
            <count>1</count>
        </dbmgr>
        <baseappmgr>
            <count>1</count>
        </baseappmgr>
        <cellappmgr>
            <count>1</count>
        </cellappmgr>
        <loginapp>
            <count>2</count>
        </loginapp>
    </globalComponents>
</root>
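A configuration like the one above can be summed up programmatically to confirm how many instances of each component the cluster will run. The sketch below assumes the XML layout shown in this section (`machines/machine/components` plus `globalComponents`); the function name `count_instances` and the trimmed `SAMPLE` document are hypothetical.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the layout shown above (hostnames omitted for brevity).
SAMPLE = """
<root>
    <machines>
        <machine>
            <ip>192.168.1.10</ip>
            <components>
                <baseapp><count>2</count></baseapp>
                <cellapp><count>3</count></cellapp>
            </components>
        </machine>
        <machine>
            <ip>192.168.1.11</ip>
            <components>
                <baseapp><count>2</count></baseapp>
                <cellapp><count>3</count></cellapp>
            </components>
        </machine>
    </machines>
    <globalComponents>
        <dbmgr><count>1</count></dbmgr>
        <baseappmgr><count>1</count></baseappmgr>
        <cellappmgr><count>1</count></cellappmgr>
        <loginapp><count>2</count></loginapp>
    </globalComponents>
</root>
"""


def count_instances(xml_text):
    """Sum the <count> of every component across machines and global components."""
    root = ET.fromstring(xml_text.strip())
    totals = {}
    # Per-machine <components> blocks plus the single <globalComponents> block.
    groups = list(root.iter("components")) + list(root.iter("globalComponents"))
    for group in groups:
        for comp in group:
            count = int(comp.findtext("count", "0"))
            totals[comp.tag] = totals.get(comp.tag, 0) + count
    return totals


totals = count_instances(SAMPLE)
```

For the sample above, the per-machine counts add up to four BaseApps and six CellApps, alongside one DBMgr, one BaseAppMgr, one CellAppMgr, and two LoginApps.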

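6. Best Practices

6.1 Cluster Management Recommendations

Practice                 Description
High availability        Run multiple instances of critical components
Failure detection        Check heartbeats periodically
Automatic failover       Migrate automatically when a component fails
Load balancing           Distribute load sensibly
Monitoring and alerting  Detect problems promptly
Capacity planning        Reserve headroom for expansion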

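7. Summary

Cluster Management Essentials

Cluster management = service discovery + load balancing + failover + configuration management
- KBEngine components register automatically
- BaseApp/CellApp load balancing
- Automatic failover for failed components
- Distributed deployment configuration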

Last updated: 3/20/26, 6:06 AM
Contributors: cuihairu