Skip to content

速率限制

TopRouter 对 API 请求设有速率限制,以确保所有用户都能获得稳定的服务质量。

速率限制概述

速率限制基于以下维度:

维度说明
每分钟请求数 (RPM)每分钟允许的最大请求次数
每分钟 Token 数 (TPM)每分钟允许处理的最大 Token 数量
并发请求数同时进行的最大请求数

ℹ️ 说明

具体的速率限制数值取决于您的账户等级和所使用的模型。不同模型可能有不同的限制。

速率限制响应

当超过速率限制时,API 会返回 429 Too Many Requests 状态码:

json
{
  "error": {
    "message": "Rate limit exceeded. Please retry after some time.",
    "type": "rate_limit_error",
    "code": "rate_limit_exceeded"
  }
}

响应头

速率限制相关的响应头信息:

Header说明
x-ratelimit-limit-requests允许的最大请求数
x-ratelimit-remaining-requests剩余的请求数
x-ratelimit-reset-requests请求限制重置时间

最佳实践

1. 实现指数退避重试

python
import time
from openai import OpenAI, RateLimitError

client = OpenAI(
    base_url="https://toprouter.cc",
    api_key="your-api-key"
)

def chat_with_retry(messages, model="anthropic/claude-4.6-sonnet", max_retries=5):
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=messages
            )
            return response
        except RateLimitError:
            wait_time = min(2 ** attempt, 60)
            print(f"速率限制,等待 {wait_time} 秒后重试...")
            time.sleep(wait_time)

    raise Exception("达到最大重试次数")

2. 请求队列控制

python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(
    base_url="https://toprouter.cc",
    api_key="your-api-key"
)

async def process_batch(messages_list, max_concurrent=5):
    """控制并发请求数"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def make_request(messages):
        async with semaphore:
            response = await client.chat.completions.create(
                model="openai/gpt-5.5",
                messages=messages
            )
            return response

    tasks = [make_request(msgs) for msgs in messages_list]
    return await asyncio.gather(*tasks)

3. 合理选择模型

不同模型的速率限制不同。对于批量处理任务,可以考虑使用限制更宽松的轻量模型:

场景推荐模型原因
批量处理openai/gpt-5.5-instant限制宽松,速度快
实时对话anthropic/claude-4.6-sonnet性能均衡
低频复杂任务anthropic/claude-4.8-opus强大能力

4. 缓存常见请求

对于相同输入可能产生相同输出的场景,使用缓存减少不必要的 API 调用:

python
from functools import lru_cache
import hashlib
import json

# 简单的内存缓存
cache = {}

def cached_chat(messages, model="openai/gpt-5.5"):
    # 生成缓存键
    key = hashlib.md5(
        json.dumps({"model": model, "messages": messages}).encode()
    ).hexdigest()

    if key in cache:
        return cache[key]

    response = client.chat.completions.create(
        model=model,
        messages=messages
    )

    result = response.choices[0].message.content
    cache[key] = result
    return result

5. 使用流式传输

流式传输可以更快获得首个 token 的响应,改善用户体验:

python
stream = client.chat.completions.create(
    model="anthropic/claude-4.6-sonnet",
    messages=[{"role": "user", "content": "你好"}],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

常见问题

频繁遇到 429 错误?

  1. 实现指数退避重试机制
  2. 降低请求频率
  3. 使用请求队列控制并发数
  4. 考虑使用轻量模型分担负载

如何提高速率限制?

如需更高的速率限制,请通过社区渠道联系我们:

📖 相关文档

Unified AI API Gateway — Access 200+ models through one endpoint.