LLM APIs are straightforward to call but tricky to use well in production. Here’s what I’ve learned integrating them into real systems.
Basic API Calls#
OpenAI#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import openai

# NOTE(review): avoid hardcoding a literal key -- omit api_key and the client
# reads OPENAI_API_KEY from the environment instead.
client = openai.OpenAI(api_key="sk-...")

# One-shot chat completion: the system message sets behavior,
# the user message carries the actual request.
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain kubernetes in one sentence."}
    ],
    max_tokens=100,   # cap the response length
    temperature=0.7   # moderate creativity
)
print(response.choices[0].message.content)
|
Anthropic (Claude)#
1
2
3
4
5
6
7
8
9
10
11
12
13
import anthropic

# NOTE(review): prefer the ANTHROPIC_API_KEY env var over a hardcoded literal.
client = anthropic.Anthropic(api_key="sk-ant-...")

# Note: unlike the OpenAI call above, max_tokens is passed explicitly here
# and there is no system message -- just a single user turn.
response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explain kubernetes in one sentence."}
    ]
)
print(response.content[0].text)
|
curl (Any Provider)#
1
2
3
4
5
6
7
| curl https://api.openai.com/v1/chat/completions \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"model": "gpt-4o",
"messages": [{"role": "user", "content": "Hello!"}]
}'
|
Streaming Responses#
For better UX, stream tokens as they arrive:
1
2
3
4
5
6
7
8
9
10
# OpenAI streaming: print tokens as they arrive instead of waiting
# for the full completion.
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a haiku about APIs"}],
    stream=True  # server sends incremental chunks
)
for chunk in stream:
    # Some chunks carry no text delta -- guard before printing.
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
|
1
2
3
4
5
6
7
8
# Claude streaming: stream() is a context manager, so the connection is
# closed cleanly when the block exits.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about APIs"}]
) as stream:
    # text_stream yields incremental text as it arrives.
    for text in stream.text_stream:
        print(text, end="", flush=True)
|
Error Handling#
APIs fail. Handle it gracefully:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| import time
from openai import RateLimitError, APIError, APITimeoutError
def call_llm_with_retry(messages, max_retries=3):
    """Call the chat completions API, retrying transient failures.

    Args:
        messages: Chat messages in the OpenAI format.
        max_retries: Total number of attempts before giving up.

    Returns:
        The assistant's reply text.

    Raises:
        RateLimitError: if still rate-limited on the final attempt.
        APIError: on a non-retryable failure of the last attempt.
        RuntimeError: if every attempt timed out.
    """
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=messages,
                timeout=30,  # prevent indefinite hangs
            )
            return response.choices[0].message.content
        except RateLimitError:
            # Bug fix: don't sleep after the last attempt only to raise a
            # generic error -- surface the real RateLimitError instead.
            if attempt == max_retries - 1:
                raise
            wait = 2 ** attempt  # exponential backoff: 1s, 2s, 4s, ...
            print(f"Rate limited. Waiting {wait}s...")
            time.sleep(wait)
        except APITimeoutError:
            print(f"Timeout on attempt {attempt + 1}")
            continue
        except APIError as e:
            print(f"API error: {e}")
            if attempt == max_retries - 1:
                raise
    raise RuntimeError("Max retries exceeded")
|
Cost Control#
LLM calls add up fast. Track and limit usage:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| import tiktoken
def count_tokens(text, model="gpt-4o"):
    """Estimate token count before sending.

    Uses the tiktoken encoding registered for *model*; counts only the
    raw text, not per-message chat framing overhead.
    """
    enc = tiktoken.encoding_for_model(model)
    return len(enc.encode(text))
def check_cost_before_call(messages, model="gpt-4o", max_cost=0.10):
    """Warn if a request looks expensive before paying for it.

    Args:
        messages: Chat messages whose "content" fields are tokenized.
        model: Model name, forwarded to the tokenizer.
        max_cost: Dollar threshold above which the call is flagged.

    Returns:
        True when the estimated input cost is within budget, else False.
    """
    # Bug fix: pass `model` through -- the original always tokenized with
    # the default model regardless of this function's argument.
    # Rough estimate only: per-message framing tokens are not counted.
    total = sum(count_tokens(m["content"], model) for m in messages)
    # GPT-4o input pricing (example): $0.005 per 1K tokens.
    input_cost = (total / 1000) * 0.005
    if input_cost > max_cost:
        print(f"Warning: Estimated cost ${input_cost:.2f}")
        return False
    return True
|
Set Budget Limits#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class UsageTracker:
    """Track per-day API spend and stop once a daily budget is hit."""

    # GPT-4o example pricing, USD per 1K tokens.
    INPUT_PRICE = 0.005
    OUTPUT_PRICE = 0.015

    def __init__(self, daily_budget=5.00):
        from datetime import date  # local import keeps the snippet self-contained
        self.daily_budget = daily_budget
        self.daily_spend = 0.0
        self._day = date.today()  # spend resets when this rolls over

    def track(self, response):
        """Record the cost of *response*; raise once the budget is exceeded.

        Returns the dollar cost of this single call.
        """
        from datetime import date
        today = date.today()
        if today != self._day:
            # Bug fix: the original never reset, so a "daily" budget was
            # actually a lifetime budget for the process.
            self._day = today
            self.daily_spend = 0.0
        usage = response.usage
        cost = (usage.prompt_tokens * self.INPUT_PRICE +
                usage.completion_tokens * self.OUTPUT_PRICE) / 1000
        self.daily_spend += cost
        if self.daily_spend >= self.daily_budget:
            raise RuntimeError(f"Daily budget exceeded: ${self.daily_spend:.2f}")
        return cost
tracker = UsageTracker(daily_budget=10.00)
# After each call:
cost = tracker.track(response)
|
Caching Responses#
Don’t pay for the same answer twice:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| import hashlib
import json
import redis
r = redis.Redis()
def cached_llm_call(messages, model="gpt-4o", ttl=3600):
    """Return a cached completion when available, else call the API.

    The cache key covers both the model and the full message list, so
    different models or prompts never share an entry.  Redis failures
    degrade to an uncached API call instead of breaking the request path.

    Args:
        messages: Chat messages in the OpenAI format.
        model: Model to query (and part of the cache key).
        ttl: Cache entry lifetime in seconds.
    """
    digest = hashlib.sha256(
        json.dumps(messages, sort_keys=True).encode()
    ).hexdigest()
    cache_key = f"llm:{model}:{digest}"
    # Cache lookup is best-effort: a down Redis must not block the call.
    try:
        cached = r.get(cache_key)
    except redis.exceptions.RedisError:
        cached = None
    if cached:
        return json.loads(cached)
    response = client.chat.completions.create(
        model=model,
        messages=messages
    )
    result = response.choices[0].message.content
    try:
        r.setex(cache_key, ttl, json.dumps(result))
    except redis.exceptions.RedisError:
        pass  # losing a cache write is fine; losing the response is not
    return result
|
Structured Output#
Get JSON back reliably:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# OpenAI tool calling.  The older `functions` / `function_call` parameters
# are deprecated; `tools` / `tool_choice` are the current equivalents.
import json

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Extract: John Smith, age 30, from NYC"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "extract_person",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "age": {"type": "integer"},
                    "city": {"type": "string"}
                },
                "required": ["name", "age", "city"]
            }
        }
    }],
    # Force the model to call our function rather than answer in prose.
    tool_choice={"type": "function", "function": {"name": "extract_person"}}
)

data = json.loads(response.choices[0].message.tool_calls[0].function.arguments)
# {"name": "John Smith", "age": 30, "city": "NYC"}
|
Multi-Provider Fallback#
Don’t depend on one provider:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| PROVIDERS = [
("openai", "gpt-4o"),
("anthropic", "claude-sonnet-4-20250514"),
("openai", "gpt-4o-mini"), # Cheaper fallback
]
def resilient_llm_call(messages):
    """Try each provider/model in PROVIDERS in order; return the first reply.

    Args:
        messages: OpenAI-style chat messages (may include a "system" role).

    Returns:
        The reply text from the first provider that succeeds.

    Raises:
        RuntimeError: if every configured provider fails.
    """
    for provider, model in PROVIDERS:
        try:
            if provider == "openai":
                response = openai_client.chat.completions.create(
                    model=model, messages=messages
                )
                return response.choices[0].message.content
            elif provider == "anthropic":
                # Bug fix: the original comment promised format conversion but
                # did none.  Claude takes system prompts as a top-level
                # `system` parameter, not as a message role -- split them out.
                system = "\n".join(
                    m["content"] for m in messages if m["role"] == "system"
                )
                chat = [m for m in messages if m["role"] != "system"]
                kwargs = {"system": system} if system else {}
                response = anthropic_client.messages.create(
                    model=model,
                    max_tokens=1024,
                    messages=chat,
                    **kwargs,
                )
                return response.content[0].text
        except Exception as e:
            # Broad catch is deliberate: any failure moves to the next provider.
            print(f"{provider}/{model} failed: {e}")
            continue
    raise RuntimeError("All providers failed")
|
Async for Throughput#
Process multiple requests concurrently:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
async def process_batch(prompts):
    """Fan out one completion request per prompt and await them all.

    Returns a list aligned with *prompts*; a failed call yields an
    "Error: ..." string in its slot instead of raising.
    """
    coros = (
        async_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
        for prompt in prompts
    )
    # return_exceptions=True keeps one failure from cancelling the rest.
    outcomes = await asyncio.gather(*coros, return_exceptions=True)
    return [
        f"Error: {outcome}" if isinstance(outcome, Exception)
        else outcome.choices[0].message.content
        for outcome in outcomes
    ]
# Usage
prompts = ["Summarize X", "Summarize Y", "Summarize Z"]
results = asyncio.run(process_batch(prompts))
|
Production Checklist#
Quick Reference#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| # Environment setup
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."
# Basic patterns
response = client.chat.completions.create(...) # Sync
stream = client.chat.completions.create(stream=True) # Stream
response = await async_client.chat.completions.create(...) # Async
# Key parameters
model="gpt-4o" # Model selection
max_tokens=1024 # Limit response length
temperature=0.7 # Creativity (0=deterministic, 1=creative)
timeout=30 # Prevent hangs
|
LLM APIs are easy to call, hard to call well. Build in resilience from day one — your future self will thank you when that 3 AM outage hits.
📬 Get the Newsletter
Weekly insights on DevOps, automation, and CLI mastery. No spam, unsubscribe anytime.