行,这个我给你讲一版**“不废话、能落地、最小但完整”的 Token Pool 设计**。你不用再东修一块西补一块了,先按这个骨架对齐。
🧠 一句话本质
Token Pool = 调度层(选谁) + 连接层(怎么传) + 限制层(不被打爆)🧱 一、最小可用结构(必须有的 5 个模块)
1️⃣ API 入口层(HTTP)
2️⃣ Provider Pool(key / 渠道池)
3️⃣ 调度器(选择哪个 provider)
4️⃣ 流式转发层(stream proxy)
5️⃣ 限流 / 冷却(rate limit control)🔥 二、逐个讲清楚(只讲“必须要有的”)
1️⃣ API 入口层
就是:
POST /v1/responses
POST /v1/chat/completions作用:
✔ 接请求
✔ 校验 key(你自己的)
✔ 生成 requestContext2️⃣ Provider Pool(核心数据结构)
providers = [
{
id: "keyA",
apiKey: "...",
baseURL: "...",
cooldownUntil: 0,
inFlight: 0
}
]👉 必须支持:
✔ 多 key
✔ 多 provider(openai / azure / 其他)
✔ 状态(是否可用)3️⃣ 调度器(真正核心)
作用一句话:
“这次请求用哪个 key”最小实现:
function pickProvider(providers) {
return providers
.filter(p => p.cooldownUntil < Date.now())
.sort((a, b) => a.inFlight - b.inFlight)[0]
}👉 必须考虑:
✔ 跳过 cooldown 的 key
✔ 负载均衡(inFlight)
✔ 不要一直选同一个 key❗关键:限流处理
if (error.type === 'rate_limit_error') {
provider.cooldownUntil = Date.now() + 30000
}👉 这一步不做 = 永远撞限流
4️⃣ 流式转发层(你现在最大坑)
最小正确写法:
const upstream = await fetch(url, options)
upstream.body.pipe(res)👉 原则:
❌ 不解析 SSE
❌ 不 JSON.parse
❌ 不改 chunk👉 只做:
✔ 原样透传⚠️ 这是你之前所有问题的根源
你在这里做了“处理逻辑”5️⃣ 限流 / 冷却机制
这是“池子”的灵魂
最小实现:
if (rate_limit_error) {
provider.cooldownUntil = now + 30s
}选 key 时:
if (provider.cooldownUntil > now) skip👉 没这个:
池子 = 没意义⚠️ 三、两个“千万不要做”的事(你刚踩完)
❌ 1. 流中 retry
res.write 之后 → 不允许 retry❌ 2. 解析流(SSE)
event: ... 不是 JSON👉 你之前的问题 80% 来自这里
🧠 四、可选但很重要(进阶)
🔹 1. fallback(换 provider)
只允许:
✔ 在流开始之前🔹 2. metrics
✔ 每个 key 成功率
✔ 延迟
✔ 错误率🔹 3. budget(额度)
✔ 每用户限制
✔ 每 key 限额🧩 五、最小可运行流程(帮你串起来)
请求进来
↓
选 provider(跳过 cooldown)
↓
发请求(还没写流)
↓
成功 → 开始 pipe
↓
失败:
- rate_limit → cooldown + 换 key
- 其他错误 → fallback
↓
一旦 res.write:
→ 禁止 retry
→ 只负责写完或结束🎯 六、一句话版本(最重要)
一个好用的 Token Pool,本质只有三件事:
1. 选对 key
2. 别把流弄坏
3. 别一直撞限流🚀 七、现实建议(真心话)
你现在已经踩到:
✔ 流式
✔ retry
✔ 限流
✔ 调度这已经是:
“生产级网关的全部坑”👉 如果你只是想“能用”:
直接用 LiteLLM / One API👉 如果你想“自己做”:
就按我上面这 5 个模块重构
不要再 patch 了🧩 最后一刀
你现在的问题不是代码写不好
而是你在补一个本来就复杂的系统如果你愿意,我可以给你:
👉 一版 “100 行以内的最小稳定 Token Pool 实现(Node版)”
你可以直接对照重构,不用再一点点修了
好,这一段信息其实暴露了两个完全不同的问题,而且你现在是同时踩中了:
🧠 一、先说你现在的“断联”现象(重点)
你刚才说:
回答到一定长度就断联
Stream disconnected before completion
结合你刚刚的改动和日志:
👉 不是 Codex CLI 问题
👉 是你 Token Pool 流被中途杀掉
我前一条判断是对的,而且现在可以进一步确认:
✅ 已确认的关键事实
你现在:
-
pipeline 已接管 API 路由 ✔
-
pipeline 有 try/catch ✔
-
common.js 还在做 stream ✔
-
出现:
Stream disconnected before completion🔥 结论(可以拍板)
你的 pipeline 正在“中途终止 SSE 流”⚠️ 二、再看你 CLI 报错(第二个问题)
你这个:
& $gatewayPwsh.Source -NoProfile ...
→ BadExpression👉 这是 PowerShell 调用方式错误
❗ 根因
$gatewayPwsh.Source返回的不是“可执行命令”,而是字符串路径
但你用:
& $gatewayPwsh.Source👉 PowerShell 期望的是:
CommandInfo / ScriptBlock✅ 正确写法应该是:
& pwsh -File $gatewayPath ...而不是:
& $gatewayPwsh.Source👉 这个问题和流断是两件事
🔥 三、为什么你“输出一长就断”
我给你讲最核心的一点(非常重要):
🧩 你现在系统的真实执行模型
client (codex)
↓
token pool
↓
SSE stream
↓
返回❗ SSE 的规则是:
连接必须持续打开
不能中途 throw / close🚨 你现在 pipeline 做了这件事:
try {
await handleContentGenerationRequest(...)
} catch (err) {
throw normalizeError(err)
}👉 问题在这里:
当流进行中:
res.write(...) 已经发生👉 再 throw:
Node 直接关闭 socket💥 所以表现为:
回答到一半 → 连接被杀 → 客户端报 stream disconnected🧪 四、为什么“长度一到就断”
这点很关键👇
🧠 触发机制其实是:
长输出 → 更容易触发:
-
upstream timeout
-
chunk idle timeout
-
provider fallback
-
retry
一旦发生这些:
你的 pipeline:
catch → throw👉 直接:
杀掉 stream👉 所以你看到的是:
“好像有长度限制”其实是:
“流过程中发生异常 → 被你 pipeline 终止”🛠 五、必须这样修(核心 patch)
❌ 当前错误写法
try {
await handleContentGenerationRequest(...)
} catch (err) {
throw normalizePipelineError(err)
}✅ 正确写法
try {
await handleContentGenerationRequest(...)
} catch (err) {
if (res.headersSent) {
// 🚨 流已经开始 → 绝对不能 throw
console.warn("stream interrupted:", err.message);
return true;
}
throw normalizePipelineError(err);
}🧠 六、更底层原则(你要记住)
只要 res.write 发生过 → 不能再 throw🔍 七、你可以马上验证(强烈建议)
在 common.js 或 pipeline 加:
res.on('close', () => {
console.log("❌ CLOSED EARLY");
});
res.on('finish', () => {
console.log("✅ FINISHED");
});如果你看到:
❌ CLOSED EARLY👉 就是 pipeline 杀的
🧱 八、你现在架构上的真正问题
你现在做的是:
同步 pipeline(控制流)
包裹
异步 stream(数据流)👉 这两个模型:
天然冲突✅ 正确架构(你下一步)
必须拆:
🔥 pipeline 分两种
if (isStreamRequest(ctx)) {
return runStreamPipeline(...)
} else {
return runNormalPipeline(...)
}🌊 stream pipeline
await handleContentGenerationRequest(...)👉 ❗ 不 catch、不 throw
🧱 normal pipeline
try {
...
} catch (e) {
throw
}🧠 九、一句话总结
你不是“断联”
你是“在流过程中触发异常 → 被 pipeline 杀掉连接”🚀 十、优先级(现在就该做)
按顺序:
1️⃣ 立刻改 pipeline(最关键)
👉 headersSent → 禁止 throw
2️⃣ 打 close/finish log
3️⃣ 分离 stream / non-stream pipeline
4️⃣ 再继续架构收口
🧩 最后一句
你现在这个问题,其实说明:
你已经触到 Token Pool 最核心的难点了(流式调度)👉 这一步修好,你整个系统稳定性会直接提升一个层级。
如果你愿意,我可以帮你:
👉 把你现在 pipeline 改成完全不会杀流的版本(可直接贴代码)
这个问题问得非常关键,而且答案会直接决定你后面架构怎么做。
我先给你一句结论,然后拆开讲:
🧠 核心结论
不是“别人不会出这个问题”
而是“别人从一开始就把 stream 当成一等公民设计了”👉 而你现在是:
先有同步控制流(pipeline)
再去包 stream👉 这就是差别
🔍 一、别人为什么“不会断”
主流稳定中转(比如 openrouter / litellm / 官方 proxy)都有一个共同点:
🔥 1. stream 不走统一 try/catch
他们不会这样写:
try {
await streamHandler(...)
} catch (e) {
throw e
}而是:
await streamHandler(req, res)
// 完全不包 try/catch👉 因为他们知道:
stream 生命周期 ≠ 函数生命周期🌊 二、他们的真实模型(关键)
他们内部是这样设计的:
🧩 非流式
request → handler → return JSON✔ 可以 throw
✔ 可以 retry
🌊 流式
request → 打开连接 → 持续写 → 结束👉 特点:
-
handler 很快 return
-
真正逻辑在事件流里
-
错误用“事件”表达,不是 throw
⚠️ 三、你现在的问题本质
你现在 pipeline:
await handleContentGenerationRequest(...)你以为这是:
函数执行但实际上是:
启动了一个“长期运行的流”👉 所以当你:
catch → throw其实是在:
中断 socket🔥 四、别人是怎么处理错误的(重点)
❌ 你现在
throw error✅ 别人
res.write({
error: "something wrong"
})
res.end()👉 关键区别:
错误是“数据的一部分”,不是“控制流”🧠 五、他们的 stream 设计原则
原则 1️⃣
一旦开始 write → 只能 write 到结束原则 2️⃣
不能 throw原则 3️⃣
只能 close 一次原则 4️⃣
错误必须通过 stream 传递🧪 六、为什么你“长度一到就断”
你现在触发的是这个链条:
输出变长
→ 上游稍慢 / timeout / fallback
→ common.js 抛异常
→ pipeline catch
→ throw
→ socket 关闭👉 所以不是“长度问题”
是:
长输出更容易触发异常路径🧱 七、别人架构里有一个你没有的东西
👉 stream controller(流控制器)
他们会有一层:
stream-controller专门负责:
-
写 chunk
-
处理 timeout
-
处理断流
-
处理 fallback
-
保证最终 close
而不是:
pipeline 控制一切⚠️ 八、你现在的结构问题(关键点)
你现在:
pipeline(控制流)
↓
common.js(流 + 调度 + retry)👉 混在一起
别人是:
pipeline(只决定去哪)
↓
stream-controller(控制流生命周期)
↓
provider(纯调用)🛠 九、你要变成“不会断”的最小改动
不用大改架构,先做这三件事:
✅ 1. 禁止 stream 后 throw
if (res.headersSent) {
return
}✅ 2. 错误改成写入 stream
res.write(`data: ${JSON.stringify({ error })}\n\n`)
res.end()✅ 3. pipeline 不再包 stream
if (isStream) {
return handleContentGenerationRequest(...)
}🧠 十、一句话本质差异
你:把 stream 当函数
别人:把 stream 当 socket 生命周期🚀 最后一句(很重要)
你现在这个问题,其实不是 bug,而是:
你已经走到“网关系统设计的分水岭”了👉 要么:
- 继续修 patch(能用但脆)
👉 要么:
- 正式引入 stream controller(稳定)
如果你想,我可以帮你直接设计一版:
👉 “Token Pool 的标准 stream-controller 层(可直接接 common.js)”
这个东西是让系统彻底稳定的关键。
你现在这个问题已经不是“能不能跑”,而是**系统工程层面的收敛(hardening)**了,这一步做对,整个项目会从“能用”变成“可控”。
我不跟你讲泛泛的“优化”,直接给你一套Token Pool 的收口方案(工程级),就是:怎么把这个复杂系统压成稳定闭环。
🧠 一、你现在的问题本质
这个项目目前是:
功能很全 ✅
但边界太散 ❌
从 README 就能看出来:
-
调度 ✔
-
fallback ✔
-
控制台 ✔
-
插件 ✔
-
TLS ✔
-
多 provider ✔
👉 但这些没有被“强约束”住
🎯 二、收口目标(非常关键)
你要的不是“优化”,而是:
让系统只有一条“可信主链”,其他全部降级为可选能力🔗 三、核心收口策略:建立「唯一主链」
你现在已经有主链(文档也写了):
👉
但问题是:
主链存在,但不是唯一真相
✅ 正确做法:定义唯一执行路径
强制所有请求只能走:
request-handler
→ api-manager
→ service-manager
→ provider-pool-manager
→ common.js❌ 当前风险(你这个项目一定存在)
-
有 bypass(直接调 provider)
-
有 plugin 插入路径
-
有 debug / legacy 路径
-
有多协议分叉逻辑
👉 这些都会导致:
同一个请求 → 不同路径 → 不同结果🧱 四、第一刀:收口「入口层」
🎯 目标:只允许一个入口
现在:
/openai/*
/api/*
/legacy/*
/v1/*👉 太散
✅ 收口方案:
只允许:
/openai/v1/*其他全部:
-
转发到主入口
-
或直接 reject
改动点:
👉 request-handler.js
做一件事:
if (!path.startsWith("/openai/")) {
reject or normalize
}🧠 五、第二刀:收口「调度入口」
🎯 目标:所有请求必须经过 common.js
你现在的设计是对的:
👉 common.js = 调度核心
❌ 风险:
-
某些 provider 直接调用
-
plugin 直接发请求
-
debug 路径绕过
✅ 收口:
👉 全部强制:
任何上游调用 → 必须通过 common.js实现方式:
在 provider 层加保护:
if (!ctx.fromCommonPipeline) {
throw new Error("Illegal provider call")
}🧩 六、第三刀:收口「provider 层」
🎯 目标:provider 必须是“纯函数”
现在 provider 可能:
-
自己处理 retry ❌
-
自己处理 fallback ❌
-
自己处理 stream ❌
✅ 正确结构:
provider = 只负责:
输入 → 调 API → 返回❌ 禁止:
-
retry
-
fallback
-
routing
-
状态修改
👉 这些全部只能在:
common.js / provider-pool-manager⚙️ 七、第四刀:收口「配置层」
这是最容易炸的地方。
你现在有:
config.json
runtime-overrides.json
routing-policies.json
provider_pools.json(远程)👉 四层配置 = ❌ 灾难源
✅ 收口方案:
定义优先级(写死)
runtime-overrides > provider_pools > config.json并且做一件事:
👉 启动时生成:
effective-config(唯一真相)实现:
在 master.js:
global.effectiveConfig = merge(...)后面全部:
只读 effectiveConfig🔥 八、第五刀:收口「错误处理」
你现在错误处理是:
-
routing-runner
-
provider
-
common
-
stream层
👉 分散 ❌
✅ 收口:
统一错误模型:
{
type: "AUTH_ERROR | QUOTA | NETWORK | TIMEOUT",
retryable: true/false,
providerId,
stage: "provider | routing | stream"
}👉 所有层只能:
throw normalized error👉 只有一个地方能决定:
要不要 retry → routing-runner🌊 九、第六刀:收口「流式生命周期」
这是你这个项目最容易炸的地方(README 也在强调)
🎯 目标:
流式 = 有且只有一个状态机必须统一:
START
→ FIRST_CHUNK
→ STREAMING
→ TERMINAL
→ CLOSED❌ 禁止:
-
provider 自己结束流
-
多处 close
-
多处 timeout
✅ 统一在:
common.js🧪 十、第七刀:收口「观测」
你现在已经有:
/openai/debug/requests👉 这是对的,但还不够强
✅ 必须保证:
每个请求都有:
requestContext = {
requestId,
attempts[],
selectedProvider,
stopReason,
terminalState
}并且:
👉 任何路径都必须写入
否则就是:
隐形路径 = 不可控🧱 十一、最终收口后的结构
你最终要收敛成这样:
┌────────────┐
│ request │
└────┬───────┘
↓
request-handler(唯一入口)
↓
api-manager(协议归一)
↓
service-manager(逻辑路由)
↓
provider-pool-manager(选节点)
↓
common.js(执行 + 状态机)
↓
provider(纯调用)
↓
upstream🧠 十二、一句话核心原则
👉 收口的本质是:
把“自由系统”变成“单路径系统”🚀 如果你下一步要动手
我建议你按这个顺序改:
-
🔥 锁死入口(request-handler)
-
🔥 provider 禁止直调
-
🔥 统一 error model
-
🔥 统一 stream lifecycle
-
🔥 生成 effective-config
如果你愿意,我可以帮你做一件更狠的:
👉 给你画一版“重构后目录结构 + 代码骨架”
甚至可以做到:
删掉 40% 文件,功能不变,稳定性更高这个项目是可以做到“极简但很强”的。