Rate Limits
Understanding and handling API rate limits
TL;DR
Default: 60 RPM per API key. When rate limited (429), check Retry-After header and implement exponential backoff.
Rate limits protect the API from abuse and ensure fair access for all users. Learn how limits work and how to handle them gracefully.
Default Limits
All API keys have a default limit of 60 requests per minute (RPM). To request a higher limit, contact [email protected].
Rate Limit Headers
Every response includes rate limit information:
```
X-RateLimit-Limit-RPM: 60
X-RateLimit-Remaining-RPM: 55
X-RateLimit-Reset-RPM: 1706745660
```

| Header | Description |
|---|---|
| X-RateLimit-Limit-RPM | Your current RPM limit |
| X-RateLimit-Remaining-RPM | Remaining requests in the current minute |
| X-RateLimit-Reset-RPM | Unix timestamp when the window resets |
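You can read these headers programmatically. A minimal sketch, assuming the OpenAI Python SDK's `with_raw_response` interface, which returns the HTTP response alongside the parsed body:

```python
from openai import OpenAI

client = OpenAI(
    api_key="ask_your_key",
    base_url="https://api.assisters.dev/v1"
)

# with_raw_response exposes the raw HTTP response, including headers
raw = client.chat.completions.with_raw_response.create(
    model="assisters-chat-v1",
    messages=[{"role": "user", "content": "Hello!"}]
)

remaining = int(raw.headers.get("X-RateLimit-Remaining-RPM", 0))
completion = raw.parse()  # the usual ChatCompletion object

if remaining < 5:
    print("Approaching the rate limit; consider slowing down.")
```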
Rate Limit Errors
When you exceed limits, you'll receive a 429 Too Many Requests response:
```json
{
  "error": {
    "message": "Rate limit exceeded. Please retry after 5 seconds.",
    "type": "rate_limit_error",
    "code": "rate_limit_exceeded"
  }
}
```

The response includes a Retry-After header:

```
Retry-After: 5
```

Handling Rate Limits
Basic Retry Logic
```python
import time
from openai import OpenAI, RateLimitError

client = OpenAI(
    api_key="ask_your_key",
    base_url="https://api.assisters.dev/v1"
)

def make_request_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(
                model="assisters-chat-v1",
                messages=messages
            )
        except RateLimitError as e:
            if attempt == max_retries - 1:
                raise
            wait_time = int(e.response.headers.get("Retry-After", 5))
            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)
```
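Usage is a drop-in replacement for a direct call:

```python
response = make_request_with_retry(
    [{"role": "user", "content": "Hello!"}]
)
print(response.choices[0].message.content)
```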
Exponential Backoff

```python
import time
import random
from openai import RateLimitError

def exponential_backoff(func, max_retries=5, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter
            delay = (base_delay * (2 ** attempt)) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed. Retrying in {delay:.2f}s...")
            time.sleep(delay)
```
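Pass the API call as a zero-argument callable, reusing the client defined above:

```python
response = exponential_backoff(
    lambda: client.chat.completions.create(
        model="assisters-chat-v1",
        messages=[{"role": "user", "content": "Hello!"}]
    )
)
```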
JavaScript Implementation

```javascript
async function withRetry(fn, maxRetries = 3) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (error.status !== 429 || attempt === maxRetries - 1) {
        throw error;
      }
      const retryAfter = parseInt(error.headers?.['retry-after'] || '5', 10);
      console.log(`Rate limited. Waiting ${retryAfter}s...`);
      await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
    }
  }
}

// Usage
const response = await withRetry(() =>
  client.chat.completions.create({
    model: 'assisters-chat-v1',
    messages: [{ role: 'user', content: 'Hello!' }]
  })
);
```
Best Practices

- Implement retries: always use retry logic with exponential backoff.
- Monitor headers: check the rate limit headers to slow down proactively.
- Queue requests: use a request queue to control throughput.
- Cache responses: cache responses when possible to reduce API calls (see the sketch below).
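On the caching point, here is a minimal sketch of an in-memory cache keyed on the request payload, reusing the client defined above. The `cached_chat` helper and its TTL are illustrative choices, not part of the SDK:

```python
import hashlib
import json
import time

# Illustrative in-memory cache: {key: (timestamp, response)}
_cache = {}
CACHE_TTL = 300  # seconds; tune for your use case

def cached_chat(messages, model="assisters-chat-v1"):
    # Key the cache on the exact request payload
    key = hashlib.sha256(
        json.dumps({"model": model, "messages": messages}, sort_keys=True).encode()
    ).hexdigest()

    hit = _cache.get(key)
    if hit and time.time() - hit[0] < CACHE_TTL:
        return hit[1]

    response = client.chat.completions.create(model=model, messages=messages)
    _cache[key] = (time.time(), response)
    return response
```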
Request Queuing
For high-volume applications, implement a request queue:
```python
import asyncio
import time
from collections import deque

from openai import AsyncOpenAI

class RateLimitedClient:
    def __init__(self, rpm_limit=60):
        self.rpm_limit = rpm_limit
        self.request_times = deque()
        self.lock = asyncio.Lock()
        self.client = AsyncOpenAI(
            api_key="ask_your_key",
            base_url="https://api.assisters.dev/v1"
        )

    async def wait_for_capacity(self):
        async with self.lock:
            now = time.time()
            # Remove requests older than 1 minute
            while self.request_times and now - self.request_times[0] > 60:
                self.request_times.popleft()
            # If at capacity, wait until the oldest request leaves the window
            if len(self.request_times) >= self.rpm_limit:
                wait_time = 60 - (now - self.request_times[0])
                await asyncio.sleep(wait_time)
                self.request_times.popleft()
            self.request_times.append(time.time())

    async def chat(self, messages):
        await self.wait_for_capacity()
        return await self.client.chat.completions.create(
            model="assisters-chat-v1",
            messages=messages
        )
```
Monitoring Usage

Track your usage proactively:
```python
import time

class UsageTracker:
    def __init__(self):
        self.minute_requests = 0
        self.minute_tokens = 0
        self.minute_start = time.time()

    def record(self, tokens_used):
        # Reset if a minute has passed
        if time.time() - self.minute_start > 60:
            self.minute_requests = 0
            self.minute_tokens = 0
            self.minute_start = time.time()
        self.minute_requests += 1
        self.minute_tokens += tokens_used

    def get_usage(self):
        return {
            "requests_this_minute": self.minute_requests,
            "tokens_this_minute": self.minute_tokens
        }
```
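To feed the tracker, record the token count each response reports in its usage field, reusing the client defined above:

```python
tracker = UsageTracker()

response = client.chat.completions.create(
    model="assisters-chat-v1",
    messages=[{"role": "user", "content": "Hello!"}]
)
tracker.record(response.usage.total_tokens)
print(tracker.get_usage())
```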
Burst Handling

For batch processing, spread requests over time:
```python
import asyncio

async def process_batch(items, rpm_limit=60):
    # Space requests evenly: seconds per request
    delay = 60 / rpm_limit
    results = []
    for item in items:
        # process_item is your per-item coroutine, e.g. one API call
        result = await process_item(item)
        results.append(result)
        await asyncio.sleep(delay)
    return results
```
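A sketch of how this might be driven, where `process_item` is a hypothetical helper wrapping a single API call:

```python
import asyncio

from openai import AsyncOpenAI

async_client = AsyncOpenAI(
    api_key="ask_your_key",
    base_url="https://api.assisters.dev/v1"
)

async def process_item(item):
    response = await async_client.chat.completions.create(
        model="assisters-chat-v1",
        messages=[{"role": "user", "content": item}]
    )
    return response.choices[0].message.content

results = asyncio.run(process_batch(["Hello!", "How are you?"]))
```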
Need Higher Limits?

To request a higher rate limit for your API key, contact [email protected].