背景
洛玖机器人的插件系统基于 FFI 消息总线:每个插件是独立的动态库(DLL/SO),运行在自己的 OS 线程中,通过 luo9_core 的 extern "C" 函数进行进程内 pub/sub 通信。
传统的消息分发是广播模式——宿主 publish 一条消息,所有 subscriber 同时收到。这简单高效,但有一个问题:无法控制消息的分发顺序,也无法让某个插件”拦截”消息,阻止后续插件处理。
最近为洛玖实现了插件优先级和消息阻断机制,关键在于:插件代码零修改。
核心思路:从广播到定向分发
原来的模型
1 2 3 4 5 6 7 8
| 宿主 dispatch_message() │ ▼ Bus::topic("luo9_message").publish(json) │ ├──► 插件A 的队列 (fan-out 副本) ├──► 插件B 的队列 (fan-out 副本) └──► 插件C 的队列 (fan-out 副本)
|
所有插件同时收到消息,没有顺序,没有阻断。
新的模型
1 2 3 4 5 6 7 8 9
| 宿主 priority_dispatch_message() │ ▼ 按优先级降序遍历 │ ├──► Bus::publish_to("luo9_message", json, [插件A的sub_id]) │ (插件A 是高优先级,block_enabled=true) │ → 插件A 收到消息后,停止分发 │ ✗ 插件B 和 插件C 不会收到这条消息
|
关键变化:用 publish_to(定向推送)替代 publish(广播)。
技术实现
1. Bus 层:新增 publish_to 和 unsubscribe
在 luo9_core 的 Bus 实现中,新增了两个核心函数:
publish_to(定向发布):与 publish 的广播行为不同,publish_to 只向指定的 subscriber ID 列表推送消息。内部实现会跳过已标记为 dead 的 subscriber,并在推送完成后通过 notify_all 唤醒可能阻塞的消费者线程。
unsubscribe(取消订阅):将 subscriber 标记为 dead,移除其消息队列,然后唤醒阻塞在 wait_pop 上的线程。被唤醒的线程会检测到 dead 状态并返回哨兵消息,从而让插件优雅退出。
Bus 内部维护了一个 per-topic 的 dead set,publish、publish_to、pop、wait_pop 都会在操作前检查该 set,确保已取消订阅的 subscriber 不会再收到任何消息。
2. 宿主层:预分配 subscriber
传统模式下,每个插件启动时自行调用 Bus::topic("luo9_message").subscribe() 创建 subscriber。
新模式下,宿主在加载插件时预先为插件创建 subscriber:
1 2 3 4 5 6 7 8 9
| fn create_subscribers(plugin_name: &str) -> HashMap<String, usize> { let topics = ["luo9_message", "luo9_notice", "luo9_meta_event", "luo9_task", "luo9_send"]; let mut ids = HashMap::new(); for topic in &topics { let id = Bus::topic(topic).subscribe().unwrap(); ids.insert(topic.to_string(), id); } ids }
|
然后通过 FFI 将这些 subscriber ID 传递给插件的 SDK:
1 2 3 4 5 6 7 8
| #[repr(C)] pub struct PluginSubscribers { pub message_sub_id: i32, pub meta_event_sub_id: i32, pub notice_sub_id: i32, pub task_sub_id: i32, pub send_sub_id: i32, }
|
宿主在 spawn 插件线程时,先调用 luo9_init_subscribers 传递映射:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| fn run_plugin(lib: Arc<Library>, plugin_name: &str, subscriber_ids: HashMap<String, usize>) { if let Ok(init_fn) = lib.get::<InitSubscribersFn>(b"luo9_init_subscribers\0") { let subs = PluginSubscribersRaw { message_sub_id: subscriber_ids.get("luo9_message").copied().unwrap_or(0) as i32, }; init_fn(&subs); }
let plugin_main: Symbol<unsafe extern "C" fn()> = lib.get(b"plugin_main\0").unwrap(); plugin_main(); }
|
3. SDK 层:透明替换
插件的 SDK 中,Topic::subscribe() 会检查是否有预分配的 subscriber ID:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| pub static PRECREATED_SUBSCRIBERS: OnceLock<Mutex<HashMap<String, usize>>> = OnceLock::new();
impl<'a> Topic<'a> { pub fn subscribe(&self) -> Result<usize, BusError> { if let Some(map) = PRECREATED_SUBSCRIBERS.get() { if let Some(&id) = map.lock().unwrap().get(self.name) { return Ok(id); } } let ret = unsafe { luo9_bus_subscribe(topic.as_ptr()) }; } }
|
这就是插件代码零修改的秘密:插件照常调用 Bus::topic("luo9_message").subscribe(),但 SDK 内部返回的是宿主预分配的 ID。插件完全感知不到区别。
4. 优先级分发
宿主维护一个按优先级排序的分发列表,存储在 RwLock 中实现无锁快速读取:
1 2 3 4 5 6 7 8 9 10
| static DISPATCH_LIST: RwLock<Vec<DispatchEntry>> = RwLock::new(Vec::new());
pub struct DispatchEntry { pub name: String, pub priority: i32, pub block_enabled: bool, pub message_sub_id: Option<usize>, pub notice_sub_id: Option<usize>, pub meta_event_sub_id: Option<usize>, }
|
注意 message_sub_id 的类型是 Option<usize> 而非 usize。这里有一个关键的设计考量——subscriber_id 从 0 开始。
分发逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| pub fn priority_dispatch_message(msg: Message) { let payload = serde_json::to_string(&PluginData::Message(msg)).unwrap(); let list = DISPATCH_LIST.read().unwrap();
for entry in list.iter() { let Some(sub_id) = entry.message_sub_id else { continue; };
Bus::topic("luo9_message") .publish_to(&payload, &[sub_id]) .unwrap();
if entry.block_enabled { break; } } }
|
5. subscriber_id 从 0 开始:一个踩坑记录
实现过程中遇到了一个隐蔽的 bug:第一个加载的插件 subscriber_id 为 0,被错误地当作”未订阅”处理。
最初的设计中,DispatchEntry 使用 usize 类型:
1 2 3
| pub struct DispatchEntry { pub message_sub_id: usize, }
|
分发时用 == 0 判断是否跳过:
1 2 3
| if entry.message_sub_id == 0 { continue; }
|
这在大多数语言中不会有问题(ID 通常从 1 开始),但 luo9_core 的 subscriber_id 是 per-topic 从 0 开始递增的。第一个加载的插件 plugin_doro 获得了 luo9_message 的 subscriber_id = 0,结果被错误跳过。
修复方案:将 DispatchEntry 改为 Option<usize>,用 None 表示”未订阅”,用 Some(0) 表示”合法的 subscriber_id 0”:
分发时用模式匹配替代数值比较:
1 2 3
|
let Some(sub_id) = entry.message_sub_id else { continue };
|
这个教训很简单:永远不要用魔术值表示”无”,用 Option。
6. 热重载:Sentinel 机制
禁用插件时,需要让插件线程退出。但插件的 plugin_main 通常阻塞在 wait_pop 上(Condvar 等待),如何唤醒它?
答案是 sentinel(哨兵消息):luo9_core 定义了一个特殊的哨兵字符串 __luo9_unsubscribed__,当 subscriber 被取消订阅后,wait_pop 会返回该哨兵而非正常消息。
禁用流程:
1 2 3 4 5 6
| pub fn unsubscribe_all(&self) { for (topic, &sub_id) in &self.subscriber_ids { Bus::topic(topic).unsubscribe(sub_id); } }
|
unsubscribe 会:
- 将 subscriber_id 加入 dead set
- 移除 subscriber(队列被丢弃)
notify_all 唤醒阻塞的 wait_pop
被唤醒的 wait_pop 检测到 subscriber 已被移除且在 dead set 中,返回哨兵消息。SDK 的 wait_pop 识别到哨兵后返回 Err(BusError::Unsubscribed),插件的循环自然退出:
1 2 3 4 5 6 7 8 9
| loop { let msg = match Bus::topic("luo9_message").wait_pop(sub_id) { Ok(msg) => msg, Err(BusError::Unsubscribed) => break, Err(e) => { error!("错误: {:?}", e); continue; } }; }
|
插件线程退出后,Arc<Library> 的引用计数归零,dlclose 自动执行,动态库被卸载。热重载时只需重新加载 .dll、创建新 subscriber、spawn 新线程。
7. 配置持久化
插件的优先级和阻断配置通过主配置文件持久化:
1 2 3 4 5 6 7 8 9
| [[plugins.plugins]] name = "plugin_doro" priority = 3 block_enabled = true
[[plugins.plugins]] name = "plugin_epic" priority = 1 block_enabled = false
|
宿主启动时,init_global_manager 将配置中的 priority 和 block_enabled 应用到每个插件的 PluginHandle 和 PluginInfo:
1 2 3 4 5 6 7
| for mut handle in handles { if let Some(entry) = config_entries.iter().find(|e| e.name == handle.name) { handle.priority = entry.priority; handle.block_enabled = entry.block_enabled; } manager.register_handle(handle); }
|
WebUI 修改优先级或阻断时,同步写回配置文件,确保重启后配置不丢失。
架构图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| ┌─────────────────────────────────────┐ │ 宿主 (luo9_bot) │ │ │ │ PluginManager │ │ ├── PluginHandle A (priority=10) │ │ │ ├── subscriber_ids: { │ │ │ │ message: 0, notice: 0 } │ │ │ ├── block_enabled: true │ │ │ └── thread_handle: JoinHandle │ │ ├── PluginHandle B (priority=5) │ │ │ └── subscriber_ids: { │ │ │ message: 1, notice: 1 } │ │ └── PluginHandle C (priority=0) │ │ └── subscriber_ids: { │ │ message: 2, notice: 2 } │ │ │ │ DISPATCH_LIST: RwLock<Vec<...>> │ │ (按 priority 降序排列) │ │ │ │ priority_dispatch_message() │ │ │ │ │ ├─► publish_to(msg, [0]) ← A │ │ │ A.block_enabled → break │ │ ├─► publish_to(msg, [1]) ← B │ │ └─► publish_to(msg, [2]) ← C │ └──────────────┬──────────────────────┘ │ ┌──────────────▼──────────────────────┐ │ luo9_core (FFI 消息总线) │ │ │ │ 每个 topic 维护独立的 subscriber │ │ 队列,per-topic 编号从 0 开始。 │ │ │ │ publish_to: 定向推送到指定队列 │ │ unsubscribe: 标记 dead + 唤醒线程 │ │ wait_pop: dead 时返回哨兵消息 │ └─────────────────────────────────────┘
|
subscriber ID 是 per-topic 的——每个 topic 独立编号,从 0 开始。插件 A 在 luo9_message 上是 0,在 luo9_notice 上也是 0,但它们是不同的 subscriber。
总结
| 层 |
改动 |
插件感知 |
| Core Bus |
新增 publish_to、unsubscribe、dead set、sentinel |
无 |
| SDK |
subscribe() 返回预分配 ID,wait_pop 识别 sentinel |
无 |
| 宿主 |
优先级分发、热重载、句柄管理、配置持久化 |
无 |
| 插件 |
零改动 |
完全透明 |
核心设计原则:将复杂性下沉到基础设施层,让上层保持简单。 插件开发者只需要关心业务逻辑,优先级、阻断、热重载全部由宿主和 SDK 自动处理。
这套机制的灵感来自操作系统的进程调度:高优先级进程先执行,可以抢占低优先级进程的 CPU 时间。只不过这里”抢占”的是消息的接收权。