Every API needs rate limiting. Without it, a single misbehaving client can overwhelm your service, intentional attacks become trivial, and cost management becomes impossible. But implement it poorly, and you’ll frustrate legitimate users while barely slowing down bad actors.

Let’s explore rate limiting patterns that actually work.

The Fundamentals: Why Rate Limit?

Rate limiting serves multiple purposes:

  • Protection: Prevent abuse, DDoS attacks, and runaway scripts
  • Fairness: Ensure one client can’t monopolize resources
  • Cost control: Limit expensive operations (API calls, LLM tokens, etc.)
  • Stability: Maintain consistent performance under load

Algorithm 1: Token Bucket

The token bucket is the most flexible approach. Imagine a bucket that fills with tokens at a steady rate. Each request consumes a token. If the bucket is empty, the request is denied.

import time
import redis

class TokenBucket:
    def __init__(self, redis_client, key, capacity, refill_rate):
        """
        capacity: max tokens in bucket
        refill_rate: tokens added per second
        """
        self.redis = redis_client
        self.key = f"ratelimit:{key}"
        self.capacity = capacity
        self.refill_rate = refill_rate
    
    def allow_request(self, tokens=1):
        now = time.time()
        
        # Lua script for atomic operation
        lua_script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local requested = tonumber(ARGV[3])
        local now = tonumber(ARGV[4])
        
        local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
        local tokens = tonumber(bucket[1]) or capacity
        local last_update = tonumber(bucket[2]) or now
        
        -- Refill tokens based on elapsed time
        local elapsed = now - last_update
        tokens = math.min(capacity, tokens + (elapsed * refill_rate))
        
        local allowed = tokens >= requested
        if allowed then
            tokens = tokens - requested
        end
        
        redis.call('HMSET', key, 'tokens', tokens, 'last_update', now)
        redis.call('EXPIRE', key, 3600)
        
        -- Note: Redis truncates Lua numbers to integers in replies,
        -- so the remaining count is floored
        return {allowed and 1 or 0, tokens}
        """
        
        result = self.redis.eval(
            lua_script, 1, self.key,
            self.capacity, self.refill_rate, tokens, now
        )
        
        return {
            'allowed': bool(result[0]),
            'remaining': int(result[1])
        }

Why token bucket works: It allows bursts (up to bucket capacity) while enforcing an average rate. A user can make 10 quick requests if they’ve been idle, but can’t sustain more than the refill rate.
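For a single-process service that doesn't need shared state, the same burst-then-refill behavior fits in a few lines. A minimal in-memory sketch (hypothetical, not a drop-in for the Redis version):

```python
import time

class LocalTokenBucket:
    """In-memory token bucket; single-process only, no shared state."""
    def __init__(self, capacity, refill_rate):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def allow_request(self, tokens=1):
        now = time.monotonic()
        # Refill based on elapsed time, capped at capacity
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.refill_rate)
        self.last = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

bucket = LocalTokenBucket(capacity=10, refill_rate=5)
# An idle client can burst up to capacity...
burst = sum(bucket.allow_request() for _ in range(10))  # all 10 allowed
# ...but the 11th immediate request is denied
denied = bucket.allow_request()
```

The monotonic clock avoids surprises from wall-clock adjustments; the Redis version above uses `time.time()` because its state is shared across processes.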

Algorithm 2: Sliding Window Log

Track exact timestamps of requests for precise limiting:

import uuid

class SlidingWindowLog:
    def __init__(self, redis_client, key, limit, window_seconds):
        self.redis = redis_client
        self.key = f"ratelimit:swl:{key}"
        self.limit = limit
        self.window = window_seconds
    
    def allow_request(self):
        now = time.time()
        window_start = now - self.window
        # Unique member so concurrent requests with identical
        # timestamps don't overwrite each other in the sorted set
        member = f"{now}:{uuid.uuid4().hex}"
        
        pipe = self.redis.pipeline()
        
        # Remove entries older than the window
        pipe.zremrangebyscore(self.key, 0, window_start)
        
        # Count current entries
        pipe.zcard(self.key)
        
        # Add new entry (speculatively; removed below if denied)
        pipe.zadd(self.key, {member: now})
        
        # Set expiry so idle keys clean themselves up
        pipe.expire(self.key, self.window + 1)
        
        results = pipe.execute()
        current_count = results[1]
        
        if current_count >= self.limit:
            # Remove the entry we speculatively added
            self.redis.zrem(self.key, member)
            return {'allowed': False, 'remaining': 0}
            return {'allowed': False, 'remaining': 0}
        
        return {
            'allowed': True,
            'remaining': self.limit - current_count - 1
        }

Tradeoff: More memory (one stored timestamp per request), but exact: every request in the window is counted individually. Good for strict limits where precision matters.
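That memory cost is easy to see in a single-process sketch (a hypothetical in-memory variant, not the Redis implementation above):

```python
import time
from collections import deque

class LocalSlidingWindowLog:
    """Keeps one timestamp per request, so memory grows with the limit."""
    def __init__(self, limit, window_seconds):
        self.limit = limit
        self.window = window_seconds
        self.timestamps = deque()

    def allow_request(self, now=None):
        now = time.monotonic() if now is None else now
        # Drop timestamps that have aged out of the window
        while self.timestamps and self.timestamps[0] <= now - self.window:
            self.timestamps.popleft()
        if len(self.timestamps) >= self.limit:
            return False
        self.timestamps.append(now)
        return True

log = LocalSlidingWindowLog(limit=3, window_seconds=60)
# Three requests at t=0 pass, the fourth is denied...
[log.allow_request(now=0.0) for _ in range(3)]
log.allow_request(now=0.0)   # False
# ...and at t=61 the old entries have expired
log.allow_request(now=61.0)  # True
```

The optional `now` parameter here is just for deterministic testing; in production you would let the clock supply it.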

Algorithm 3: Sliding Window Counter

A hybrid approach — less memory than the log, more accurate than fixed windows:

class SlidingWindowCounter:
    def __init__(self, redis_client, key, limit, window_seconds):
        self.redis = redis_client
        self.key_prefix = f"ratelimit:swc:{key}"
        self.limit = limit
        self.window = window_seconds
    
    def allow_request(self):
        now = time.time()
        current_window = int(now // self.window)
        previous_window = current_window - 1
        
        # How far into current window (0.0 to 1.0)
        window_progress = (now % self.window) / self.window
        
        current_key = f"{self.key_prefix}:{current_window}"
        previous_key = f"{self.key_prefix}:{previous_window}"
        
        # Get counts from both windows
        pipe = self.redis.pipeline()
        pipe.get(current_key)
        pipe.get(previous_key)
        results = pipe.execute()
        
        current_count = int(results[0] or 0)
        previous_count = int(results[1] or 0)
        
        # Weighted count: full current + proportional previous
        weighted_count = current_count + (previous_count * (1 - window_progress))
        
        if weighted_count >= self.limit:
            return {'allowed': False, 'remaining': 0}
        
        # Increment current window (the read above and this increment are
        # not atomic, so this limiter is approximate under heavy concurrency)
        pipe = self.redis.pipeline()
        pipe.incr(current_key)
        pipe.expire(current_key, self.window * 2)
        pipe.execute()
        
        return {
            'allowed': True,
            'remaining': int(self.limit - weighted_count - 1)
        }

Why it works: By blending the current count with a proportional share of the previous window's count (assuming those requests were evenly distributed), you get smooth rate limiting without storing individual timestamps.
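A quick worked example with hypothetical numbers: suppose the limit is 100 per 60-second window, the previous window saw 80 requests, and we are 30% into the current window with 40 requests so far:

```python
def weighted_count(current_count, previous_count, window_progress):
    # Full current window plus the proportional tail of the previous one
    return current_count + previous_count * (1 - window_progress)

# 30% into the current window, so 70% of the previous window still "counts":
w = weighted_count(40, 80, 0.3)  # 40 + 80 * 0.7, i.e. about 96
# Just under a limit of 100, so the next request would be allowed
```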

Distributed Rate Limiting

For multi-server deployments, centralize your rate limiting:

import redis
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()
redis_client = redis.Redis(host='redis', port=6379)

# Different limits for different tiers
RATE_LIMITS = {
    'free': {'requests': 100, 'window': 3600},      # 100/hour
    'pro': {'requests': 1000, 'window': 3600},      # 1000/hour
    'enterprise': {'requests': 10000, 'window': 3600}
}

def get_rate_limiter(user_id: str, tier: str):
    config = RATE_LIMITS.get(tier, RATE_LIMITS['free'])
    return SlidingWindowCounter(
        redis_client,
        f"user:{user_id}",
        config['requests'],
        config['window']
    )

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    # Extract user info (from JWT, API key, etc.)
    user_id = request.headers.get('X-User-ID', request.client.host)
    tier = request.headers.get('X-User-Tier', 'free')
    
    limiter = get_rate_limiter(user_id, tier)
    result = limiter.allow_request()
    
    if not result['allowed']:
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded"},
            headers={
                "X-RateLimit-Remaining": "0",
                "Retry-After": "60"
            }
        )
    
    response = await call_next(request)
    response.headers["X-RateLimit-Remaining"] = str(result['remaining'])
    return response

Best Practices

1. Return Useful Headers

Always tell clients their rate limit status:

headers = {
    "X-RateLimit-Limit": str(limit),
    "X-RateLimit-Remaining": str(remaining),
    "X-RateLimit-Reset": str(reset_timestamp),
    "Retry-After": str(retry_seconds)  # On 429 responses
}

2. Use Multiple Limits

Layer different limits for different concerns:

# Per-second burst protection
second_limiter = TokenBucket(redis, f"sec:{user_id}", capacity=10, refill_rate=5)

# Per-minute sustained limit  
minute_limiter = SlidingWindowCounter(redis, f"min:{user_id}", 100, 60)

# Per-day quota
day_limiter = SlidingWindowCounter(redis, f"day:{user_id}", 10000, 86400)

def check_all_limits():
    # Checking in order consumes tokens from earlier limiters even when a
    # later one denies, slightly penalizing requests that end up rejected
    for limiter in [second_limiter, minute_limiter, day_limiter]:
        if not limiter.allow_request()['allowed']:
            return False
    return True

3. Differentiate by Endpoint

Some endpoints cost more than others; the token bucket's tokens parameter lets you charge accordingly:

ENDPOINT_COSTS = {
    '/api/search': 1,
    '/api/generate': 10,      # LLM calls are expensive
    '/api/bulk-export': 100,  # Heavy operations
}

cost = ENDPOINT_COSTS.get(request.path, 1)
result = limiter.allow_request(tokens=cost)

4. Graceful Degradation

When approaching limits, degrade gracefully instead of hard-cutting:

def get_response_quality(remaining_tokens):
    if remaining_tokens > 50:
        return 'full'      # Full response
    elif remaining_tokens > 20:
        return 'reduced'   # Smaller page sizes, less detail
    else:
        return 'minimal'   # Bare minimum response
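One way to wire that in (the page sizes here are hypothetical; get_response_quality is repeated so the sketch stands alone):

```python
def get_response_quality(remaining_tokens):
    if remaining_tokens > 50:
        return 'full'
    elif remaining_tokens > 20:
        return 'reduced'
    return 'minimal'

# Hypothetical page sizes per quality level
PAGE_SIZES = {'full': 100, 'reduced': 25, 'minimal': 5}

def build_page(items, remaining_tokens):
    # Shrink the page size as the client approaches its limit
    return items[:PAGE_SIZES[get_response_quality(remaining_tokens)]]

items = list(range(200))
len(build_page(items, remaining_tokens=80))  # 100
len(build_page(items, remaining_tokens=30))  # 25
len(build_page(items, remaining_tokens=5))   # 5
```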

When Rate Limiting Isn’t Enough

Rate limiting is one layer. For comprehensive API protection, combine it with:

  • Authentication: Know who’s making requests
  • Request validation: Reject malformed input early
  • Timeouts: Don’t let slow requests pile up
  • Circuit breakers: Fail fast when downstream services struggle
  • WAF rules: Block obvious attack patterns at the edge

Rate limiting done right protects your service while respecting your users. Start simple with a token bucket, add complexity as needed, and always give clients the information they need to behave well.