Integrating Large Language Model APIs into production applications requires more than just calling an endpoint. Here are battle-tested patterns for building resilient, cost-effective LLM integrations.

The Retry Cascade

LLM APIs are notorious for rate limits and transient failures. A simple exponential backoff isn’t enough — you need a cascade strategy:

import asyncio
from dataclasses import dataclass
from typing import Optional

@dataclass
class LLMResponse:
    """Normalized result of a single LLM completion call."""
    content: str      # text returned by the model
    model: str        # identifier of the model that produced the response
    tokens_used: int  # token count reported for the call — presumably prompt + completion; confirm against provider usage fields

class LLMCascade:
    """Try a prioritized list of LLM providers/models in order.

    Rate limits are retried with exponential backoff; hard provider errors
    skip straight to the next provider in the cascade.
    """

    def __init__(self):
        # (provider, model, max_retries) in priority order:
        # primary first, cheapest fallback last.
        self.providers = [
            ("anthropic", "claude-sonnet-4-20250514", 3),
            ("openai", "gpt-4o", 2),
            ("anthropic", "claude-3-haiku-20240307", 5),
        ]

    async def complete(self, prompt: str) -> Optional[LLMResponse]:
        """Return the first successful completion, or None when every
        provider/model combination has been exhausted.
        """
        for provider, model, max_retries in self.providers:
            for attempt in range(max_retries):
                try:
                    return await self._call_provider(provider, model, prompt)
                except RateLimitError:
                    # Back off only when another attempt remains for this
                    # provider; sleeping before falling through to the next
                    # provider would just add pointless latency.
                    if attempt < max_retries - 1:
                        await asyncio.sleep(2 ** attempt)
                except ProviderError:
                    break  # Hard failure: skip remaining retries, try next provider.
        return None

The cascade falls through primary to fallback models, attempting retries at each level before moving on.

Streaming with Backpressure

When streaming responses, clients can’t always keep up. Implement backpressure to prevent memory blowouts:

/**
 * Re-yield chunks from an async-iterable LLM response.
 *
 * In a pull-based async generator, `yield` suspends the producer until the
 * consumer requests the next chunk, so backpressure is inherent: the source
 * is never read faster than the consumer drains it.
 *
 * NOTE(review): the previous buffering/pause bookkeeping was dead code —
 * the internal buffer was fully drained on every iteration, so it could
 * never exceed one element and the "Backpressure" warning could never fire
 * for any maxBuffer >= 1. `maxBuffer` is kept for call-site compatibility.
 *
 * @param {AsyncIterable<*>} response - stream of chunks from the provider
 * @param {number} [maxBuffer=1000] - retained for backward compatibility
 */
async function* streamWithBackpressure(response, maxBuffer = 1000) {
  for await (const chunk of response) {
    yield chunk;
  }
}

Token Budget Management

Runaway costs are a real concern. Implement hard limits at multiple levels:

class TokenBudget:
    """Enforce per-request and rolling daily token limits.

    ``check_budget()`` raises ``TokenBudgetExceeded`` when a request would
    exceed either limit; ``record_usage()`` should be called with the actual
    token count after each successful request.
    """

    def __init__(self, daily_limit: int, per_request_limit: int):
        self.daily_limit = daily_limit
        self.per_request_limit = per_request_limit
        self._daily_used = 0
        self._reset_date = date.today()

    def _maybe_reset(self) -> None:
        """Zero the daily counter when the calendar date has rolled over.

        This method was referenced but missing from the original snippet,
        which made check_budget() fail with NameError at runtime.
        """
        today = date.today()
        if today != self._reset_date:
            self._daily_used = 0
            self._reset_date = today

    def check_budget(self, estimated_tokens: int) -> bool:
        """Return True when the request fits both limits.

        Raises:
            TokenBudgetExceeded: request exceeds the per-request limit, or
                would push daily usage past the daily limit.
        """
        self._maybe_reset()

        if estimated_tokens > self.per_request_limit:
            raise TokenBudgetExceeded("Request exceeds per-request limit")

        if self._daily_used + estimated_tokens > self.daily_limit:
            raise TokenBudgetExceeded("Would exceed daily budget")

        return True

    def record_usage(self, actual_tokens: int):
        """Add actual token usage to the daily counter."""
        # Reset first so usage landing after midnight counts toward the
        # new day instead of inflating yesterday's stale counter.
        self._maybe_reset()
        self._daily_used += actual_tokens

Prompt Caching

Many LLM calls repeat similar context. Cache at the prompt level:

import hashlib
import json
from functools import lru_cache

class PromptCache:
    """Redis-backed cache for LLM completions keyed by (model, prompt)."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl = 3600  # 1 hour

    def cache_key(self, prompt: str, model: str) -> str:
        """Derive a stable cache key from model + prompt.

        Truncating the SHA-256 digest to 16 hex chars (64 bits) keeps keys
        short; collision risk is negligible at typical cache sizes.
        """
        content = f"{model}:{prompt}"
        return f"llm:cache:{hashlib.sha256(content.encode()).hexdigest()[:16]}"

    async def get_or_call(self, prompt: str, model: str, call_fn):
        """Return the cached result, or invoke ``call_fn`` and cache it.

        The result must be JSON-serializable; ``json`` was used here without
        being imported in the original snippet (NameError at runtime).
        """
        key = self.cache_key(prompt, model)

        cached = await self.redis.get(key)
        if cached:
            return json.loads(cached)

        result = await call_fn(prompt, model)
        await self.redis.setex(key, self.ttl, json.dumps(result))
        return result

For deterministic queries (classification, extraction), aggressive caching can cut costs dramatically.

Structured Output Validation

LLMs can return malformed JSON. Always validate and retry:

import json
import re

from pydantic import BaseModel, ValidationError
from typing import TypeVar, Type

T = TypeVar('T', bound=BaseModel)

async def get_structured_output(
    prompt: str, 
    schema: Type[T], 
    max_retries: int = 3
) -> T:
    """Ask the LLM for JSON matching ``schema``, validating and retrying.

    On each failed attempt the error is fed back into the prompt so the
    model can self-correct. ``json`` and ``re`` were used here without being
    imported in the original snippet.

    Raises:
        ValidationError / json.JSONDecodeError: last parse error after
            ``max_retries`` attempts.
        ValueError: no attempt produced a JSON object at all.
    """
    for attempt in range(max_retries):
        response = await llm.complete(
            prompt + f"\n\nRespond with valid JSON matching: {schema.schema_json()}"
        )

        try:
            # Greedy DOTALL match grabs the outermost {...} span in the reply.
            json_match = re.search(r'\{.*\}', response, re.DOTALL)
            if json_match:
                return schema.parse_raw(json_match.group())
            # No JSON object at all: previously this retried silently with
            # no feedback — tell the model what went wrong before retrying.
            prompt += "\n\nPrevious reply contained no JSON object. Respond with JSON only."
        except (ValidationError, json.JSONDecodeError) as e:
            if attempt == max_retries - 1:
                raise
            prompt += f"\n\nPrevious attempt had error: {e}. Please fix."
    
    raise ValueError("Failed to get valid structured output")

Circuit Breaker Pattern

Prevent cascade failures when an LLM provider is down:

from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    """Classic circuit-breaker states."""
    CLOSED = "closed"        # normal operation, calls allowed
    OPEN = "open"            # failing fast, calls blocked
    HALF_OPEN = "half_open"  # probing: a single trial call allowed

class LLMCircuitBreaker:
    """Fail fast when a provider keeps erroring.

    After ``failure_threshold`` consecutive failures the circuit opens and
    ``can_execute()`` returns False. Once ``recovery_timeout`` has elapsed
    the circuit goes HALF_OPEN and admits exactly one probe call; a success
    closes the circuit, a failure re-opens it.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = timedelta(seconds=recovery_timeout)
        self.failures = 0
        self.last_failure = None
        self.state = CircuitState.CLOSED
        # True while the single HALF_OPEN probe is outstanding. The original
        # returned True unconditionally in HALF_OPEN despite its "allows one
        # attempt" comment, letting unbounded callers through mid-probe.
        self._probe_in_flight = False

    def can_execute(self) -> bool:
        """Return True when a call may proceed under the current state."""
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self._probe_in_flight = True
                return True
            return False
        # HALF_OPEN: admit only one probe until its outcome is recorded.
        if not self._probe_in_flight:
            self._probe_in_flight = True
            return True
        return False

    def record_success(self):
        """Close the circuit after a successful call."""
        self.failures = 0
        self.state = CircuitState.CLOSED
        self._probe_in_flight = False

    def record_failure(self):
        """Count a failure; open the circuit once the threshold is hit."""
        self.failures += 1
        self.last_failure = datetime.now()
        self._probe_in_flight = False
        if self.failures >= self.failure_threshold:
            self.state = CircuitState.OPEN

Observability

Log everything that matters for debugging and cost tracking:

import time
import uuid

import structlog

logger = structlog.get_logger()

async def observed_completion(prompt: str, **kwargs):
    """Wrap ``llm.complete`` with structured request/latency/cost logging.

    Logs a start event, a completion event (tokens + cost), or a failure
    event, all correlated by a short random request id. Exceptions are
    re-raised after logging. ``time`` and ``uuid`` were used here without
    being imported in the original snippet.
    """
    start = time.time()
    # 8 hex chars are enough to correlate log lines within a time window.
    request_id = str(uuid.uuid4())[:8]
    
    logger.info("llm_request_start", 
        request_id=request_id,
        prompt_tokens=estimate_tokens(prompt),
        model=kwargs.get('model'))
    
    try:
        response = await llm.complete(prompt, **kwargs)
        
        logger.info("llm_request_complete",
            request_id=request_id,
            duration_ms=(time.time() - start) * 1000,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            cost_usd=calculate_cost(response))
        
        return response
    except Exception as e:
        # Log-and-reraise boundary: callers still see the original error.
        logger.error("llm_request_failed",
            request_id=request_id,
            error=str(e),
            duration_ms=(time.time() - start) * 1000)
        raise

Conclusion

Robust LLM integrations require:

  1. Graceful degradation — cascade through providers and models
  2. Cost controls — budget limits at multiple levels
  3. Caching — reduce redundant API calls
  4. Validation — never trust raw LLM output
  5. Circuit breakers — fail fast when providers are down
  6. Observability — log tokens, latency, and costs

These patterns transform brittle API calls into production-ready systems. Start with the basics (retry + logging), then add sophistication as your usage scales.