Integrating Large Language Model APIs into production applications requires more than just calling an endpoint. Here are battle-tested patterns for building resilient, cost-effective LLM integrations.
The Retry Cascade#
LLM APIs are notorious for rate limits and transient failures. A simple exponential backoff isn’t enough — you need a cascade strategy:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| import asyncio
from dataclasses import dataclass
from typing import Optional
@dataclass
class LLMResponse:
    """Result of a single LLM completion call."""

    content: str      # generated completion text
    model: str        # identifier of the model that produced it
    tokens_used: int  # total tokens consumed by the request
class LLMCascade:
    """Try a prioritized list of (provider, model) pairs until one succeeds.

    Each tier gets its own retry budget with exponential backoff on rate
    limits; a hard provider error skips the remaining retries and falls
    through to the next tier.
    """

    def __init__(self):
        # (provider, model, max_retries) in priority order: primary model
        # first, cheaper/faster fallbacks last.
        self.providers = [
            ("anthropic", "claude-sonnet-4-20250514", 3),
            ("openai", "gpt-4o", 2),
            ("anthropic", "claude-3-haiku-20240307", 5),
        ]

    async def complete(self, prompt: str) -> Optional[LLMResponse]:
        """Return the first successful completion, or None if every tier fails.

        RateLimitError is retried with exponential backoff; ProviderError
        moves straight to the next provider. (Both exception types and
        `_call_provider` are assumed to be defined elsewhere — TODO confirm.)
        """
        for provider, model, max_retries in self.providers:
            for attempt in range(max_retries):
                try:
                    return await self._call_provider(provider, model, prompt)
                except RateLimitError:
                    # Only back off if another retry remains; the original
                    # slept even after the final attempt, delaying failover
                    # to the next provider for no benefit.
                    if attempt < max_retries - 1:
                        await asyncio.sleep(2 ** attempt)
                except ProviderError:
                    break  # hard failure: skip remaining retries, next provider
        return None
|
The cascade falls through from the primary model to the fallback models, exhausting the retry budget at each level before moving on.
Streaming with Backpressure#
When streaming responses, clients can’t always keep up. Implement backpressure to prevent memory blowouts:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Re-yield chunks from an async-iterable LLM response stream.
 *
 * A generator's `yield` suspends execution until the consumer requests the
 * next value, so pulling with `for await` and yielding directly already
 * gives true backpressure: we never read from `response` faster than the
 * consumer drains us, and at most one chunk is in flight at a time.
 *
 * The original version buffered chunks and warned past a high-water mark,
 * but its inner drain loop emptied the buffer on every iteration, so the
 * buffer could never hold more than one chunk and the warn/pause branch was
 * unreachable dead code. Buffering only matters for a push-based source.
 *
 * @param {AsyncIterable} response - stream of chunks from the LLM API
 * @param {number} maxBuffer - retained for call-site compatibility; only
 *   meaningful for a push-based source, which `for await` is not
 */
async function* streamWithBackpressure(response, maxBuffer = 1000) {
  for await (const chunk of response) {
    yield chunk;
  }
}
|
Token Budget Management#
Runaway costs are a real concern. Implement hard limits at multiple levels:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from datetime import date


class TokenBudget:
    """Hard token-spend limits at two levels: per request and per day.

    check_budget() raises TokenBudgetExceeded (assumed defined elsewhere —
    TODO confirm) when a request would blow either limit; record_usage()
    charges actual consumption against the daily budget.
    """

    def __init__(self, daily_limit: int, per_request_limit: int):
        self.daily_limit = daily_limit
        self.per_request_limit = per_request_limit
        self._daily_used = 0             # tokens consumed since last reset
        self._reset_date = date.today()  # day the counter was last zeroed

    def _maybe_reset(self) -> None:
        # Roll the daily counter over when the calendar day changes.
        # The original called this method but never defined it.
        today = date.today()
        if today != self._reset_date:
            self._daily_used = 0
            self._reset_date = today

    def check_budget(self, estimated_tokens: int) -> bool:
        """Return True if the request fits; raise TokenBudgetExceeded otherwise."""
        self._maybe_reset()
        if estimated_tokens > self.per_request_limit:
            raise TokenBudgetExceeded(
                f"Request of {estimated_tokens} tokens exceeds per-request "
                f"limit of {self.per_request_limit}"
            )
        if self._daily_used + estimated_tokens > self.daily_limit:
            raise TokenBudgetExceeded(
                f"Request of {estimated_tokens} tokens would exceed daily "
                f"budget ({self._daily_used}/{self.daily_limit} used)"
            )
        return True

    def record_usage(self, actual_tokens: int) -> None:
        """Charge actual token consumption against today's budget."""
        self._daily_used += actual_tokens
|
Prompt Caching#
Many LLM calls repeat similar context. Cache at the prompt level:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import hashlib
import json  # used by get_or_call; the original forgot this import
from functools import lru_cache  # kept from the original; unused here


class PromptCache:
    """Redis-backed cache for LLM completions, keyed by (model, prompt)."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl = 3600  # cache entry lifetime in seconds (1 hour)

    def cache_key(self, prompt: str, model: str) -> str:
        """Return a deterministic, fixed-length Redis key for (model, prompt).

        16 hex chars (64 bits) keeps keys short; collision risk is
        negligible for cache purposes.
        """
        content = f"{model}:{prompt}"
        return f"llm:cache:{hashlib.sha256(content.encode()).hexdigest()[:16]}"

    async def get_or_call(self, prompt: str, model: str, call_fn):
        """Return the cached result for (prompt, model), invoking call_fn on a miss.

        Results must be JSON-serializable; misses are stored with the
        configured TTL via SETEX.
        """
        key = self.cache_key(prompt, model)
        cached = await self.redis.get(key)
        if cached:
            return json.loads(cached)
        result = await call_fn(prompt, model)
        await self.redis.setex(key, self.ttl, json.dumps(result))
        return result
|
For deterministic queries (classification, extraction), aggressive caching can cut costs dramatically.
Structured Output Validation#
LLMs can return malformed JSON. Always validate and retry:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import json  # used in the except clause; the original forgot this import
import re    # used for JSON extraction; the original forgot this import

from pydantic import BaseModel, ValidationError
from typing import TypeVar, Type

T = TypeVar('T', bound=BaseModel)


async def get_structured_output(
    prompt: str,
    schema: Type[T],
    max_retries: int = 3
) -> T:
    """Ask the LLM for JSON matching `schema`, retrying with error feedback.

    On each failed attempt the validation/parse error is appended to the
    prompt so the model can self-correct. Raises the last error once
    `max_retries` is exhausted. (`llm` is assumed to be defined elsewhere —
    TODO confirm.)
    """
    for attempt in range(max_retries):
        response = await llm.complete(prompt + f"\n\nRespond with valid JSON matching: {schema.schema_json()}")
        try:
            # Extract the first {...} span; DOTALL lets it cross newlines.
            json_match = re.search(r'\{.*\}', response, re.DOTALL)
            if json_match is None:
                # The original looped silently here, retrying with no
                # corrective feedback; surface it as a decode error so the
                # feedback path below handles it uniformly.
                raise json.JSONDecodeError("no JSON object in response", response, 0)
            return schema.parse_raw(json_match.group())
        except (ValidationError, json.JSONDecodeError) as e:
            if attempt == max_retries - 1:
                raise
            prompt += f"\n\nPrevious attempt had error: {e}. Please fix."
    raise ValueError("Failed to get valid structured output")
|
Circuit Breaker Pattern#
Prevent cascade failures when an LLM provider is down:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from datetime import datetime, timedelta
from enum import Enum


class CircuitState(Enum):
    """Classic circuit-breaker states."""
    CLOSED = "closed"          # normal operation
    OPEN = "open"              # failing fast, no calls allowed
    HALF_OPEN = "half_open"    # probing with a single trial call


class LLMCircuitBreaker:
    """Fail fast when a provider is down; probe cautiously on recovery.

    After `failure_threshold` consecutive failures the breaker opens.
    Once `recovery_timeout` has elapsed it allows exactly one probe call
    (HALF_OPEN); a success closes the breaker, a failure re-opens it.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = timedelta(seconds=recovery_timeout)
        self.failures = 0          # consecutive failure count
        self.last_failure = None   # timestamp of most recent failure
        self.state = CircuitState.CLOSED
        self._probe_in_flight = False  # True while the single HALF_OPEN probe runs

    def can_execute(self) -> bool:
        """Return True if a call may be attempted right now."""
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self._probe_in_flight = True
                return True
            return False
        # HALF_OPEN: allow exactly one probe. The original's comment
        # promised "one attempt" but returned True unconditionally,
        # letting unlimited callers through before the probe resolved.
        if self._probe_in_flight:
            return False
        self._probe_in_flight = True
        return True

    def record_success(self):
        """Reset the breaker after a successful call."""
        self.failures = 0
        self._probe_in_flight = False
        self.state = CircuitState.CLOSED

    def record_failure(self):
        """Count a failure; open the breaker at the threshold."""
        self.failures += 1
        self.last_failure = datetime.now()
        self._probe_in_flight = False
        if self.failures >= self.failure_threshold:
            self.state = CircuitState.OPEN
|
Observability#
Log everything that matters for debugging and cost tracking:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time  # used for latency timing; the original forgot this import
import uuid  # used for request ids; the original forgot this import

import structlog

logger = structlog.get_logger()


async def observed_completion(prompt: str, **kwargs):
    """Run llm.complete() with structured start/complete/failure logging.

    Emits token counts, latency, and cost under a short random request id so
    the three log events can be correlated. Assumes `llm`, `estimate_tokens`,
    and `calculate_cost` are defined elsewhere — TODO confirm.
    """
    start = time.time()
    request_id = str(uuid.uuid4())[:8]  # 8 hex chars: short but unique enough for correlation
    logger.info("llm_request_start",
        request_id=request_id,
        prompt_tokens=estimate_tokens(prompt),
        model=kwargs.get('model'))
    try:
        response = await llm.complete(prompt, **kwargs)
        logger.info("llm_request_complete",
            request_id=request_id,
            duration_ms=(time.time() - start) * 1000,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            cost_usd=calculate_cost(response))
        return response
    except Exception as e:
        # Broad catch is deliberate at this observability boundary: log the
        # failure with timing, then re-raise unchanged for the caller.
        logger.error("llm_request_failed",
            request_id=request_id,
            error=str(e),
            duration_ms=(time.time() - start) * 1000)
        raise
|
Conclusion#
Robust LLM integrations require:
- Graceful degradation — cascade through providers and models
- Cost controls — budget limits at multiple levels
- Caching — reduce redundant API calls
- Validation — never trust raw LLM output
- Circuit breakers — fail fast when providers are down
- Observability — log tokens, latency, and costs
These patterns transform brittle API calls into production-ready systems. Start with the basics (retry + logging), then add sophistication as your usage scales.