Rate limiting is the bouncer at your API’s door. Too strict and legitimate users bounce. Too loose and bad actors overwhelm your service. Here’s how to get it right.
## Why Rate Limit?
Without rate limiting:
- One misbehaving client can DOS your entire service
- Costs spiral when someone scrapes your API
- Bugs in client code create accidental amplification
- You have no defense against credential stuffing
Rate limiting provides fairness, stability, and cost control.
## The Basic Algorithms

### Fixed Window

Count requests in fixed time windows (e.g., 100 requests per minute).
```python
import time
from collections import defaultdict

class FixedWindowLimiter:
    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self.counts = defaultdict(int)
        self.windows = {}

    def is_allowed(self, key: str) -> bool:
        now = time.time()
        current_window = int(now // self.window)

        # Reset if new window
        if self.windows.get(key) != current_window:
            self.counts[key] = 0
            self.windows[key] = current_window

        if self.counts[key] >= self.limit:
            return False

        self.counts[key] += 1
        return True
```
Problem: Burst at window boundaries. At 11:59:59, user sends 100 requests. At 12:00:01, they send 100 more. 200 requests in 2 seconds while technically “within limits.”
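The boundary burst is easy to reproduce deterministically. Here's a sketch using a variant of the fixed-window limiter with an injectable clock (`FakeClockFixedWindow` is hypothetical, not part of the code above):

```python
class FakeClockFixedWindow:
    def __init__(self, limit, window_seconds, clock):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock  # injectable clock for deterministic tests
        self.counts = {}
        self.windows = {}

    def is_allowed(self, key):
        current_window = int(self.clock() // self.window)
        if self.windows.get(key) != current_window:
            self.counts[key] = 0
            self.windows[key] = current_window
        if self.counts[key] >= self.limit:
            return False
        self.counts[key] += 1
        return True

now = [59.0]  # one second before the window boundary
limiter = FakeClockFixedWindow(limit=100, window_seconds=60, clock=lambda: now[0])

burst1 = sum(limiter.is_allowed("u") for _ in range(150))  # 100 allowed, 50 rejected
now[0] = 61.0  # one second after the boundary: counter resets
burst2 = sum(limiter.is_allowed("u") for _ in range(150))  # 100 allowed again
```

Two back-to-back bursts land in adjacent windows, so the limiter accepts 200 requests in about two seconds despite a nominal limit of 100 per minute.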
### Sliding Window Log
Track exact timestamp of each request. More accurate, more memory.
```python
class SlidingWindowLogLimiter:
    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self.requests = defaultdict(list)

    def is_allowed(self, key: str) -> bool:
        now = time.time()
        cutoff = now - self.window

        # Remove old requests
        self.requests[key] = [t for t in self.requests[key] if t > cutoff]

        if len(self.requests[key]) >= self.limit:
            return False

        self.requests[key].append(now)
        return True
```
Problem: Memory grows with request volume (you store up to `limit` timestamps per key). Not practical at scale.
### Sliding Window Counter
Hybrid approach. Interpolate between current and previous window.
```python
class SlidingWindowCounterLimiter:
    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self.current_counts = defaultdict(int)
        self.previous_counts = defaultdict(int)
        self.current_windows = {}

    def is_allowed(self, key: str) -> bool:
        now = time.time()
        current_window = int(now // self.window)
        window_position = (now % self.window) / self.window

        # Rotate windows if needed
        last_window = self.current_windows.get(key, 0)
        if last_window < current_window:
            # Only the immediately preceding window carries over;
            # after a longer gap the previous window saw no traffic
            if last_window == current_window - 1:
                self.previous_counts[key] = self.current_counts.get(key, 0)
            else:
                self.previous_counts[key] = 0
            self.current_counts[key] = 0
            self.current_windows[key] = current_window

        # Weighted count: assume the previous window's requests were
        # evenly spread, so only the overlapping tail contributes
        weighted = (
            self.previous_counts[key] * (1 - window_position) +
            self.current_counts[key]
        )
        if weighted >= self.limit:
            return False

        self.current_counts[key] += 1
        return True
```
Best balance of accuracy and memory. Used by many production systems.
### Token Bucket
Tokens accumulate over time. Each request consumes tokens. Allows bursting while maintaining average rate.
```python
class TokenBucketLimiter:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.tokens = defaultdict(lambda: capacity)
        self.last_refill = {}

    def is_allowed(self, key: str, tokens_needed: int = 1) -> bool:
        now = time.time()

        # Refill tokens
        if key in self.last_refill:
            elapsed = now - self.last_refill[key]
            self.tokens[key] = min(
                self.capacity,
                self.tokens[key] + elapsed * self.refill_rate
            )
        self.last_refill[key] = now

        if self.tokens[key] >= tokens_needed:
            self.tokens[key] -= tokens_needed
            return True
        return False
```
Use case: APIs where bursts are acceptable but sustained high load isn’t.
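To make the burst-then-throttle behavior concrete, here's a deterministic sketch with an injectable clock (`FakeClockTokenBucket` is a hypothetical single-key variant of the limiter above):

```python
class FakeClockTokenBucket:
    def __init__(self, capacity, refill_rate, clock):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.clock = clock              # injectable for deterministic tests
        self.tokens = capacity
        self.last = clock()

    def is_allowed(self):
        now = self.clock()
        # Refill based on elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.refill_rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

now = [0.0]
bucket = FakeClockTokenBucket(capacity=10, refill_rate=1.0, clock=lambda: now[0])

# A full bucket absorbs a burst of 10 immediately...
burst = sum(bucket.is_allowed() for _ in range(15))   # 10 allowed, 5 rejected
now[0] = 3.0                                          # 3 seconds later: 3 tokens refilled
trickle = sum(bucket.is_allowed() for _ in range(5))  # only 3 allowed
```

The burst drains the bucket instantly; after that, throughput is pinned to the refill rate.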
### Leaky Bucket
Requests queue up and process at fixed rate. Smooths traffic completely.
```python
import asyncio
from collections import defaultdict

class LeakyBucketLimiter:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # requests per second
        self.capacity = capacity
        self.queues = defaultdict(asyncio.Queue)

    async def process(self, key: str):
        while True:
            await self.queues[key].get()
            # Process request here
            await asyncio.sleep(1 / self.rate)

    async def submit(self, key: str) -> bool:
        if self.queues[key].qsize() >= self.capacity:
            return False  # Queue full, reject
        await self.queues[key].put(True)
        return True
```
Use case: When downstream services need consistent load (database writes, external APIs).
## Distributed Rate Limiting

Single-server rate limiting fails when you scale horizontally. Options:

### Redis-Based
```python
import redis
import time

class RedisRateLimiter:
    def __init__(self, redis_client: redis.Redis, limit: int, window: int):
        self.redis = redis_client
        self.limit = limit
        self.window = window

    def is_allowed(self, key: str) -> bool:
        now = time.time()
        window_key = f"ratelimit:{key}:{int(now // self.window)}"
        pipe = self.redis.pipeline()
        pipe.incr(window_key)
        pipe.expire(window_key, self.window + 1)
        results = pipe.execute()
        count = results[0]
        return count <= self.limit
```

The pipeline saves a round trip, but the check-and-increment still isn't a single atomic decision. A Lua script closes that gap.
### Redis with Lua (Atomic)
```lua
-- rate_limit.lua
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

local current = redis.call('INCR', key)
if current == 1 then
    redis.call('EXPIRE', key, window)
end

if current > limit then
    return 0
end
return 1
```
```python
def is_allowed_lua(self, key: str) -> bool:
    # self.lua_script holds the contents of rate_limit.lua
    result = self.redis.eval(
        self.lua_script,
        1,  # number of KEYS
        f"ratelimit:{key}",
        self.limit,
        self.window
    )
    return result == 1
```
### Sliding Window in Redis
```lua
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])

local clearBefore = now - window
redis.call('ZREMRANGEBYSCORE', key, 0, clearBefore)

local count = redis.call('ZCARD', key)
if count < limit then
    redis.call('ZADD', key, now, now .. '-' .. math.random())
    redis.call('EXPIRE', key, window)
    return 1
end
return 0
```
## Rate Limit Headers

Tell clients their limits. Standard headers:
```http
HTTP/1.1 200 OK
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 67
X-RateLimit-Reset: 1678704000
Retry-After: 30
```
Implementation:
```python
import time
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    key = get_client_key(request)
    allowed, remaining, reset_at = limiter.check(key)

    if not allowed:
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded"},
            headers={
                "X-RateLimit-Limit": str(limiter.limit),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(reset_at),
                "Retry-After": str(max(0, int(reset_at - time.time())))
            }
        )

    response = await call_next(request)
    response.headers["X-RateLimit-Limit"] = str(limiter.limit)
    response.headers["X-RateLimit-Remaining"] = str(remaining)
    response.headers["X-RateLimit-Reset"] = str(reset_at)
    return response
```
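The middleware assumes `limiter.check(key)` returns an `(allowed, remaining, reset_at)` tuple, which none of the limiters above expose directly. A minimal fixed-window version of that interface might look like this (`WindowedLimiter` is a sketch, not a production implementation; `now` is injectable for testability):

```python
import time
from collections import defaultdict

class WindowedLimiter:
    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self.counts = defaultdict(int)  # keyed by (client key, window id)

    def check(self, key: str, now=None):
        now = time.time() if now is None else now
        window_id = int(now // self.window)
        reset_at = (window_id + 1) * self.window  # start of the next window
        bucket = (key, window_id)
        if self.counts[bucket] >= self.limit:
            return False, 0, reset_at
        self.counts[bucket] += 1
        remaining = self.limit - self.counts[bucket]
        return True, remaining, reset_at
```

Stale `(key, window_id)` buckets are never evicted here; a real implementation would clean them up or lean on Redis TTLs.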
## Identifying Clients

What key to rate limit on?

### IP Address
```python
def get_client_ip(request: Request) -> str:
    # Check forwarded headers (behind proxy); only trust X-Forwarded-For
    # when your own proxy sets it, since clients can spoof it
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        return forwarded.split(",")[0].strip()
    return request.client.host
```
Problem: NAT, shared IPs, VPNs. One corporate network looks like one client.
### API Key
```python
def get_client_key(request: Request) -> str:
    api_key = request.headers.get("X-API-Key")
    if api_key:
        return f"apikey:{api_key}"
    return f"ip:{get_client_ip(request)}"
```
Better: Authenticated users get their own limits. Anonymous falls back to IP.
### User ID + Endpoint
```python
def get_client_key(request: Request) -> str:
    # getattr guards against requests that never went through auth
    user_id = getattr(request.state, "user_id", None) or "anonymous"
    endpoint = request.url.path
    return f"{user_id}:{endpoint}"
```
Different limits per endpoint. Expensive operations get stricter limits.
## Tiered Rate Limits
Not all users are equal:
```python
RATE_LIMITS = {
    "free": {"requests": 100, "window": 3600},
    "basic": {"requests": 1000, "window": 3600},
    "pro": {"requests": 10000, "window": 3600},
    "enterprise": {"requests": 100000, "window": 3600},
}

def get_limit(user):
    tier = user.subscription_tier
    return RATE_LIMITS.get(tier, RATE_LIMITS["free"])
```
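Wiring the tier table into an actual check might look like the sketch below: a bare fixed-window counter keyed by user and window. The `is_allowed` function here is hypothetical and takes an explicit `now` for testability; counts are never evicted, so treat it as illustration only.

```python
import time
from collections import defaultdict

RATE_LIMITS = {
    "free": {"requests": 100, "window": 3600},
    "pro": {"requests": 10000, "window": 3600},
}

# Fixed-window counts keyed by (user, window id)
counts = defaultdict(int)

def is_allowed(user_id: str, tier: str, now=None) -> bool:
    cfg = RATE_LIMITS.get(tier, RATE_LIMITS["free"])  # unknown tiers fall back to free
    now = time.time() if now is None else now
    bucket = (user_id, int(now // cfg["window"]))
    if counts[bucket] >= cfg["requests"]:
        return False
    counts[bucket] += 1
    return True
```

A free-tier user is cut off after 100 requests in an hour, while a pro user keeps going under the same code path.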
## Endpoint-Specific Limits
Some endpoints are expensive:
```python
ENDPOINT_LIMITS = {
    "/api/search": {"limit": 10, "window": 60},
    "/api/export": {"limit": 5, "window": 3600},
    "/api/users": {"limit": 100, "window": 60},
    "default": {"limit": 60, "window": 60},
}

def get_endpoint_limit(path: str):
    return ENDPOINT_LIMITS.get(path, ENDPOINT_LIMITS["default"])
```
## Graceful Degradation

Instead of a hard 429, consider:

### Slow Down
```python
import asyncio
from fastapi import HTTPException

async def rate_limit_with_delay(request):
    usage = get_usage_ratio(request)
    if usage >= 1.0:
        raise HTTPException(status_code=429)
    if usage > 0.8:
        # Approaching limit: slow down, up to 1 second of delay
        delay = (usage - 0.8) * 5
        await asyncio.sleep(delay)
```
### Reduce Quality
```python
def search(query: str, request: Request):
    if is_rate_limited(request):
        # Return cached/simplified results instead of 429
        return get_cached_results(query)
    return full_search(query)
```
## Testing Rate Limits
```python
import time

def test_rate_limit_enforced():
    limiter = FixedWindowLimiter(limit=5, window_seconds=60)
    # First 5 should pass
    for _ in range(5):
        assert limiter.is_allowed("test-user")
    # 6th should fail
    assert not limiter.is_allowed("test-user")

def test_rate_limit_resets():
    limiter = FixedWindowLimiter(limit=5, window_seconds=1)
    for _ in range(5):
        limiter.is_allowed("test-user")
    assert not limiter.is_allowed("test-user")
    time.sleep(1.1)
    assert limiter.is_allowed("test-user")
```
## Common Mistakes

### Rate Limiting After Auth
```python
# Bad: Auth happens, then rate limit
@app.post("/login")
async def login(creds: Credentials):
    user = authenticate(creds)  # DB hit before rate limit
    if not rate_limiter.check(user.id):
        raise HTTPException(429)

# Good: Rate limit before expensive operations
@app.post("/login")
async def login(request: Request, creds: Credentials):
    if not rate_limiter.check(get_client_ip(request)):
        raise HTTPException(429)
    user = authenticate(creds)
```
### Forgetting Internal Services

Internal service-to-service calls shouldn't burn user quotas, but only trust the marker header if your edge strips it from external traffic:
```python
def get_client_key(request: Request) -> str | None:
    # Skip rate limiting for internal services
    if request.headers.get("X-Internal-Service"):
        return None
    return get_client_ip(request)
```
### No Monitoring
```python
from prometheus_client import Counter

rate_limit_hits = Counter(
    'rate_limit_hits_total',
    'Number of rate limited requests',
    ['endpoint', 'tier']
)

def check_rate_limit(request):
    is_limited = not limiter.is_allowed(get_client_key(request))
    if is_limited:
        rate_limit_hits.labels(
            endpoint=request.url.path,
            tier=get_user_tier(request)
        ).inc()
```
## Key Takeaways
- Choose the right algorithm — Token bucket for burst tolerance, sliding window for accuracy
- Use Redis for distributed — Single-server limits break at scale
- Return proper headers — Clients need to know their limits
- Tier your limits — Paying customers deserve more
- Rate limit early — Before expensive operations, not after
- Monitor everything — Know when limits are being hit
Rate limiting is a balance. Too aggressive and you lose users. Too permissive and you lose your service. Start conservative, monitor, and adjust. 🌍