Rate limiting is the bouncer at your API’s door. Too strict and legitimate users bounce. Too loose and bad actors overwhelm your service. Here’s how to get it right.

Why Rate Limit?

Without rate limiting:

  • One misbehaving client can DoS your entire service
  • Costs spiral when someone scrapes your API
  • Bugs in client code create accidental amplification
  • You have no defense against credential stuffing

Rate limiting provides fairness, stability, and cost control.

The Basic Algorithms

Fixed Window

Count requests in fixed time windows (e.g., 100 requests per minute).

import time
from collections import defaultdict

class FixedWindowLimiter:
    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self.counts = defaultdict(int)
        self.windows = {}
    
    def is_allowed(self, key: str) -> bool:
        now = time.time()
        current_window = int(now // self.window)
        
        # Reset if new window
        if self.windows.get(key) != current_window:
            self.counts[key] = 0
            self.windows[key] = current_window
        
        if self.counts[key] >= self.limit:
            return False
        
        self.counts[key] += 1
        return True

Problem: Burst at window boundaries. At 11:59:59, user sends 100 requests. At 12:00:01, they send 100 more. 200 requests in 2 seconds while technically “within limits.”
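The boundary burst is easy to see in code. This sketch re-implements the fixed-window check with an injected `now` parameter (added here purely so the demonstration is deterministic; the class above reads the clock itself):

```python
from collections import defaultdict

# Fixed-window check with an injected clock, for a deterministic demo.
# The `now` parameter is illustrative; the limiter above calls time.time().
class FixedWindow:
    def __init__(self, limit: int, window: int):
        self.limit = limit
        self.window = window
        self.counts = defaultdict(int)
        self.windows = {}

    def is_allowed(self, key: str, now: float) -> bool:
        current = int(now // self.window)
        if self.windows.get(key) != current:
            self.counts[key] = 0
            self.windows[key] = current
        if self.counts[key] >= self.limit:
            return False
        self.counts[key] += 1
        return True

limiter = FixedWindow(limit=100, window=60)

# 100 requests just before the boundary (t=59.9s, window 0) all pass...
late = sum(limiter.is_allowed("user", 59.9) for _ in range(100))
# ...and 100 more just after it (t=60.1s, window 1) pass too.
early = sum(limiter.is_allowed("user", 60.1) for _ in range(100))

print(late + early)  # 200 requests accepted in 0.2 seconds
```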

Sliding Window Log

Track exact timestamp of each request. More accurate, more memory.

class SlidingWindowLogLimiter:
    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self.requests = defaultdict(list)
    
    def is_allowed(self, key: str) -> bool:
        now = time.time()
        cutoff = now - self.window
        
        # Remove old requests
        self.requests[key] = [t for t in self.requests[key] if t > cutoff]
        
        if len(self.requests[key]) >= self.limit:
            return False
        
        self.requests[key].append(now)
        return True

Problem: Memory grows with request volume. Not practical at scale.
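To make the cost concrete: after N allowed requests, the per-key list holds N timestamps. A compact restatement of the limiter above (so this snippet stands alone) with one hot key:

```python
import time
from collections import defaultdict

# Compact restatement of the sliding-window log, to show its memory
# cost: the log keeps one timestamp per allowed request.
class SlidingLog:
    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window = window_seconds
        self.requests = defaultdict(list)

    def is_allowed(self, key: str) -> bool:
        now = time.time()
        cutoff = now - self.window
        self.requests[key] = [t for t in self.requests[key] if t > cutoff]
        if len(self.requests[key]) >= self.limit:
            return False
        self.requests[key].append(now)
        return True

log = SlidingLog(limit=10_000, window_seconds=60)
for _ in range(10_000):
    log.is_allowed("hot-key")

# One float per request, per key — the memory that counter-based
# approaches avoid.
print(len(log.requests["hot-key"]))  # 10000
```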

Sliding Window Counter

Hybrid approach. Interpolate between current and previous window.

class SlidingWindowCounterLimiter:
    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self.current_counts = defaultdict(int)
        self.previous_counts = defaultdict(int)
        self.current_windows = {}
    
    def is_allowed(self, key: str) -> bool:
        now = time.time()
        current_window = int(now // self.window)
        window_position = (now % self.window) / self.window
        
        # Rotate windows if needed
        if self.current_windows.get(key, -1) != current_window:
            # Only an immediately preceding window carries over;
            # after a longer gap the previous window is empty
            if self.current_windows.get(key) == current_window - 1:
                self.previous_counts[key] = self.current_counts.get(key, 0)
            else:
                self.previous_counts[key] = 0
            self.current_counts[key] = 0
            self.current_windows[key] = current_window
        
        # Weighted count
        weighted = (
            self.previous_counts[key] * (1 - window_position) +
            self.current_counts[key]
        )
        
        if weighted >= self.limit:
            return False
        
        self.current_counts[key] += 1
        return True

Best balance of accuracy and memory. Used by most production systems.
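The interpolation is worth seeing on its own. A sketch of just the weighted-count arithmetic (`prev`, `curr`, and `position` are illustrative names):

```python
# The weighted count used above: `prev` and `curr` are request counts in
# the previous and current windows; `position` is how far into the
# current window we are (0.0 at the boundary, 1.0 at the end).
def weighted_count(prev: int, curr: int, position: float) -> float:
    return prev * (1 - position) + curr

# At the boundary, a full previous window still counts entirely, so the
# fixed-window boundary burst is rejected against a limit of 100:
print(weighted_count(prev=100, curr=0, position=0.0))   # 100.0 — at the limit

# Halfway through the window, only half of it counts:
print(weighted_count(prev=100, curr=30, position=0.5))  # 80.0 — allowed
```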

Token Bucket

Tokens accumulate over time. Each request consumes tokens. Allows bursting while maintaining average rate.

class TokenBucketLimiter:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.tokens = defaultdict(lambda: capacity)
        self.last_refill = {}
    
    def is_allowed(self, key: str, tokens_needed: int = 1) -> bool:
        now = time.time()
        
        # Refill tokens
        if key in self.last_refill:
            elapsed = now - self.last_refill[key]
            self.tokens[key] = min(
                self.capacity,
                self.tokens[key] + elapsed * self.refill_rate
            )
        
        self.last_refill[key] = now
        
        if self.tokens[key] >= tokens_needed:
            self.tokens[key] -= tokens_needed
            return True
        
        return False

Use case: APIs where bursts are acceptable but sustained high load isn’t.
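A quick feel for that behavior, using a clock-injected variant of the bucket (the `now` argument replaces `time.time()` so the refill is deterministic; otherwise the logic mirrors the class above):

```python
# Token bucket with an injected clock; `now` is in seconds.
class Bucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.tokens = float(capacity)   # start full
        self.last = 0.0

    def is_allowed(self, now: float, needed: int = 1) -> bool:
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last = now
        if self.tokens >= needed:
            self.tokens -= needed
            return True
        return False

b = Bucket(capacity=5, refill_rate=1.0)  # 1 token/s, bursts up to 5

# A burst of 6 at t=0: the first 5 drain the bucket, the 6th is rejected
burst = sum(b.is_allowed(0.0) for _ in range(6))
print(burst)              # 5

# Two seconds later, two tokens have refilled
print(b.is_allowed(2.0))  # True
```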

Leaky Bucket

Requests queue up and process at fixed rate. Smooths traffic completely.

import asyncio
from collections import defaultdict

class LeakyBucketLimiter:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # requests per second
        self.capacity = capacity
        self.queues = defaultdict(asyncio.Queue)
    
    async def process(self, key: str):
        while True:
            await self.queues[key].get()
            # Process request here
            await asyncio.sleep(1 / self.rate)
    
    async def submit(self, key: str) -> bool:
        if self.queues[key].qsize() >= self.capacity:
            return False  # Queue full, reject
        
        await self.queues[key].put(True)
        return True

Use case: When downstream services need consistent load (database writes, external APIs).
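A minimal asyncio driver shows both behaviors at once: load shedding at the door and draining at a fixed rate. This is a single-queue sketch with illustrative numbers; the per-key version above works the same way:

```python
import asyncio

# Single-queue leaky bucket: submit() sheds load when the queue is full,
# worker() drains at a fixed rate.
class LeakyBucket:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate            # requests per second
        self.capacity = capacity
        self.queue = asyncio.Queue()
        self.processed = 0

    async def worker(self):
        while True:
            await self.queue.get()
            self.processed += 1     # process the request here
            await asyncio.sleep(1 / self.rate)

    def submit(self) -> bool:
        if self.queue.qsize() >= self.capacity:
            return False            # queue full: reject
        self.queue.put_nowait(True)
        return True

async def main():
    bucket = LeakyBucket(rate=50.0, capacity=3)
    worker = asyncio.create_task(bucket.worker())

    accepted = sum(bucket.submit() for _ in range(10))  # 10 arrive at once
    await asyncio.sleep(0.2)        # give the worker time to drain
    worker.cancel()
    return accepted, bucket.processed

accepted, processed = asyncio.run(main())
print(accepted, processed)  # 3 3 — seven shed, three drained at the fixed rate
```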

Distributed Rate Limiting

Single-server rate limiting fails when you scale horizontally. Options:

Redis-Based

import redis
import time

class RedisRateLimiter:
    def __init__(self, redis_client: redis.Redis, limit: int, window: int):
        self.redis = redis_client
        self.limit = limit
        self.window = window
    
    def is_allowed(self, key: str) -> bool:
        pipe = self.redis.pipeline()
        now = time.time()
        window_key = f"ratelimit:{key}:{int(now // self.window)}"
        
        pipe.incr(window_key)
        pipe.expire(window_key, self.window + 1)
        
        results = pipe.execute()
        count = results[0]
        
        return count <= self.limit

Redis with Lua (Atomic)

-- rate_limit.lua
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

local current = redis.call('INCR', key)
if current == 1 then
    redis.call('EXPIRE', key, window)
end

if current > limit then
    return 0
end
return 1
def is_allowed_lua(self, key: str) -> bool:
    # self.lua_script holds the contents of rate_limit.lua above
    result = self.redis.eval(
        self.lua_script,
        1,
        f"ratelimit:{key}",
        self.limit,
        self.window
    )
    return result == 1

Sliding Window in Redis

local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])

local clearBefore = now - window
redis.call('ZREMRANGEBYSCORE', key, 0, clearBefore)

local count = redis.call('ZCARD', key)
if count < limit then
    redis.call('ZADD', key, now, now .. '-' .. math.random())
    redis.call('EXPIRE', key, window)
    return 1
end
return 0

Rate Limit Headers

Tell clients their limits. Standard headers:

HTTP/1.1 200 OK
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 67
X-RateLimit-Reset: 1678704000
Retry-After: 30

Implementation:

import time

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    key = get_client_key(request)
    
    allowed, remaining, reset_at = limiter.check(key)
    
    if not allowed:
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded"},
            headers={
                "X-RateLimit-Limit": str(limiter.limit),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(reset_at),
                "Retry-After": str(max(0, int(reset_at - time.time())))
            }
        )
    
    response = await call_next(request)
    response.headers["X-RateLimit-Limit"] = str(limiter.limit)
    response.headers["X-RateLimit-Remaining"] = str(remaining)
    response.headers["X-RateLimit-Reset"] = str(reset_at)
    
    return response
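The middleware above assumes a limiter exposing `check(key) -> (allowed, remaining, reset_at)`, which none of the earlier classes provide. A fixed-window sketch of that interface (the class name and shape are illustrative):

```python
import time
from collections import defaultdict

# Fixed-window limiter that also reports remaining quota and when the
# window resets, matching the tuple the middleware expects.
class HeaderAwareLimiter:
    def __init__(self, limit: int, window: int):
        self.limit = limit
        self.window = window
        self.counts = defaultdict(int)
        self.windows = {}

    def check(self, key: str):
        now = time.time()
        current = int(now // self.window)
        if self.windows.get(key) != current:
            self.counts[key] = 0
            self.windows[key] = current
        reset_at = (current + 1) * self.window  # epoch second when the window rolls over
        if self.counts[key] >= self.limit:
            return False, 0, reset_at
        self.counts[key] += 1
        return True, self.limit - self.counts[key], reset_at

limiter = HeaderAwareLimiter(limit=100, window=60)
allowed, remaining, reset_at = limiter.check("client-1")
print(allowed, remaining)  # True 99
```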

Identifying Clients

What key to rate limit on?

IP Address

def get_client_ip(request: Request) -> str:
    # Only trust forwarded headers set by your own proxy;
    # clients can forge X-Forwarded-For otherwise
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        return forwarded.split(",")[0].strip()
    return request.client.host

Problem: NAT, shared IPs, VPNs. One corporate network looks like one client.

API Key

def get_client_key(request: Request) -> str:
    api_key = request.headers.get("X-API-Key")
    if api_key:
        return f"apikey:{api_key}"
    return f"ip:{get_client_ip(request)}"

Better: Authenticated users get their own limits. Anonymous falls back to IP.

User ID + Endpoint

def get_client_key(request: Request) -> str:
    user_id = request.state.user_id or "anonymous"
    endpoint = request.url.path
    return f"{user_id}:{endpoint}"

Different limits per endpoint. Expensive operations get stricter limits.

Tiered Rate Limits

Not all users are equal:

RATE_LIMITS = {
    "free": {"requests": 100, "window": 3600},
    "basic": {"requests": 1000, "window": 3600},
    "pro": {"requests": 10000, "window": 3600},
    "enterprise": {"requests": 100000, "window": 3600},
}

def get_limit(user):
    tier = user.subscription_tier
    return RATE_LIMITS.get(tier, RATE_LIMITS["free"])

Endpoint-Specific Limits

Some endpoints are expensive:

ENDPOINT_LIMITS = {
    "/api/search": {"limit": 10, "window": 60},
    "/api/export": {"limit": 5, "window": 3600},
    "/api/users": {"limit": 100, "window": 60},
    "default": {"limit": 60, "window": 60},
}

def get_endpoint_limit(path: str):
    return ENDPOINT_LIMITS.get(path, ENDPOINT_LIMITS["default"])

Graceful Degradation

Instead of hard 429, consider:

Slow Down

async def rate_limit_with_delay(request):
    usage = get_usage_ratio(request)
    
    if usage >= 1.0:
        raise HTTPException(429)
    
    if usage > 0.8:
        # Approaching limit, slow down
        delay = (usage - 0.8) * 5  # Up to ~1 second as usage nears 1.0
        await asyncio.sleep(delay)

Reduce Quality

def search(query: str, request: Request):
    if is_rate_limited(request):
        # Return cached/simplified results instead of 429
        return get_cached_results(query)
    
    return full_search(query)

Testing Rate Limits

import pytest
import time

def test_rate_limit_enforced():
    limiter = FixedWindowLimiter(limit=5, window_seconds=60)
    
    # First 5 should pass
    for _ in range(5):
        assert limiter.is_allowed("test-user")
    
    # 6th should fail
    assert not limiter.is_allowed("test-user")

def test_rate_limit_resets():
    limiter = FixedWindowLimiter(limit=5, window_seconds=1)
    
    for _ in range(5):
        limiter.is_allowed("test-user")
    
    assert not limiter.is_allowed("test-user")
    
    time.sleep(1.1)
    
    assert limiter.is_allowed("test-user")

Common Mistakes

Rate Limiting After Auth

# Bad: Auth happens, then rate limit
@app.post("/login")
async def login(creds: Credentials):
    user = authenticate(creds)  # DB hit before rate limit
    if not rate_limiter.check(user.id):
        raise HTTPException(429)

# Good: Rate limit before expensive operations
@app.post("/login")
async def login(request: Request, creds: Credentials):
    if not rate_limiter.check(get_client_ip(request)):
        raise HTTPException(429)
    user = authenticate(creds)

Forgetting Internal Services

def get_client_key(request: Request) -> str | None:
    # Skip rate limiting for internal services; the caller treats None
    # as exempt. Make sure external clients can't set this header.
    if request.headers.get("X-Internal-Service"):
        return None
    return get_client_ip(request)

No Monitoring

from prometheus_client import Counter

rate_limit_hits = Counter(
    'rate_limit_hits_total',
    'Number of rate limited requests',
    ['endpoint', 'tier']
)

def check_rate_limit(request, is_limited: bool):
    if is_limited:
        rate_limit_hits.labels(
            endpoint=request.path,
            tier=get_user_tier(request)
        ).inc()

Key Takeaways

  1. Choose the right algorithm — Token bucket for burst tolerance, sliding window for accuracy
  2. Use Redis for distributed — Single-server limits break at scale
  3. Return proper headers — Clients need to know their limits
  4. Tier your limits — Paying customers deserve more
  5. Rate limit early — Before expensive operations, not after
  6. Monitor everything — Know when limits are being hit

Rate limiting is a balance. Too aggressive and you lose users. Too permissive and you lose your service. Start conservative, monitor, and adjust.