Integrating LLM APIs into production applications requires more than just making API calls. These patterns address the real challenges: rate limits, token costs, latency, and reliability.
Basic Client Setup#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import os

from anthropic import Anthropic

# Shared client: the timeout and retry settings apply to every request made
# through it (the SDK performs its own transient-failure retries before raising).
client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    timeout=60.0,     # seconds — never let a request hang forever
    max_retries=3,    # SDK-level automatic retries
)


def chat(message: str, system: str | None = None) -> str:
    """Send a single user message and return the model's text reply.

    Args:
        message: The user's message.
        system: Optional system prompt; a generic default is used when omitted.

    Returns:
        The text of the first content block of the response.
    """
    messages = [{"role": "user", "content": message}]
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        system=system or "You are a helpful assistant.",
        messages=messages,
    )
    return response.content[0].text
Retry with Exponential Backoff#
Built-in retries help, but custom logic handles edge cases:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time
import random

from anthropic import RateLimitError, APIStatusError


def chat_with_retry(message: str, max_retries: int = 5) -> str:
    """Call ``chat`` with exponential backoff and jitter.

    Retries on rate limits and 5xx server errors; 4xx client errors are
    raised immediately. On the final attempt the real exception is
    re-raised unchanged so callers keep the original error type.

    Args:
        message: The user's message, forwarded to ``chat``.
        max_retries: Total number of attempts before giving up.

    Returns:
        The model's reply text.
    """
    for attempt in range(max_retries):
        try:
            return chat(message)
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter to de-synchronize retrying clients.
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited, waiting {wait_time:.1f}s...")
            time.sleep(wait_time)
        except APIStatusError as e:
            if e.status_code >= 500:
                # BUG FIX: previously the last attempt slept and then fell
                # through to a generic Exception, discarding the real error.
                if attempt == max_retries - 1:
                    raise
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait_time)
            else:
                # Client error (4xx other than 429): retrying cannot help.
                raise
    # Unreachable when max_retries >= 1; kept as a defensive backstop.
    raise RuntimeError("Max retries exceeded")
Streaming Responses#
For better UX, stream responses instead of waiting for completion:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def chat_stream(message: str):
    """Generator that yields response text chunks as the model emits them."""
    request = {
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 1024,
        "messages": [{"role": "user", "content": message}],
    }
    with client.messages.stream(**request) as stream:
        yield from stream.text_stream


# Usage
for chunk in chat_stream("Explain quantum computing"):
    print(chunk, end="", flush=True)
Token Counting and Cost Management#
Track usage to avoid budget surprises:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from dataclasses import dataclass
from typing import Optional


@dataclass
class UsageTracker:
    """Accumulates token counts across API responses and estimates spend."""

    input_tokens: int = 0
    output_tokens: int = 0

    # Pricing per million tokens (example rates)
    INPUT_COST_PER_M = 3.00
    OUTPUT_COST_PER_M = 15.00

    def add(self, response):
        """Fold one response's usage numbers into the running totals."""
        usage = response.usage
        self.input_tokens += usage.input_tokens
        self.output_tokens += usage.output_tokens

    @property
    def cost(self) -> float:
        """Estimated dollar cost of every token tracked so far."""
        per_million = 1_000_000
        return (
            self.input_tokens / per_million * self.INPUT_COST_PER_M
            + self.output_tokens / per_million * self.OUTPUT_COST_PER_M
        )

    def __str__(self):
        return f"Tokens: {self.input_tokens:,} in / {self.output_tokens:,} out | Cost: ${self.cost:.4f}"
# Usage
tracker = UsageTracker()
response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello!"}],
)
tracker.add(response)
print(tracker)  # Tokens: 12 in / 45 out | Cost: $0.0007
Conversation Management#
Maintain context across multiple turns:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| class Conversation:
def __init__(self, system: str = None, model: str = "claude-sonnet-4-20250514"):
self.messages = []
self.system = system
self.model = model
self.tracker = UsageTracker()
def chat(self, user_message: str) -> str:
self.messages.append({"role": "user", "content": user_message})
response = client.messages.create(
model=self.model,
max_tokens=2048,
system=self.system,
messages=self.messages,
)
assistant_message = response.content[0].text
self.messages.append({"role": "assistant", "content": assistant_message})
self.tracker.add(response)
return assistant_message
def trim_history(self, keep_last: int = 10):
"""Prevent context from growing too large."""
if len(self.messages) > keep_last * 2:
self.messages = self.messages[-(keep_last * 2):]
# Usage
conv = Conversation(system="You are a Python expert.")
print(conv.chat("How do I read a JSON file?"))
print(conv.chat("What if it's very large?"))  # Has context from first question
Structured Outputs#
Get reliable structured data from LLMs:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import json


def _string_array(description: str) -> dict:
    """JSON-schema fragment for an array of strings with a description."""
    return {
        "type": "array",
        "items": {"type": "string"},
        "description": description,
    }


# Tool definition used to coerce the model into returning structured entities.
tools = [
    {
        "name": "extract_entities",
        "description": "Extract named entities from text",
        "input_schema": {
            "type": "object",
            "properties": {
                "people": _string_array("Names of people mentioned"),
                "organizations": _string_array("Organization names mentioned"),
                "locations": _string_array("Location names mentioned"),
            },
            "required": ["people", "organizations", "locations"],
        },
    }
]
def extract_entities(text: str) -> dict:
    """Force the model to call the extraction tool and return its arguments."""
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        tools=tools,
        # Forcing the tool guarantees a structured tool_use block in the reply.
        tool_choice={"type": "tool", "name": "extract_entities"},
        messages=[{
            "role": "user",
            "content": f"Extract entities from this text:\n\n{text}"
        }]
    )
    # The structured payload lives on the tool_use content block's input.
    return response.content[0].input


# Usage
text = "Tim Cook announced that Apple will open a new office in Austin, Texas."
entities = extract_entities(text)
# {"people": ["Tim Cook"], "organizations": ["Apple"], "locations": ["Austin", "Texas"]}
Caching Responses#
Avoid redundant API calls:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| import hashlib
import json
from pathlib import Path
class ResponseCache:
def __init__(self, cache_dir: str = ".llm_cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
def _key(self, model: str, messages: list, system: str = None) -> str:
data = json.dumps({"model": model, "messages": messages, "system": system}, sort_keys=True)
return hashlib.sha256(data.encode()).hexdigest()
def get(self, model: str, messages: list, system: str = None) -> str | None:
key = self._key(model, messages, system)
cache_file = self.cache_dir / f"{key}.json"
if cache_file.exists():
return json.loads(cache_file.read_text())["response"]
return None
def set(self, model: str, messages: list, response: str, system: str = None):
key = self._key(model, messages, system)
cache_file = self.cache_dir / f"{key}.json"
cache_file.write_text(json.dumps({"response": response}))
cache = ResponseCache()


def chat_cached(message: str) -> str:
    """Return a cached response when available, otherwise call the API.

    Args:
        message: The user's message, forwarded to ``chat``.

    Returns:
        The (possibly cached) reply text.
    """
    messages = [{"role": "user", "content": message}]
    # Check cache
    cached = cache.get("claude-sonnet-4-20250514", messages)
    # BUG FIX: compare against None — a legitimately cached empty string
    # is falsy and would previously trigger a redundant API call.
    if cached is not None:
        return cached
    # Make API call
    response = chat(message)
    # Cache response
    cache.set("claude-sonnet-4-20250514", messages, response)
    return response
Parallel Requests with Rate Limiting#
Process multiple items without hitting rate limits:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio

from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()
semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests


async def chat_async(message: str) -> str:
    """One async completion; the shared semaphore caps in-flight requests."""
    async with semaphore:
        response = await async_client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            messages=[{"role": "user", "content": message}],
        )
        return response.content[0].text


async def process_batch(items: list[str]) -> list[str]:
    """Summarize all items concurrently; result order matches input order."""
    return await asyncio.gather(
        *(chat_async(f"Summarize: {item}") for item in items)
    )


# Usage
items = ["Article 1...", "Article 2...", "Article 3..."]
summaries = asyncio.run(process_batch(items))
Error Handling Patterns#
Graceful degradation for production systems:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from anthropic import APIError, AuthenticationError


def safe_chat(message: str, fallback: str = "I'm unable to process that request.") -> str:
    """Call ``chat`` but never raise: return ``fallback`` on any failure.

    Args:
        message: The user's message, forwarded to ``chat``.
        fallback: Text returned when the API call fails for any reason.

    Returns:
        The model's reply, or ``fallback`` on error.
    """
    try:
        return chat(message)
    except AuthenticationError:
        # Log and alert - this is a configuration issue
        print("ERROR: Invalid API key")
        return fallback
    except RateLimitError:
        # Could queue for later or return cached response
        print("Rate limited, returning fallback")
        return fallback
    except APIError as e:
        # BUG FIX: not every APIError subclass carries status_code/message
        # (e.g. connection errors) — use getattr so logging itself can't
        # raise an AttributeError.
        status = getattr(e, "status_code", "n/a")
        detail = getattr(e, "message", str(e))
        print(f"API error: {status} - {detail}")
        return fallback
    except Exception as e:
        # Unexpected error
        print(f"Unexpected error: {e}")
        return fallback
Best Practices Summary#
- Always set timeouts — Don’t let requests hang forever
- Implement retries — Transient failures are normal
- Track token usage — Costs add up quickly
- Stream for UX — Users prefer seeing progress
- Cache when possible — Same input = same output
- Use structured outputs — Tools/functions beat parsing prose
- Limit concurrency — Respect rate limits
- Graceful degradation — Have fallbacks for failures
LLM APIs are powerful but require the same production patterns as any external service: retry logic, error handling, observability, and cost awareness.
Start with the simple patterns, add complexity as needed, and always keep an eye on that token counter.