Rate limiting is the immune system of your API. Without it, a single misbehaving client can take down your service for everyone. With poorly designed limits, you’ll frustrate legitimate users while sophisticated attackers route around you.

The goal isn’t just protection—it’s fairness. Every user gets a reasonable share of your capacity.

The Basic Algorithms

Fixed Window

The simplest approach: count requests per time window, reject when over limit.

import time
import redis

def is_rate_limited(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Fixed window: 100 requests per minute."""
    r = redis.Redis()
    
    # Window key based on current minute
    window_key = f"ratelimit:{user_id}:{int(time.time() // window)}"
    
    current = r.incr(window_key)
    if current == 1:
        r.expire(window_key, window)
    
    return current > limit

Problem: Burst at window boundaries. A user can make 100 requests at 0:59 and 100 more at 1:00—200 requests in 2 seconds while technically staying under “100/minute.”

Sliding Window Log

Track exact timestamps of all requests:

def is_rate_limited_sliding(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Sliding window log: 100 requests in any 60-second period."""
    r = redis.Redis()
    key = f"ratelimit:{user_id}"
    now = time.time()
    
    # Remove old entries
    r.zremrangebyscore(key, 0, now - window)
    
    # Count current entries
    count = r.zcard(key)
    
    if count >= limit:
        return True
    
    # Add current request (identical timestamps collide as set members,
    # so production code should suffix a unique id)
    r.zadd(key, {str(now): now})
    r.expire(key, window)
    
    return False

Accurate but expensive: O(n) memory per user, cleanup overhead. Fine for low-volume APIs, problematic at scale.
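
Two smaller gotchas: the check-then-add above isn't atomic (two concurrent requests can both pass the count check), and identical timestamps collide as sorted-set members. A sketch of one common fix, assuming the same Redis setup: pipeline the operations and give every request a unique member. Note this variant records the request before checking, so rejected requests also count toward the window.

import uuid

def is_rate_limited_sliding_atomic(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Sliding window log with pipelined (MULTI/EXEC) operations."""
    r = redis.Redis()
    key = f"ratelimit:{user_id}"
    now = time.time()
    
    pipe = r.pipeline()  # transaction=True by default: executes atomically
    pipe.zremrangebyscore(key, 0, now - window)         # drop expired entries
    pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})  # unique member per request
    pipe.zcard(key)                                     # count entries in window
    pipe.expire(key, window)
    _, _, count, _ = pipe.execute()
    
    return count > limit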

Sliding Window Counter

Hybrid approach—approximate sliding window using two fixed windows:

def is_rate_limited_sliding_counter(user_id: str, limit: int = 100, window: int = 60) -> bool:
    """Sliding window counter: approximates sliding window with O(1) space."""
    r = redis.Redis()
    now = time.time()
    
    current_window = int(now // window)
    previous_window = current_window - 1
    window_progress = (now % window) / window  # 0.0 to 1.0
    
    current_key = f"ratelimit:{user_id}:{current_window}"
    previous_key = f"ratelimit:{user_id}:{previous_window}"
    
    current_count = int(r.get(current_key) or 0)
    previous_count = int(r.get(previous_key) or 0)
    
    # Weight previous window by remaining time
    weighted_count = current_count + (previous_count * (1 - window_progress))
    
    if weighted_count >= limit:
        return True
    
    r.incr(current_key)
    r.expire(current_key, window * 2)
    
    return False

Best of both worlds: O(1) space, smooth rate limiting, and accuracy that's good enough for most use cases. For example, 40 seconds into the current minute, with 80 requests in the previous window and 30 so far in this one, the estimate is 30 + 80 × (1 − 0.67) ≈ 57.

Token Bucket

Allow bursts while maintaining average rate:

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        """
        capacity: Maximum tokens (burst size)
        refill_rate: Tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
    
    def is_allowed(self, user_id: str, tokens: int = 1) -> bool:
        r = redis.Redis()
        key = f"bucket:{user_id}"
        now = time.time()
        
        # Get current state
        data = r.hgetall(key)
        
        if not data:
            # Initialize bucket
            current_tokens = self.capacity
            last_refill = now
        else:
            current_tokens = float(data[b'tokens'])
            last_refill = float(data[b'last_refill'])
            
            # Add tokens based on elapsed time
            elapsed = now - last_refill
            current_tokens = min(
                self.capacity,
                current_tokens + (elapsed * self.refill_rate)
            )
        
        if current_tokens >= tokens:
            # Consume tokens
            r.hset(key, mapping={
                'tokens': current_tokens - tokens,
                'last_refill': now
            })
            r.expire(key, int(self.capacity / self.refill_rate) + 60)
            return True
        
        return False

# Example: 100 tokens capacity, refills at 10/second
# Allows burst of 100, then sustained 10/second
bucket = TokenBucket(capacity=100, refill_rate=10)

Perfect for: APIs where occasional bursts are acceptable but average rate must be controlled.
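
One caveat: is_allowed does a read-modify-write across two round trips, so two concurrent requests can both read the same token count and each spend the same tokens. A common remedy is to move the whole update into a server-side Lua script, which Redis runs atomically. A minimal sketch; the script body is illustrative, not a standard library:

TOKEN_BUCKET_SCRIPT = """
local tokens, last_refill = unpack(redis.call('HMGET', KEYS[1], 'tokens', 'last_refill'))
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

tokens = tonumber(tokens) or capacity
last_refill = tonumber(last_refill) or now
tokens = math.min(capacity, tokens + (now - last_refill) * rate)

local allowed = 0
if tokens >= requested then
    tokens = tokens - requested
    allowed = 1
end

redis.call('HSET', KEYS[1], 'tokens', tokens, 'last_refill', now)
redis.call('EXPIRE', KEYS[1], math.ceil(capacity / rate) + 60)
return allowed
"""

r = redis.Redis()
token_bucket_script = r.register_script(TOKEN_BUCKET_SCRIPT)

def is_allowed_atomic(user_id: str, capacity: int, refill_rate: float, tokens: int = 1) -> bool:
    result = token_bucket_script(
        keys=[f"bucket:{user_id}"],
        args=[capacity, refill_rate, time.time(), tokens],
    )
    return result == 1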

Multi-Level Limits

Real APIs need multiple limit tiers:

RATE_LIMITS = {
    'per_second': {'limit': 10, 'window': 1},
    'per_minute': {'limit': 100, 'window': 60},
    'per_hour': {'limit': 1000, 'window': 3600},
    'per_day': {'limit': 10000, 'window': 86400},
}

def check_all_limits(user_id: str) -> tuple[bool, str | None]:
    """Check all rate limit tiers. Returns (allowed, violated_tier)."""
    for tier_name, config in RATE_LIMITS.items():
        if is_rate_limited_sliding_counter(
            f"{user_id}:{tier_name}",
            config['limit'],
            config['window']
        ):
            return False, tier_name
    return True, None

This prevents both burst abuse (per-second) and sustained abuse (per-day) while allowing reasonable usage patterns.
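
Wiring this into a handler is then a few lines per request; a sketch assuming FastAPI, which the middleware example later also uses:

from fastapi import HTTPException

async def handle_request(user_id: str):
    allowed, violated_tier = check_all_limits(user_id)
    if not allowed:
        # Tell the client which tier tripped, e.g. "per_second"
        raise HTTPException(status_code=429, detail=f"Rate limit exceeded: {violated_tier}")
    ...  # normal request handling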

Differentiated Limits

Not all users deserve the same limits:

TIER_LIMITS = {
    'free': {'requests_per_hour': 100, 'requests_per_day': 1000},
    'basic': {'requests_per_hour': 1000, 'requests_per_day': 10000},
    'pro': {'requests_per_hour': 10000, 'requests_per_day': 100000},
    'enterprise': {'requests_per_hour': 100000, 'requests_per_day': None},  # Unlimited daily
}

async def get_rate_limit_config(user_id: str) -> dict:
    user = await get_user(user_id)
    return TIER_LIMITS.get(user.tier, TIER_LIMITS['free'])
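
A sketch of how these per-tier numbers might feed the limiter, reusing the sliding window counter from above; a None limit is treated as "skip this check":

async def is_user_rate_limited(user_id: str) -> bool:
    config = await get_rate_limit_config(user_id)
    tiers = {
        'per_hour': (config['requests_per_hour'], 3600),
        'per_day': (config['requests_per_day'], 86400),
    }
    for tier_name, (limit, window) in tiers.items():
        if limit is None:
            continue  # None means unlimited for this tier
        if is_rate_limited_sliding_counter(f"{user_id}:{tier_name}", limit, window):
            return True
    return False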

Cost-Based Limits

Some endpoints cost more than others:

ENDPOINT_COSTS = {
    '/api/v1/search': 10,        # Expensive database query
    '/api/v1/export': 100,       # Heavy processing
    '/api/v1/users/{id}': 1,     # Simple lookup
    'default': 1,
}

def check_rate_limit_with_cost(user_id: str, endpoint: str) -> bool:
    cost = ENDPOINT_COSTS.get(endpoint, ENDPOINT_COSTS['default'])
    return bucket.is_allowed(user_id, tokens=cost)

A user with 100 tokens/minute can make 100 simple lookups OR 10 searches OR 1 export.
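
One practical wrinkle: the incoming path is the concrete URL (/api/v1/users/42), not the template, so a naive dictionary lookup misses the {id} entry. A sketch assuming FastAPI, which exposes the matched route on the request scope (plain Starlette does not; verify for your version):

from fastapi import Request

def endpoint_cost(request: Request) -> int:
    # The matched route's .path is the template form,
    # e.g. "/api/v1/users/{id}", which matches ENDPOINT_COSTS keys
    route = request.scope.get("route")
    path = route.path if route else request.url.path
    return ENDPOINT_COSTS.get(path, ENDPOINT_COSTS['default'])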

The Response Headers

Always tell clients their limit status:

from fastapi import Request, Response

async def rate_limit_middleware(request: Request, call_next):
    user_id = get_user_id(request)
    config = await get_rate_limit_config(user_id)
    
    allowed, remaining, reset_at = check_limit_with_info(user_id, config)
    
    if not allowed:
        return Response(
            content='{"error": "Rate limit exceeded"}',
            status_code=429,
            media_type='application/json',
            headers={
                'X-RateLimit-Limit': str(config['requests_per_hour']),
                'X-RateLimit-Remaining': '0',
                'X-RateLimit-Reset': str(reset_at),
                'Retry-After': str(reset_at - int(time.time())),
            }
        )
    
    response = await call_next(request)
    
    response.headers['X-RateLimit-Limit'] = str(config['requests_per_hour'])
    response.headers['X-RateLimit-Remaining'] = str(remaining)
    response.headers['X-RateLimit-Reset'] = str(reset_at)
    
    return response

Good clients will respect Retry-After. Bad clients will hammer anyway—but at least you’ve given them the information.
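
The middleware assumes two helpers, get_user_id and check_limit_with_info. A rough sketch of the latter, built on the fixed-window keys the sliding counter already writes; the reset time is approximated as the end of the current window:

def check_limit_with_info(user_id: str, config: dict) -> tuple[bool, int, int]:
    """Hypothetical helper: returns (allowed, remaining, reset_at)."""
    limit, window = config['requests_per_hour'], 3600
    r = redis.Redis()
    now = time.time()
    current_window = int(now // window)
    
    limited = is_rate_limited_sliding_counter(user_id, limit, window)
    
    # Remaining quota approximated from the current window's raw count
    count = int(r.get(f"ratelimit:{user_id}:{current_window}") or 0)
    remaining = max(0, limit - count)
    reset_at = (current_window + 1) * window  # end of current fixed window
    return (not limited, remaining, reset_at)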

Graceful Degradation

Don’t just reject—degrade gracefully:

from fastapi import HTTPException

async def search_endpoint(query: str, user_id: str):
    remaining = get_remaining_tokens(user_id)
    
    if remaining <= 0:
        raise HTTPException(429, "Rate limit exceeded")
    
    if remaining < 10:
        # Low on quota: return cached/simplified results
        return await get_cached_search_results(query)
    
    if remaining < 50:
        # Getting low: limit result count
        return await search(query, max_results=10)
    
    # Full quota: full results
    return await search(query, max_results=100)

Users get degraded service instead of errors. Better UX, same protection.
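
get_remaining_tokens is assumed above; one way to sketch it against the bucket:{user_id} hash the TokenBucket class writes, reusing the module-level bucket from that example and consuming nothing:

def get_remaining_tokens(user_id: str) -> int:
    """Peek at a user's token count (refill applied, nothing consumed)."""
    r = redis.Redis()
    data = r.hgetall(f"bucket:{user_id}")
    if not data:
        return bucket.capacity  # an untouched bucket is full
    elapsed = time.time() - float(data[b'last_refill'])
    tokens = float(data[b'tokens']) + elapsed * bucket.refill_rate
    return int(min(bucket.capacity, tokens))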

Distributed Rate Limiting

Single Redis works until it doesn’t. Options:

Redis Cluster

from redis.cluster import RedisCluster

rc = RedisCluster(host='redis-cluster', port=6379)
# Same operations, automatic sharding by key
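
One cluster-specific detail: if you batch multi-key reads (say, the sliding counter's current and previous windows via MGET or a pipeline), Redis Cluster requires the keys to hash to the same slot. Redis hash tags, the braced segment of a key, pin a user's keys together; a sketch:

def window_keys(user_id: str, window: int = 60) -> tuple[str, str]:
    current_window = int(time.time() // window)
    # "{user_id}" is a Redis hash tag: only the braced part is hashed,
    # so both keys land on the same cluster slot and can be batched
    return (
        f"ratelimit:{{{user_id}}}:{current_window}",
        f"ratelimit:{{{user_id}}}:{current_window - 1}",
    )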

Local + Global Limits

import threading
import time

class HybridRateLimiter:
    def __init__(self, local_limit: int, global_limit: int, window: int = 60):
        self.local_limit = local_limit
        self.global_limit = global_limit
        self.window = window
        self.local_counts = {}
        self.window_start = time.time()
        self.lock = threading.Lock()
    
    def check_global(self, user_id: str) -> bool:
        """True when the user is over the limit across all instances."""
        return is_rate_limited_sliding_counter(user_id, self.global_limit, self.window)
    
    def is_allowed(self, user_id: str) -> bool:
        # Fast local check (no network)
        with self.lock:
            # Reset local counters each window so they don't grow unbounded
            now = time.time()
            if now - self.window_start >= self.window:
                self.local_counts.clear()
                self.window_start = now
            
            local_count = self.local_counts.get(user_id, 0)
            if local_count >= self.local_limit:
                # Past the cheap local threshold: consult the shared store
                if self.check_global(user_id):
                    return False
            self.local_counts[user_id] = local_count + 1
        return True

Local limits catch obvious abuse without network calls. Global limits ensure consistency across instances.

The Checklist

Before shipping rate limiting:

  • Algorithm chosen (sliding window counter for most cases)
  • Multiple tiers (second/minute/hour/day)
  • Cost-based for expensive endpoints
  • Differentiated by user tier
  • Response headers (Limit, Remaining, Reset, Retry-After)
  • Graceful degradation (not just 429)
  • Monitoring (limit hits, by user, by endpoint)
  • Bypass for internal services
  • Documentation for API consumers

Rate limiting is a feature, not just protection. Done well, it makes your API predictable, fair, and sustainable.