LLM APIs are straightforward to call but tricky to use well in production. Here’s what I’ve learned integrating them into real systems.

Basic API Calls

OpenAI

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# Minimal OpenAI chat completion: one system prompt plus one user turn.
import openai

client = openai.OpenAI(api_key="sk-...")

conversation = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain kubernetes in one sentence."},
]

response = client.chat.completions.create(
    model="gpt-4o",
    messages=conversation,
    max_tokens=100,   # cap the length of the reply
    temperature=0.7,  # moderate randomness
)

# The reply text lives on the first (and here, only) choice.
print(response.choices[0].message.content)

Anthropic (Claude)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# Minimal Anthropic Messages API call.
import anthropic

client = anthropic.Anthropic(api_key="sk-ant-...")

user_turn = {"role": "user", "content": "Explain kubernetes in one sentence."}

response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,  # required parameter for the Messages API
    messages=[user_turn],
)

# Response content is a list of blocks; the first holds the text.
print(response.content[0].text)

curl (Any Provider)

1
2
3
4
5
6
7
# Raw HTTP request to the OpenAI chat completions endpoint.
# Expects OPENAI_API_KEY to be exported in the environment.
curl https://api.openai.com/v1/chat/completions \
  -H "Authorization: Bearer $OPENAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "gpt-4o",
    "messages": [{"role": "user", "content": "Hello!"}]
  }'

Streaming Responses

For better UX, stream tokens as they arrive:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# OpenAI streaming: print tokens to the terminal as they arrive.
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a haiku about APIs"}],
    stream=True
)

for chunk in stream:
    # Some chunks carry an empty `choices` list (e.g. a trailing usage
    # chunk), and `delta.content` can be None on role/stop deltas —
    # guard both before indexing, or this raises IndexError mid-stream.
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
1
2
3
4
5
6
7
8
# Claude streaming: the SDK's stream() helper is a context manager that
# closes the connection on exit and exposes text deltas via .text_stream.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about APIs"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

Error Handling

APIs fail. Handle it gracefully:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time
from openai import RateLimitError, APIError, APITimeoutError

def call_llm_with_retry(messages, max_retries=3):
    """Call the chat API, retrying transient failures with exponential backoff.

    Args:
        messages: chat message dicts to send.
        max_retries: total attempts before giving up.

    Returns:
        The assistant reply text (str).

    Raises:
        APIError: re-raised when the final attempt fails with an API error.
        RuntimeError: when all attempts are exhausted.
    """
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=messages,
                timeout=30  # per-request cap; prevents indefinite hangs
            )
            return response.choices[0].message.content

        except RateLimitError:
            if attempt == max_retries - 1:
                break  # out of attempts — don't sleep for nothing
            wait = 2 ** attempt  # exponential backoff: 1s, 2s, 4s...
            print(f"Rate limited. Waiting {wait}s...")
            time.sleep(wait)

        except APITimeoutError:
            print(f"Timeout on attempt {attempt + 1}")
            if attempt < max_retries - 1:
                # A timed-out server is likely overloaded: back off too,
                # rather than hammering it again immediately.
                time.sleep(2 ** attempt)

        except APIError as e:
            print(f"API error: {e}")
            if attempt == max_retries - 1:
                raise

    raise RuntimeError("Max retries exceeded")

Cost Control

LLM calls add up fast. Track and limit usage:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import tiktoken

def count_tokens(text, model="gpt-4o"):
    """Estimate how many tokens *text* occupies under *model*'s encoding."""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

def check_cost_before_call(messages, model="gpt-4o"):
    """Return False (with a warning) if the request looks expensive.

    Args:
        messages: chat message dicts whose "content" will be tokenized.
        model: model name — used for both tokenization and pricing.

    Returns:
        True if the estimated input cost is acceptable, False otherwise.
    """
    # Forward `model` so the right tokenizer is used — the original
    # accepted the parameter but silently tokenized with the default.
    total = sum(count_tokens(m["content"], model=model) for m in messages)

    # GPT-4o input pricing (example): $0.005 per 1K tokens.
    input_cost = (total / 1000) * 0.005

    if input_cost > 0.10:  # More than 10 cents
        print(f"Warning: Estimated cost ${input_cost:.2f}")
        return False
    return True

Set Budget Limits

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from datetime import date

class UsageTracker:
    """Accumulates per-call LLM spend and enforces a daily budget.

    The original version never reset `daily_spend`, so the "daily"
    budget was really a lifetime cap; the counter now resets on the
    first call of each new calendar day.
    """

    # GPT-4o example pricing, USD per 1K tokens.
    _INPUT_PRICE = 0.005
    _OUTPUT_PRICE = 0.015

    def __init__(self, daily_budget=5.00):
        self.daily_budget = daily_budget  # max USD allowed per calendar day
        self.daily_spend = 0.0            # running total for the current day
        self._day = date.today()          # day the running total belongs to

    def track(self, response):
        """Add *response*'s cost to today's total and return it.

        Raises:
            RuntimeError: once the accumulated spend reaches the budget.
        """
        today = date.today()
        if today != self._day:  # new calendar day: start a fresh total
            self._day = today
            self.daily_spend = 0.0

        usage = response.usage
        cost = (usage.prompt_tokens * self._INPUT_PRICE +
                usage.completion_tokens * self._OUTPUT_PRICE) / 1000
        self.daily_spend += cost

        if self.daily_spend >= self.daily_budget:
            raise RuntimeError(f"Daily budget exceeded: ${self.daily_spend:.2f}")

        return cost

# One shared tracker per process; raise the cap to suit your workload.
tracker = UsageTracker(daily_budget=10.00)
# After each call:
cost = tracker.track(response)

Caching Responses

Don’t pay for the same answer twice:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import hashlib
import json
import redis

r = redis.Redis()

def cached_llm_call(messages, model="gpt-4o", ttl=3600):
    """Return the LLM reply for *messages*, serving repeats from Redis.

    The cache key is a SHA-256 of the canonical-JSON messages combined
    with the model name, so identical requests to the same model share
    one entry.

    Args:
        messages: chat message dicts (must be JSON-serializable).
        model: model name; part of the cache key.
        ttl: cache entry lifetime in seconds.
    """
    # Create cache key from messages (sort_keys makes the JSON canonical).
    key = hashlib.sha256(
        json.dumps(messages, sort_keys=True).encode()
    ).hexdigest()
    cache_key = f"llm:{model}:{key}"

    # Check cache. Test for presence explicitly: a truthiness check
    # (`if cached:`) would treat a cached empty-string reply as a miss
    # and pay for the call again.
    cached = r.get(cache_key)
    if cached is not None:
        return json.loads(cached)

    # Call API
    response = client.chat.completions.create(
        model=model,
        messages=messages
    )
    result = response.choices[0].message.content

    # Cache result
    r.setex(cache_key, ttl, json.dumps(result))

    return result

Structured Output

Get JSON back reliably:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# OpenAI tool calling. The legacy `functions` / `function_call` request
# parameters are deprecated — the current API uses `tools` / `tool_choice`,
# and the result arrives on `message.tool_calls`.
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Extract: John Smith, age 30, from NYC"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "extract_person",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "age": {"type": "integer"},
                    "city": {"type": "string"}
                },
                "required": ["name", "age", "city"]
            }
        }
    }],
    # Force the model to call this specific tool rather than reply in text.
    tool_choice={"type": "function", "function": {"name": "extract_person"}}
)

import json
# Arguments arrive as a JSON string the model produced against the schema.
data = json.loads(response.choices[0].message.tool_calls[0].function.arguments)
# {"name": "John Smith", "age": 30, "city": "NYC"}

Multi-Provider Fallback

Don’t depend on one provider:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
PROVIDERS = [
    ("openai", "gpt-4o"),
    ("anthropic", "claude-sonnet-4-20250514"),
    ("openai", "gpt-4o-mini"),  # Cheaper fallback
]

def resilient_llm_call(messages):
    """Try each configured provider/model in order; return the first success.

    Args:
        messages: OpenAI-style chat message dicts (may include a
            "system" role message).

    Raises:
        RuntimeError: when every provider in PROVIDERS fails.
    """
    for provider, model in PROVIDERS:
        try:
            if provider == "openai":
                response = openai_client.chat.completions.create(
                    model=model, messages=messages
                )
                return response.choices[0].message.content

            elif provider == "anthropic":
                # Convert message format for Claude: the Messages API
                # takes the system prompt as a top-level `system=`
                # parameter and rejects "system" role messages — the
                # original comment promised this conversion but never
                # performed it.
                system = "\n".join(
                    m["content"] for m in messages if m["role"] == "system"
                )
                chat = [m for m in messages if m["role"] != "system"]
                kwargs = {"system": system} if system else {}
                response = anthropic_client.messages.create(
                    model=model,
                    max_tokens=1024,
                    messages=chat,
                    **kwargs
                )
                return response.content[0].text

        except Exception as e:
            # Broad by design: any provider failure should fall through
            # to the next option in the list.
            print(f"{provider}/{model} failed: {e}")
            continue

    raise RuntimeError("All providers failed")

Async for Throughput

Process multiple requests concurrently:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def process_batch(prompts, max_concurrency=10):
    """Run one completion per prompt concurrently, preserving order.

    A semaphore caps in-flight requests so a large batch doesn't blow
    straight through provider rate limits (unbounded gather fires every
    request at once). `max_concurrency` is a new keyword with a default,
    so existing callers are unaffected.

    Returns:
        A list aligned with *prompts*; failed requests become
        "Error: ..." strings instead of propagating.
    """
    limiter = asyncio.Semaphore(max_concurrency)

    async def one(prompt):
        # Hold a semaphore slot for the duration of the request.
        async with limiter:
            return await async_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}]
            )

    responses = await asyncio.gather(
        *(one(p) for p in prompts), return_exceptions=True
    )

    results = []
    for r in responses:
        if isinstance(r, Exception):
            results.append(f"Error: {r}")
        else:
            results.append(r.choices[0].message.content)

    return results

# Usage
prompts = ["Summarize X", "Summarize Y", "Summarize Z"]
results = asyncio.run(process_batch(prompts))

Production Checklist

  • Error handling: Retry with backoff, handle all error types
  • Timeouts: Set reasonable limits (30-60s)
  • Caching: Cache deterministic queries
  • Cost tracking: Log usage, set budget alerts
  • Rate limiting: Respect provider limits, queue excess
  • Fallbacks: Multiple providers or graceful degradation
  • Logging: Track requests, latency, costs
  • Testing: Mock API calls in tests

Quick Reference

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# Environment setup
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."

# Basic patterns
response = client.chat.completions.create(...)  # Sync
stream = client.chat.completions.create(stream=True)  # Stream
response = await async_client.chat.completions.create(...)  # Async

# Key parameters
model="gpt-4o"           # Model selection
max_tokens=1024          # Limit response length  
temperature=0.7          # Randomness (0=most deterministic, higher=more varied)
timeout=30               # Prevent hangs

LLM APIs are easy to call, hard to call well. Build in resilience from day one — your future self will thank you when that 3 AM outage hits.