BLOG

池子结构

2026/04/29 14 min read BLOG 池子 池子结构

行,这个我给你讲一版**“不废话、能落地、最小但完整”的 Token Pool 设计**。你不用再东修一块西补一块了,先按这个骨架对齐。


🧠 一句话本质

Token Pool = 调度层(选谁) + 连接层(怎么传) + 限制层(不被打爆)

🧱 一、最小可用结构(必须有的 5 个模块)

1️⃣ API 入口层(HTTP)
2️⃣ Provider Pool(key / 渠道池)
3️⃣ 调度器(选择哪个 provider)
4️⃣ 流式转发层(stream proxy)
5️⃣ 限流 / 冷却(rate limit control)

🔥 二、逐个讲清楚(只讲“必须要有的”)


1️⃣ API 入口层

就是:

POST /v1/responses
POST /v1/chat/completions

作用:

✔ 接请求
✔ 校验 key(你自己的)
✔ 生成 requestContext

2️⃣ Provider Pool(核心数据结构)

providers = [
  {
    id: "keyA",
    apiKey: "...",
    baseURL: "...",
    cooldownUntil: 0,
    inFlight: 0
  }
]

👉 必须支持:

✔ 多 key
✔ 多 provider(openai / azure / 其他)
✔ 状态(是否可用)

3️⃣ 调度器(真正核心)

作用一句话:

“这次请求用哪个 key”

最小实现:

function pickProvider(providers) {
  return providers
    .filter(p => p.cooldownUntil < Date.now())
    .sort((a, b) => a.inFlight - b.inFlight)[0]
}

👉 必须考虑:

✔ 跳过 cooldown 的 key
✔ 负载均衡(inFlight)
✔ 不要一直选同一个 key

❗关键:限流处理

if (error.type === 'rate_limit_error') {
  provider.cooldownUntil = Date.now() + 30000
}

👉 这一步不做 = 永远撞限流


4️⃣ 流式转发层(你现在最大坑)

最小正确写法:

const upstream = await fetch(url, options)
 
upstream.body.pipe(res)

👉 原则:

❌ 不解析 SSE
❌ 不 JSON.parse
❌ 不改 chunk

👉 只做:

✔ 原样透传

⚠️ 这是你之前所有问题的根源

你在这里做了“处理逻辑”

5️⃣ 限流 / 冷却机制

这是“池子”的灵魂


最小实现:

if (rate_limit_error) {
  provider.cooldownUntil = now + 30s
}

选 key 时:

if (provider.cooldownUntil > now) skip

👉 没这个:

池子 = 没意义

⚠️ 三、两个“千万不要做”的事(你刚踩完)


❌ 1. 流中 retry

res.write 之后 → 不允许 retry

❌ 2. 解析流(SSE)

event: ... 不是 JSON

👉 你之前的问题 80% 来自这里


🧠 四、可选但很重要(进阶)


🔹 1. fallback(换 provider)

只允许:

✔ 在流开始之前

🔹 2. metrics

✔ 每个 key 成功率
✔ 延迟
✔ 错误率

🔹 3. budget(额度)

✔ 每用户限制
✔ 每 key 限额

🧩 五、最小可运行流程(帮你串起来)


请求进来

选 provider(跳过 cooldown)

发请求(还没写流)

成功 → 开始 pipe

失败:
  - rate_limit → cooldown + 换 key
  - 其他错误 → fallback

一旦 res.write:
  → 禁止 retry
  → 只负责写完或结束

🎯 六、一句话版本(最重要)

一个好用的 Token Pool,本质只有三件事:
1. 选对 key
2. 别把流弄坏
3. 别一直撞限流

🚀 七、现实建议(真心话)

你现在已经踩到:

✔ 流式
✔ retry
✔ 限流
✔ 调度

这已经是:

“生产级网关的全部坑”

👉 如果你只是想“能用”:

直接用 LiteLLM / One API

👉 如果你想“自己做”:

就按我上面这 5 个模块重构
不要再 patch 了

🧩 最后一刀

你现在的问题不是代码写不好  
而是你在补一个本来就复杂的系统

如果你愿意,我可以给你:

👉 一版 “100 行以内的最小稳定 Token Pool 实现(Node版)”
你可以直接对照重构,不用再一点点修了

好,这一段信息其实暴露了两个完全不同的问题,而且你现在是同时踩中了:


🧠 一、先说你现在的“断联”现象(重点)

你刚才说:

回答到一定长度就断联
Stream disconnected before completion

结合你刚刚的改动和日志:

👉 不是 Codex CLI 问题
👉 是你 Token Pool 流被中途杀掉

我前一条判断是对的,而且现在可以进一步确认:


✅ 已确认的关键事实

你现在:

  • pipeline 已接管 API 路由 ✔

  • pipeline 有 try/catch ✔

  • common.js 还在做 stream ✔

  • 出现:

Stream disconnected before completion

🔥 结论(可以拍板)

你的 pipeline 正在“中途终止 SSE 流”

⚠️ 二、再看你 CLI 报错(第二个问题)

你这个:

& $gatewayPwsh.Source -NoProfile ...
→ BadExpression

👉 这是 PowerShell 调用方式错误


❗ 根因

$gatewayPwsh.Source

返回的不是“可执行命令”,而是字符串路径

但你用:

& $gatewayPwsh.Source

👉 PowerShell 期望的是:

CommandInfo / ScriptBlock

✅ 正确写法应该是:

& pwsh -File $gatewayPath ...

而不是:

& $gatewayPwsh.Source

👉 这个问题和流断是两件事


🔥 三、为什么你“输出一长就断”

我给你讲最核心的一点(非常重要):


🧩 你现在系统的真实执行模型

client (codex)

token pool

SSE stream

返回

❗ SSE 的规则是:

连接必须持续打开
不能中途 throw / close

🚨 你现在 pipeline 做了这件事:

try {
  await handleContentGenerationRequest(...)
} catch (err) {
  throw normalizeError(err)
}

👉 问题在这里:

当流进行中:

res.write(...) 已经发生

👉 再 throw:

Node 直接关闭 socket

💥 所以表现为:

回答到一半 → 连接被杀 → 客户端报 stream disconnected

🧪 四、为什么“长度一到就断”

这点很关键👇


🧠 触发机制其实是:

长输出 → 更容易触发:

  • upstream timeout

  • chunk idle timeout

  • provider fallback

  • retry


一旦发生这些:

你的 pipeline:

catchthrow

👉 直接:

杀掉 stream

👉 所以你看到的是:

“好像有长度限制”

其实是:

“流过程中发生异常 → 被你 pipeline 终止”

🛠 五、必须这样修(核心 patch)


❌ 当前错误写法

try {
  await handleContentGenerationRequest(...)
} catch (err) {
  throw normalizePipelineError(err)
}

✅ 正确写法

try {
  await handleContentGenerationRequest(...)
} catch (err) {
  if (res.headersSent) {
    // 🚨 流已经开始 → 绝对不能 throw
    console.warn("stream interrupted:", err.message);
    return true;
  }
 
  throw normalizePipelineError(err);
}

🧠 六、更底层原则(你要记住)

只要 res.write 发生过 → 不能再 throw

🔍 七、你可以马上验证(强烈建议)

common.js 或 pipeline 加:

res.on('close', () => {
  console.log("❌ CLOSED EARLY");
});
 
res.on('finish', () => {
  console.log("✅ FINISHED");
});

如果你看到:

❌ CLOSED EARLY

👉 就是 pipeline 杀的


🧱 八、你现在架构上的真正问题

你现在做的是:

同步 pipeline(控制流)
包裹
异步 stream(数据流)

👉 这两个模型:

天然冲突

✅ 正确架构(你下一步)

必须拆:


🔥 pipeline 分两种

if (isStreamRequest(ctx)) {
  return runStreamPipeline(...)
} else {
  return runNormalPipeline(...)
}

🌊 stream pipeline

await handleContentGenerationRequest(...)

👉 ❗ 不 catch、不 throw


🧱 normal pipeline

try {
  ...
} catch (e) {
  throw
}

🧠 九、一句话总结

你不是“断联”
你是“在流过程中触发异常 → 被 pipeline 杀掉连接”

🚀 十、优先级(现在就该做)

按顺序:


1️⃣ 立刻改 pipeline(最关键)

👉 headersSent → 禁止 throw


2️⃣ 打 close/finish log


3️⃣ 分离 stream / non-stream pipeline


4️⃣ 再继续架构收口


🧩 最后一句

你现在这个问题,其实说明:

你已经触到 Token Pool 最核心的难点了(流式调度)

👉 这一步修好,你整个系统稳定性会直接提升一个层级。


如果你愿意,我可以帮你:

👉 把你现在 pipeline 改成完全不会杀流的版本(可直接贴代码)

这个问题问得非常关键,而且答案会直接决定你后面架构怎么做。

我先给你一句结论,然后拆开讲:


🧠 核心结论

不是“别人不会出这个问题”
而是“别人从一开始就把 stream 当成一等公民设计了”

👉 而你现在是:

先有同步控制流(pipeline)
再去包 stream

👉 这就是差别


🔍 一、别人为什么“不会断”

主流稳定中转(比如 openrouter / litellm / 官方 proxy)都有一个共同点:


🔥 1. stream 不走统一 try/catch

他们不会这样写:

try {
  await streamHandler(...)
} catch (e) {
  throw e
}

而是:

await streamHandler(req, res)
// 完全不包 try/catch

👉 因为他们知道:

stream 生命周期 ≠ 函数生命周期

🌊 二、他们的真实模型(关键)

他们内部是这样设计的:


🧩 非流式

request → handler → return JSON

✔ 可以 throw
✔ 可以 retry


🌊 流式

request → 打开连接 → 持续写 → 结束

👉 特点:

  • handler 很快 return

  • 真正逻辑在事件流里

  • 错误用“事件”表达,不是 throw


⚠️ 三、你现在的问题本质

你现在 pipeline:

await handleContentGenerationRequest(...)

你以为这是:

函数执行

但实际上是:

启动了一个“长期运行的流”

👉 所以当你:

catchthrow

其实是在:

中断 socket

🔥 四、别人是怎么处理错误的(重点)


❌ 你现在

throw error

✅ 别人

res.write({
  error: "something wrong"
})
 
res.end()

👉 关键区别:

错误是“数据的一部分”,不是“控制流”

🧠 五、他们的 stream 设计原则


原则 1️⃣

一旦开始 write → 只能 write 到结束

原则 2️⃣

不能 throw

原则 3️⃣

只能 close 一次

原则 4️⃣

错误必须通过 stream 传递

🧪 六、为什么你“长度一到就断”

你现在触发的是这个链条:

输出变长
→ 上游稍慢 / timeout / fallback
→ common.js 抛异常
→ pipeline catch
→ throw
→ socket 关闭

👉 所以不是“长度问题”

是:

长输出更容易触发异常路径

🧱 七、别人架构里有一个你没有的东西

👉 stream controller(流控制器)


他们会有一层:

stream-controller

专门负责:

  • 写 chunk

  • 处理 timeout

  • 处理断流

  • 处理 fallback

  • 保证最终 close


而不是:

pipeline 控制一切

⚠️ 八、你现在的结构问题(关键点)

你现在:

pipeline(控制流)

common.js(流 + 调度 + retry)

👉 混在一起


别人是:

pipeline(只决定去哪)

stream-controller(控制流生命周期)

provider(纯调用)

🛠 九、你要变成“不会断”的最小改动

不用大改架构,先做这三件事:


✅ 1. 禁止 stream 后 throw

if (res.headersSent) {
  return
}

✅ 2. 错误改成写入 stream

res.write(`data: ${JSON.stringify({ error })}\n\n`)
res.end()

✅ 3. pipeline 不再包 stream

if (isStream) {
  return handleContentGenerationRequest(...)
}

🧠 十、一句话本质差异

你:把 stream 当函数
别人:把 stream 当 socket 生命周期

🚀 最后一句(很重要)

你现在这个问题,其实不是 bug,而是:

你已经走到“网关系统设计的分水岭”了

👉 要么:

  • 继续修 patch(能用但脆)

👉 要么:

  • 正式引入 stream controller(稳定)

如果你想,我可以帮你直接设计一版:

👉 “Token Pool 的标准 stream-controller 层(可直接接 common.js)”

这个东西是让系统彻底稳定的关键。

你现在这个问题已经不是“能不能跑”,而是**系统工程层面的收敛(hardening)**了,这一步做对,整个项目会从“能用”变成“可控”。

我不跟你讲泛泛的“优化”,直接给你一套Token Pool 的收口方案(工程级),就是:怎么把这个复杂系统压成稳定闭环


🧠 一、你现在的问题本质

这个项目目前是:

功能很全 ✅
但边界太散 ❌

从 README 就能看出来:

  • 调度 ✔

  • fallback ✔

  • 控制台 ✔

  • 插件 ✔

  • TLS ✔

  • 多 provider ✔

👉 但这些没有被“强约束”住


🎯 二、收口目标(非常关键)

你要的不是“优化”,而是:

让系统只有一条“可信主链”,其他全部降级为可选能力

🔗 三、核心收口策略:建立「唯一主链」

你现在已经有主链(文档也写了):

👉

但问题是:

主链存在,但不是唯一真相


✅ 正确做法:定义唯一执行路径

强制所有请求只能走:

request-handler
  → api-manager
    → service-manager
      → provider-pool-manager
        → common.js

❌ 当前风险(你这个项目一定存在)

  • 有 bypass(直接调 provider)

  • 有 plugin 插入路径

  • 有 debug / legacy 路径

  • 有多协议分叉逻辑

👉 这些都会导致:

同一个请求 → 不同路径 → 不同结果

🧱 四、第一刀:收口「入口层」

🎯 目标:只允许一个入口

现在:

/openai/*
/api/*
/legacy/*
/v1/*

👉 太散


✅ 收口方案:

只允许:
/openai/v1/*

其他全部:

  • 转发到主入口

  • 或直接 reject


改动点:

👉 request-handler.js

做一件事:

if (!path.startsWith("/openai/")) {
  reject or normalize
}

🧠 五、第二刀:收口「调度入口」

🎯 目标:所有请求必须经过 common.js

你现在的设计是对的:

👉 common.js = 调度核心


❌ 风险:

  • 某些 provider 直接调用

  • plugin 直接发请求

  • debug 路径绕过


✅ 收口:

👉 全部强制:

任何上游调用 → 必须通过 common.js

实现方式:

在 provider 层加保护:

if (!ctx.fromCommonPipeline) {
  throw new Error("Illegal provider call")
}

🧩 六、第三刀:收口「provider 层」

🎯 目标:provider 必须是“纯函数”

现在 provider 可能:

  • 自己处理 retry ❌

  • 自己处理 fallback ❌

  • 自己处理 stream ❌


✅ 正确结构:

provider = 只负责:
  输入 → 调 API → 返回

❌ 禁止:

  • retry

  • fallback

  • routing

  • 状态修改


👉 这些全部只能在:

common.js / provider-pool-manager

⚙️ 七、第四刀:收口「配置层」

这是最容易炸的地方。

你现在有:

config.json
runtime-overrides.json
routing-policies.json
provider_pools.json(远程)

👉 四层配置 = ❌ 灾难源


✅ 收口方案:

定义优先级(写死)

runtime-overrides > provider_pools > config.json

并且做一件事:

👉 启动时生成:

effective-config(唯一真相)

实现:

master.js

global.effectiveConfig = merge(...)

后面全部:

只读 effectiveConfig

🔥 八、第五刀:收口「错误处理」

你现在错误处理是:

  • routing-runner

  • provider

  • common

  • stream层

👉 分散 ❌


✅ 收口:

统一错误模型:

{
  type: "AUTH_ERROR | QUOTA | NETWORK | TIMEOUT",
  retryable: true/false,
  providerId,
  stage: "provider | routing | stream"
}

👉 所有层只能:

throw normalized error

👉 只有一个地方能决定:

要不要 retry → routing-runner

🌊 九、第六刀:收口「流式生命周期」

这是你这个项目最容易炸的地方(README 也在强调)


🎯 目标:

流式 = 有且只有一个状态机

必须统一:

START
→ FIRST_CHUNK
→ STREAMING
→ TERMINAL
→ CLOSED

❌ 禁止:

  • provider 自己结束流

  • 多处 close

  • 多处 timeout


✅ 统一在:

common.js

🧪 十、第七刀:收口「观测」

你现在已经有:

/openai/debug/requests

👉 这是对的,但还不够强


✅ 必须保证:

每个请求都有:

requestContext = {
  requestId,
  attempts[],
  selectedProvider,
  stopReason,
  terminalState
}

并且:

👉 任何路径都必须写入

否则就是:

隐形路径 = 不可控

🧱 十一、最终收口后的结构

你最终要收敛成这样:

                ┌────────────┐
                │ request    │
                └────┬───────┘

          request-handler(唯一入口)

             api-manager(协议归一)

         service-manager(逻辑路由)

     provider-pool-manager(选节点)

            common.js(执行 + 状态机)

             provider(纯调用)

                  upstream

🧠 十二、一句话核心原则

👉 收口的本质是:

把“自由系统”变成“单路径系统”

🚀 如果你下一步要动手

我建议你按这个顺序改:

  1. 🔥 锁死入口(request-handler)

  2. 🔥 provider 禁止直调

  3. 🔥 统一 error model

  4. 🔥 统一 stream lifecycle

  5. 🔥 生成 effective-config


如果你愿意,我可以帮你做一件更狠的:

👉 给你画一版“重构后目录结构 + 代码骨架”

甚至可以做到:

删掉 40% 文件,功能不变,稳定性更高

这个项目是可以做到“极简但很强”的。