Hands-On Tutorial: An Enterprise Routing Gateway Example

Goal

Build a minimal but usable enterprise LLM routing gateway that supports multi-provider selection, retry with backoff, a circuit breaker with graceful degradation, basic caching, and observability, giving production deployments a single entry point for governance.

Setup and Dependencies

# Initialize the project
mkdir llm-gateway && cd llm-gateway
npm init -y
npm pkg set type=module   # gateway.js below uses ES module imports
npm i express pino openai axios dotenv cors

Configuration and Keys

# .env (example)
OPENAI_API_KEY=...
OPENAI_BASE=https://api.openai.com/v1
ANTHROPIC_API_KEY=...
ANTHROPIC_BASE=https://api.anthropic.com
DEEPSEEK_API_KEY=...
DEEPSEEK_BASE=https://api.deepseek.com
PRIMARY=OPENAI
FALLBACK=DEEPSEEK
TIMEOUT_MS=15000
RETRIES=2

Core Routing Logic

// gateway.js (minimal example)
import express from 'express';
import cors from 'cors';
import pino from 'pino';
import 'dotenv/config';
import OpenAI from 'openai';
const log = pino();
const app = express(); app.use(cors()); app.use(express.json());

const clients = {
  OPENAI: new OpenAI({ apiKey: process.env.OPENAI_API_KEY, baseURL: process.env.OPENAI_BASE }),
  ANTHROPIC: { type: 'anthropic' }, // omitted here; integrate as needed
  DEEPSEEK: new OpenAI({ apiKey: process.env.DEEPSEEK_API_KEY, baseURL: process.env.DEEPSEEK_BASE }) // OpenAI-compatible endpoint example
};

async function call(model, messages, { provider }={}){
  const ctl = new AbortController();
  const t = setTimeout(()=>ctl.abort(), Number(process.env.TIMEOUT_MS||15000));
  try{
    if(provider==='OPENAI' || !provider){
      // the abort signal goes in the SDK's request-options argument, not in the request body
      const res = await clients.OPENAI.chat.completions.create({ model, messages }, { signal: ctl.signal });
      clearTimeout(t); return { ok: true, res };
    }
    if(provider==='DEEPSEEK'){
      const res = await clients.DEEPSEEK.chat.completions.create({ model, messages }, { signal: ctl.signal });
      clearTimeout(t); return { ok: true, res };
    }
    // TODO: other providers (e.g. Anthropic)
    throw new Error('Unsupported provider');
  }catch(e){ clearTimeout(t); return { ok: false, err: e } }
}

async function route(model, messages){
  const primary = process.env.PRIMARY || 'OPENAI';
  const fallback = process.env.FALLBACK || 'DEEPSEEK';
  const retries = Number(process.env.RETRIES || 2);

  let attempt = 0;
  let lastErr;
  while(attempt <= retries){
    attempt++;
    const { ok, res, err } = await call(model, messages, { provider: primary });
    if(ok) return res;
    lastErr = err;
    if(attempt <= retries){ // back off only when another attempt will follow
      const backoff = Math.min(1000 * 2 ** attempt, 8000);
      await new Promise(r=>setTimeout(r, backoff));
    }
  }
  log.warn({ err: String(lastErr) }, 'primary failed, falling back to secondary');
  const { ok, res } = await call(model, messages, { provider: fallback });
  if(ok) return res;
  // degraded response; marked so the caller can skip caching it
  return { degraded: true, choices: [{ message: { content: 'Sorry, the service is busy right now. Please try again later.' } }] };
}

// Naive in-memory cache keyed by the full request body; use a bounded TTL cache in production
const cache = new Map();
app.post('/v1/chat/completions', async (req,res)=>{
  const { model, messages } = req.body;
  const key = JSON.stringify({ model, messages });
  if(cache.has(key)) return res.json(cache.get(key));
  const out = await route(model, messages);
  if(!out.degraded) cache.set(key, out); // do not cache the degraded fallback answer
  log.info({ model, size: JSON.stringify(out).length }, 'llm proxy');
  res.json(out);
});

app.get('/health', (_,res)=>res.json({ ok: true }));
app.listen(8899, ()=>log.info('LLM gateway listening at http://localhost:8899'));
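
The goal above also lists a circuit breaker with degradation, which the minimal gateway does not implement yet. Below is one possible in-memory, per-provider sketch; the names (FAIL_THRESHOLD, COOL_DOWN_MS, canCall, reportResult) are illustrative, not part of any library:

// circuit-breaker.js (sketch) — open the circuit after N consecutive failures, close it again after a cool-down
const FAIL_THRESHOLD = 5;     // consecutive failures before the circuit opens
const COOL_DOWN_MS = 30000;   // how long to keep the circuit open
const breakers = new Map();   // provider -> { failures, openedAt }

export function canCall(provider){
  const b = breakers.get(provider);
  if(!b || b.failures < FAIL_THRESHOLD) return true;
  // circuit is open; allow a trial call once the cool-down has passed (half-open)
  return Date.now() - b.openedAt > COOL_DOWN_MS;
}

export function reportResult(provider, ok){
  const b = breakers.get(provider) || { failures: 0, openedAt: 0 };
  if(ok){ b.failures = 0; }
  else { b.failures++; if(b.failures === FAIL_THRESHOLD) b.openedAt = Date.now(); }
  breakers.set(provider, b);
}

route() could then check canCall(primary) before trying the primary provider, skip straight to the fallback while the circuit is open, and call reportResult() after every call().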

Monitoring and Auditing
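
As a starting point, per-request audit logging can reuse the pino logger from gateway.js. The sketch below records route, status, and latency for every request; register it before the route handlers (the field names are illustrative):

// request audit middleware (sketch)
app.use((req, res, next) => {
  const start = Date.now();
  res.on('finish', () => {
    log.info({ route: req.path, status: res.statusCode, latencyMs: Date.now() - start }, 'gateway request');
  });
  next();
});

Per-provider success/failure counters and token usage (from the providers' usage fields) are natural next metrics to export.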

Security and Compliance Recommendations
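
The client examples below authenticate to the gateway with GATEWAY_TOKEN, so the gateway itself should verify that token and keep the real provider keys server-side only. A minimal sketch, assuming a single shared token in .env (the OpenAI SDK sends its apiKey as "Authorization: Bearer <token>"):

// gateway auth middleware (sketch) — register before the /v1 routes
app.use('/v1', (req, res, next) => {
  const auth = req.headers.authorization || '';
  const token = auth.startsWith('Bearer ') ? auth.slice(7) : '';
  if (!process.env.GATEWAY_TOKEN || token !== process.env.GATEWAY_TOKEN) {
    return res.status(401).json({ error: { message: 'invalid gateway token' } });
  }
  next();
});

In production, prefer per-tenant keys and avoid logging request contents that may contain sensitive data.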

Client Integration Examples (Anthropic / Qwen) and Error Classification

The examples below show two approaches: calling vendor APIs directly, and calling everything through the gateway's OpenAI-compatible interface. They also provide a reusable error classification and retry strategy.

Anthropic Messages API (native)

// node: fetch example
import 'dotenv/config';
const resp = await fetch('https://api.anthropic.com/v1/messages', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'x-api-key': process.env.ANTHROPIC_API_KEY,
    'anthropic-version': '2023-06-01'
  },
  body: JSON.stringify({
    model: 'claude-3-5-sonnet-latest',
    max_tokens: 1024,
    tools: [ { name: 'search', input_schema: { type: 'object', properties: { q: { type: 'string' } }, required: ['q'] } } ],
    messages: [ { role: 'user', content: [ { type: 'text', text: 'Explain the advantages of event-driven architecture in one sentence' } ] } ],
    system: 'Respond in Chinese; call tools when necessary.'
  })
});
const data = await resp.json();
console.log(data);
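
Because the request declares a tool, the response may ask for a tool call instead of plain text. The Messages API returns a content array of blocks plus a stop_reason; a minimal way to read it:

// read the text blocks, and detect a requested tool call
const text = data.content.filter(b => b.type === 'text').map(b => b.text).join('');
console.log(text);
if (data.stop_reason === 'tool_use') {
  const toolUse = data.content.find(b => b.type === 'tool_use');
  // execute the tool locally, then send a follow-up request containing a tool_result block
  console.log('tool requested:', toolUse.name, toolUse.input);
}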

Qwen DashScope Chat (native REST)

// node: fetch example (REST form; confirm the exact path and payload against the latest official docs)
import 'dotenv/config';
const resp2 = await fetch('https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': `Bearer ${process.env.DASHSCOPE_API_KEY}`
  },
  body: JSON.stringify({
    model: 'qwen-plus',
    // DashScope expects `input` to be an object; with result_format 'message', pass chat messages
    input: { messages: [ { role: 'user', content: 'Summarize the core principles of zero trust in three points' } ] },
    parameters: { result_format: 'message', temperature: 0.7 }
  })
});
const data2 = await resp2.json();
console.log(data2);
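
Reading the DashScope reply; the exact response shape should be confirmed against the latest docs, but with result_format 'message' the answer typically sits under output.choices[0].message, while the plain format uses output.text:

// defensive read of the DashScope response
const reply = data2.output?.choices?.[0]?.message?.content ?? data2.output?.text;
console.log(reply, data2.usage);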

Unified Calls Through the Enterprise Routing Gateway (OpenAI-Compatible)

// Point the openai SDK at the gateway's baseURL to get one unified interface
import OpenAI from 'openai';
import 'dotenv/config';
const client = new OpenAI({ apiKey: process.env.GATEWAY_TOKEN, baseURL: 'https://gateway.example.com/v1' });

// Route to Anthropic (the gateway maps the model name)
const r1 = await client.chat.completions.create({
  model: 'anthropic/claude-3-5-sonnet',
  messages: [ { role: 'user', content: 'Write five interview questions on the topic of system design.' } ],
  temperature: 0.7,
  tools: [ { type: 'function', function: { name: 'search', parameters: { type: 'object', properties: { q: { type: 'string' } }, required: ['q'] } } } ]
});

// Route to Qwen
const r2 = await client.chat.completions.create({
  model: 'qwen/qwen-plus',
  messages: [ { role: 'user', content: 'Translate this English passage into Chinese, keeping proper nouns unchanged' } ],
  temperature: 0.3
});
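
Since the gateway speaks the OpenAI-compatible format, both responses are read the same way regardless of the upstream vendor; because r1 declares a tool, its answer may arrive as tool_calls instead of text:

// read the unified responses
console.log(r2.choices[0].message.content);

const msg = r1.choices[0].message;
if (msg.tool_calls?.length) {
  const toolCall = msg.tool_calls[0];
  console.log('tool requested:', toolCall.function.name, toolCall.function.arguments); // arguments is a JSON string
} else {
  console.log(msg.content);
}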

Error Classification and Retry Strategy (Reusable)

// Map vendor responses and network errors to unified categories so the gateway can apply policies
export function classifyError(err) {
  const msg = String(err?.message || '');
  const code = err?.status || err?.code;
  if (code === 401 || /unauth|invalid[-_ ]api[-_ ]key/i.test(msg)) return { category: 'Authentication', retryable: false };
  if (code === 429 || /rate|quota|Too Many Requests/i.test(msg)) return { category: 'RateLimit', retryable: true, backoffMs: 2000 };
  if (code === 400 && /context|length|parameter|invalid request/i.test(msg)) return { category: 'InvalidRequest', retryable: false };
  if (code === 408 || /timeout|ETIMEDOUT/i.test(msg)) return { category: 'Timeout', retryable: true, backoffMs: 1000 };
  if ((code >= 500) || /overloaded|Internal Server Error|Bad Gateway|Service Unavailable/i.test(msg)) return { category: 'ServerError', retryable: true, backoffMs: 3000 };
  if (/ECONNRESET|ENETUNREACH|DNS|fetch failed|network/i.test(msg)) return { category: 'Network', retryable: true, backoffMs: 1000 };
  if (/tool[_-]?error|function[_-]?call/i.test(msg)) return { category: 'ToolExecution', retryable: false };
  return { category: 'Unknown', retryable: false };
}

export async function callWithRetry(fn, { maxAttempts = 3, providerHint } = {}) {
  let lastErr;
  for (let i = 0; i < maxAttempts; i++) {
    try { return await fn(); } catch (e) {
      lastErr = e;
      const info = classifyError(e);
      if (!info.retryable) break;
      const backoff = (info.backoffMs || 1000) * (i + 1);
      await new Promise(r => setTimeout(r, backoff));
      // Optional: switch routes via providerHint (e.g., from Anthropic to Qwen)
    }
  }
  throw lastErr;
}
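
A usage sketch that combines callWithRetry with the gateway client from the previous section, switching to a fallback model between attempts (the model names are reused from above; the switching logic here is illustrative):

// retry a gateway call, falling back to another model on retryable errors
let useFallback = false;
const answer = await callWithRetry(async () => {
  const model = useFallback ? 'qwen/qwen-plus' : 'anthropic/claude-3-5-sonnet';
  try {
    return await client.chat.completions.create({
      model,
      messages: [{ role: 'user', content: 'Summarize the retry strategy in one sentence.' }]
    });
  } catch (e) {
    useFallback = true; // a retryable error routes the next attempt to the fallback model
    throw e;
  }
}, { maxAttempts: 3 });
console.log(answer.choices[0].message.content);
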
Practical tips:
1) Log unified error fields (category, code, message, provider, model).
2) Configure backoff and failover strategies per category.
3) When a tool call fails, degrade to a plain-text answer or drop the tool dependency.
4) For 429/overload scenarios, combine caching and batching to reduce pressure.