速率限制
TopRouter 对 API 请求设有速率限制,以确保所有用户都能获得稳定的服务质量。
速率限制概述
速率限制基于以下维度:
| 维度 | 说明 |
|---|---|
| 每分钟请求数 (RPM) | 每分钟允许的最大请求次数 |
| 每分钟 Token 数 (TPM) | 每分钟允许处理的最大 Token 数量 |
| 并发请求数 | 同时进行的最大请求数 |
ℹ️ 说明
具体的速率限制数值取决于您的账户等级和所使用的模型。不同模型可能有不同的限制。
速率限制响应
当超过速率限制时,API 会返回 429 Too Many Requests 状态码:
json
{
"error": {
"message": "Rate limit exceeded. Please retry after some time.",
"type": "rate_limit_error",
"code": "rate_limit_exceeded"
}
}响应头
速率限制相关的响应头信息:
| Header | 说明 |
|---|---|
x-ratelimit-limit-requests | 允许的最大请求数 |
x-ratelimit-remaining-requests | 剩余的请求数 |
x-ratelimit-reset-requests | 请求限制重置时间 |
最佳实践
1. 实现指数退避重试
python
import time
from openai import OpenAI, RateLimitError
client = OpenAI(
base_url="https://toprouter.cc",
api_key="your-api-key"
)
def chat_with_retry(messages, model="anthropic/claude-4.6-sonnet", max_retries=5):
for attempt in range(max_retries):
try:
response = client.chat.completions.create(
model=model,
messages=messages
)
return response
except RateLimitError:
wait_time = min(2 ** attempt, 60)
print(f"速率限制,等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
raise Exception("达到最大重试次数")2. 请求队列控制
python
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI(
base_url="https://toprouter.cc",
api_key="your-api-key"
)
async def process_batch(messages_list, max_concurrent=5):
"""控制并发请求数"""
semaphore = asyncio.Semaphore(max_concurrent)
async def make_request(messages):
async with semaphore:
response = await client.chat.completions.create(
model="openai/gpt-5.5",
messages=messages
)
return response
tasks = [make_request(msgs) for msgs in messages_list]
return await asyncio.gather(*tasks)3. 合理选择模型
不同模型的速率限制不同。对于批量处理任务,可以考虑使用限制更宽松的轻量模型:
| 场景 | 推荐模型 | 原因 |
|---|---|---|
| 批量处理 | openai/gpt-5.5-instant | 限制宽松,速度快 |
| 实时对话 | anthropic/claude-4.6-sonnet | 性能均衡 |
| 低频复杂任务 | anthropic/claude-4.8-opus | 强大能力 |
4. 缓存常见请求
对于相同输入可能产生相同输出的场景,使用缓存减少不必要的 API 调用:
python
from functools import lru_cache
import hashlib
import json
# 简单的内存缓存
cache = {}
def cached_chat(messages, model="openai/gpt-5.5"):
# 生成缓存键
key = hashlib.md5(
json.dumps({"model": model, "messages": messages}).encode()
).hexdigest()
if key in cache:
return cache[key]
response = client.chat.completions.create(
model=model,
messages=messages
)
result = response.choices[0].message.content
cache[key] = result
return result5. 使用流式传输
流式传输可以更快获得首个 token 的响应,改善用户体验:
python
stream = client.chat.completions.create(
model="anthropic/claude-4.6-sonnet",
messages=[{"role": "user", "content": "你好"}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")常见问题
频繁遇到 429 错误?
- 实现指数退避重试机制
- 降低请求频率
- 使用请求队列控制并发数
- 考虑使用轻量模型分担负载
如何提高速率限制?
如需更高的速率限制,请通过社区渠道联系我们:
- Telegram:t.me/+usySmwAIO-I3YjBl
- Discord:discord.gg/URHXXkVJ6y
