Integrating LLMs into production applications is deceptively simple. Call an API, get text back. But building reliable, cost-effective systems requires more thought. Here are patterns that work at scale.

The Basic Call

Every LLM integration starts here:

1
2
3
4
5
6
7
8
import openai

def complete(prompt: str) -> str:
    """Send a single-turn prompt to GPT-4 and return the reply text."""
    chat_messages = [{"role": "user", "content": prompt}]
    result = openai.chat.completions.create(model="gpt-4", messages=chat_messages)
    return result.choices[0].message.content

This works for prototypes. Production needs more.

Retry with Exponential Backoff

LLM APIs have rate limits and occasional failures:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import openai

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type((openai.RateLimitError, openai.APITimeoutError))
)
def complete_with_retry(prompt: str, model: str = "gpt-4") -> str:
    """Single-turn completion, retried up to 3 times on transient API errors.

    Only rate limits and timeouts are retried; auth and validation errors
    propagate immediately. The 30s timeout bounds each individual attempt.
    """
    user_turn = {"role": "user", "content": prompt}
    result = openai.chat.completions.create(
        model=model,
        messages=[user_turn],
        timeout=30
    )
    return result.choices[0].message.content

Key details:

  • Retry on rate limits and timeouts, not on auth errors
  • Exponential backoff prevents thundering herd
  • Set explicit timeouts—don’t wait forever

Structured Output

Raw text is hard to parse. Use structured outputs:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from pydantic import BaseModel
from typing import List
import json

class ExtractedEntity(BaseModel):
    """One named entity reported by the model."""
    name: str
    type: str  # entity category label as returned by the model
    confidence: float  # model-reported score; presumably in [0, 1] -- TODO confirm

class ExtractionResult(BaseModel):
    """Validated shape of the JSON that the extraction prompt asks for."""
    entities: List[ExtractedEntity]
    summary: str  # free-text summary produced alongside the entities

def extract_entities(text: str) -> ExtractionResult:
    """Extract named entities (plus a summary) from free text with GPT-4 Turbo.

    Raises json.JSONDecodeError if the reply is not valid JSON and pydantic's
    ValidationError if the JSON does not match ExtractionResult.
    """
    # JSON mode guarantees syntactically valid JSON, but NOT any particular
    # shape -- the schema must be spelled out in the prompt, otherwise the
    # ExtractionResult(**data) call below will usually fail validation.
    system_prompt = (
        "Extract entities from text. Respond with valid JSON only, using "
        'exactly this shape: {"entities": [{"name": str, "type": str, '
        '"confidence": float}], "summary": str}'
    )
    response = openai.chat.completions.create(
        model="gpt-4-turbo",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": text}
        ],
        response_format={"type": "json_object"}
    )

    data = json.loads(response.choices[0].message.content)
    return ExtractionResult(**data)

For Anthropic’s Claude, use tool calling:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import anthropic

def extract_with_claude(text: str) -> dict:
    """Extract named entities from text via Claude tool calling.

    Returns the tool-call input dict (e.g. {"entities": [...]}) produced by
    the model. Raises ValueError if no tool_use block comes back.
    """
    client = anthropic.Anthropic()

    response = client.messages.create(
        model="claude-3-sonnet-20240229",
        max_tokens=1024,
        tools=[{
            "name": "extract_entities",
            "description": "Extract named entities from text",
            "input_schema": {
                "type": "object",
                "properties": {
                    "entities": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "name": {"type": "string"},
                                "type": {"type": "string"},
                                "confidence": {"type": "number"}
                            }
                        }
                    }
                },
                "required": ["entities"]
            }
        }],
        # Force the tool call: without tool_choice the model may answer in
        # plain prose, which made this function raise nondeterministically.
        tool_choice={"type": "tool", "name": "extract_entities"},
        messages=[{"role": "user", "content": f"Extract entities from: {text}"}]
    )

    # Find the tool use block (kept as a safety net even though tool_choice
    # should guarantee one).
    for block in response.content:
        if block.type == "tool_use":
            return block.input

    raise ValueError("No structured output returned")

Caching Responses

LLM calls are expensive. Cache when possible:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import hashlib
import json
from functools import lru_cache

def hash_request(model: str, messages: list, **kwargs) -> str:
    """Derive a deterministic cache key from the full request payload."""
    payload = json.dumps(
        {"model": model, "messages": messages, **kwargs}, sort_keys=True
    )
    return hashlib.sha256(payload.encode()).hexdigest()

# In-memory cache for development
@lru_cache(maxsize=1000)
def cached_complete(cache_key: str, model: str, messages_json: str) -> str:
    """Memoized single completion; messages arrive JSON-encoded so they hash."""
    decoded_messages = json.loads(messages_json)
    result = openai.chat.completions.create(model=model, messages=decoded_messages)
    return result.choices[0].message.content

# Redis cache for production
def complete_with_cache(prompt: str, model: str = "gpt-4", ttl: int = 3600) -> str:
    """Look up the completion in Redis before paying for an API call.

    NOTE(review): relies on a module-level `redis` client that is not defined
    in this snippet -- confirm it is initialized elsewhere.
    """
    chat = [{"role": "user", "content": prompt}]
    key = f"llm:{hash_request(model, chat)}"

    hit = redis.get(key)
    if hit:
        return hit.decode()

    response = openai.chat.completions.create(model=model, messages=chat)
    answer = response.choices[0].message.content

    redis.setex(key, ttl, answer)
    return answer

When to cache:

  • Deterministic prompts (temperature=0)
  • Reference data lookups
  • Repeated identical queries

When not to cache:

  • User-specific context
  • Time-sensitive information
  • Creative generation

Fallback Models

Don’t depend on a single provider:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class LLMClient:
    """Completion client that walks an ordered list of provider fallbacks."""

    def __init__(self):
        # Tried in order; the final entry is the cheap emergency fallback.
        self.providers = [
            ("openai", "gpt-4-turbo", self._call_openai),
            ("anthropic", "claude-3-sonnet", self._call_anthropic),
            ("openai", "gpt-3.5-turbo", self._call_openai),  # Cheaper fallback
        ]

    def complete(self, prompt: str) -> str:
        """Return the first successful completion; re-raise the last failure."""
        failure = None
        for provider, model, call_fn in self.providers:
            try:
                return call_fn(prompt, model)
            except Exception as e:
                logger.warning(f"{provider}/{model} failed: {e}")
                failure = e
        raise failure

    def _call_openai(self, prompt: str, model: str) -> str:
        # Single-turn chat completion with an explicit per-call timeout.
        reply = openai.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            timeout=30
        )
        return reply.choices[0].message.content

    def _call_anthropic(self, prompt: str, model: str) -> str:
        client = anthropic.Anthropic()
        reply = client.messages.create(
            model=model,
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        return reply.content[0].text

Cost Tracking

LLM costs add up. Track them:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from dataclasses import dataclass
from typing import Optional

# Pricing per 1M tokens (as of early 2024)
PRICING = {
    "gpt-4-turbo": {"input": 10.00, "output": 30.00},
    "gpt-3.5-turbo": {"input": 0.50, "output": 1.50},
    "claude-3-sonnet": {"input": 3.00, "output": 15.00},
}

@dataclass
class UsageMetrics:
    """Token counts for one LLM call, with a derived dollar cost."""
    model: str
    input_tokens: int
    output_tokens: int

    @property
    def cost(self) -> float:
        """Dollar cost of this call; 0.0 for models absent from PRICING."""
        rates = PRICING.get(self.model, {"input": 0, "output": 0})
        return (
            (self.input_tokens / 1_000_000) * rates["input"]
            + (self.output_tokens / 1_000_000) * rates["output"]
        )

def complete_with_tracking(prompt: str, model: str = "gpt-4-turbo") -> tuple[str, UsageMetrics]:
    """Run a completion and report its token usage and cost to statsd."""
    response = openai.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )

    usage = response.usage
    metrics = UsageMetrics(
        model=model,
        input_tokens=usage.prompt_tokens,
        output_tokens=usage.completion_tokens
    )

    # Log to your metrics system
    statsd.gauge("llm.cost", metrics.cost, tags=[f"model:{model}"])
    statsd.incr("llm.tokens.input", metrics.input_tokens)
    statsd.incr("llm.tokens.output", metrics.output_tokens)

    return response.choices[0].message.content, metrics

Streaming for Long Responses

Don’t make users wait for long generations:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
async def stream_response(prompt: str):
    """Stream tokens as they're generated (async generator of str).

    Uses the async client so waiting on the stream does not block the event
    loop -- the original synchronous `openai.chat.completions.create` call
    inside this `async def` stalled every other request while tokens
    trickled in.
    """
    client = openai.AsyncOpenAI()
    stream = await client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    async for chunk in stream:
        # Some chunks (role deltas, final usage frames) carry no content.
        if chunk.choices and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

# FastAPI endpoint
@app.get("/generate")
async def generate(prompt: str):
    """Stream the model's reply to the client as server-sent events.

    NOTE(review): `app` and `StreamingResponse` are assumed to come from a
    FastAPI setup outside this snippet -- confirm those imports exist.
    """
    return StreamingResponse(
        stream_response(prompt),
        media_type="text/event-stream"
    )

Quick Checklist

Before going to production:

  • Retry logic with exponential backoff
  • Explicit timeouts on all calls
  • Fallback to alternative models/providers
  • Response caching where appropriate
  • Cost tracking and alerting
  • Rate limit handling
  • Structured output validation
  • Streaming for user-facing responses
  • Logging for debugging (without logging prompts containing PII)

LLM APIs are powerful but unreliable and expensive. Treat them like any other external dependency: wrap them in retries, cache aggressively, monitor costs, and always have a fallback. The patterns aren’t new—they’re just applied to a new domain.