Integrating LLM APIs into production applications requires more than just making API calls. These patterns address the real challenges: rate limits, token costs, latency, and reliability.

Basic Client Setup

import os
from anthropic import Anthropic

client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    timeout=60.0,
    max_retries=3,
)

def chat(message: str, system: str | None = None) -> str:
    """Simple completion with sensible defaults."""
    messages = [{"role": "user", "content": message}]
    
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        system=system or "You are a helpful assistant.",
        messages=messages,
    )
    
    return response.content[0].text

Retry with Exponential Backoff

Built-in retries help, but custom logic handles edge cases:

import time
import random
from anthropic import RateLimitError, APIStatusError

def chat_with_retry(message: str, max_retries: int = 5) -> str:
    """Retry with exponential backoff and jitter."""
    
    for attempt in range(max_retries):
        try:
            return chat(message)
            
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            
            # Exponential backoff with jitter
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited, waiting {wait_time:.1f}s...")
            time.sleep(wait_time)
            
        except APIStatusError as e:
            if e.status_code >= 500:
                # Server error, retry
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait_time)
            else:
                # Client error, don't retry
                raise
    
    raise Exception("Max retries exceeded")
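The same backoff-with-jitter logic can be factored into a reusable decorator so it isn't rewritten for every call site. A stdlib-only sketch (the exception types to retry on are passed in as a parameter, since what counts as transient depends on the client library):

```python
import functools
import random
import time

def with_backoff(retryable=(Exception,), max_retries: int = 5, base: float = 1.0):
    """Decorator: retry on the given exceptions with exponential backoff + jitter."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return fn(*args, **kwargs)
                except retryable:
                    if attempt == max_retries - 1:
                        raise  # Out of attempts: surface the error
                    # Sleep base * 2^attempt plus jitter scaled to base
                    time.sleep(base * (2 ** attempt) + random.uniform(0, base))
        return wrapper
    return decorator

@with_backoff(retryable=(ConnectionError, TimeoutError), max_retries=3)
def fetch():
    ...  # e.g. an API call that may fail transiently
```

Applied to the earlier example, `chat_with_retry` collapses to `with_backoff(retryable=(RateLimitError,))` wrapped around `chat`.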

Streaming Responses

For better UX, stream responses instead of waiting for completion:

def chat_stream(message: str):
    """Stream response tokens as they arrive."""
    
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": message}],
    ) as stream:
        for text in stream.text_stream:
            yield text

# Usage
for chunk in chat_stream("Explain quantum computing"):
    print(chunk, end="", flush=True)

Token Counting and Cost Management

Track usage to avoid budget surprises:

from dataclasses import dataclass

@dataclass
class UsageTracker:
    input_tokens: int = 0
    output_tokens: int = 0
    
    # Pricing per million tokens (example rates)
    INPUT_COST_PER_M = 3.00
    OUTPUT_COST_PER_M = 15.00
    
    def add(self, response):
        self.input_tokens += response.usage.input_tokens
        self.output_tokens += response.usage.output_tokens
    
    @property
    def cost(self) -> float:
        input_cost = (self.input_tokens / 1_000_000) * self.INPUT_COST_PER_M
        output_cost = (self.output_tokens / 1_000_000) * self.OUTPUT_COST_PER_M
        return input_cost + output_cost
    
    def __str__(self):
        return f"Tokens: {self.input_tokens:,} in / {self.output_tokens:,} out | Cost: ${self.cost:.4f}"

# Usage
tracker = UsageTracker()

response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello!"}],
)
tracker.add(response)
print(tracker)  # Tokens: 12 in / 45 out | Cost: $0.0007

Conversation Management

Maintain context across multiple turns:

class Conversation:
    def __init__(self, system: str | None = None, model: str = "claude-sonnet-4-20250514"):
        self.messages = []
        self.system = system
        self.model = model
        self.tracker = UsageTracker()
    
    def chat(self, user_message: str) -> str:
        self.messages.append({"role": "user", "content": user_message})
        
        # The API rejects system=None, so only include it when set
        kwargs = {"system": self.system} if self.system else {}

        response = client.messages.create(
            model=self.model,
            max_tokens=2048,
            messages=self.messages,
            **kwargs,
        )
        
        assistant_message = response.content[0].text
        self.messages.append({"role": "assistant", "content": assistant_message})
        self.tracker.add(response)
        
        return assistant_message
    
    def trim_history(self, keep_last: int = 10):
        """Prevent context from growing too large."""
        if len(self.messages) > keep_last * 2:
            self.messages = self.messages[-(keep_last * 2):]

# Usage
conv = Conversation(system="You are a Python expert.")
print(conv.chat("How do I read a JSON file?"))
print(conv.chat("What if it's very large?"))  # Has context from first question
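`trim_history` keeps a fixed number of turns, but turns vary wildly in length. A cost-aware alternative trims by an approximate token budget. The sketch below uses the common ~4-characters-per-token heuristic, which is an estimate, not the model's real tokenizer:

```python
def trim_to_budget(messages: list[dict], max_tokens: int = 4000) -> list[dict]:
    """Drop the oldest turns until the estimated token count fits the budget."""
    def estimate(msg: dict) -> int:
        # Rough heuristic: ~4 characters per token
        return len(msg["content"]) // 4 + 1

    trimmed = list(messages)
    while len(trimmed) > 2 and sum(estimate(m) for m in trimmed) > max_tokens:
        # Remove the oldest user/assistant pair to keep role alternation intact
        trimmed = trimmed[2:]
    return trimmed
```

For exact counts, the API's token-counting endpoint is the authoritative source; the heuristic is only good enough for deciding when to trim.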

Structured Output with Tool Use

Get reliable structured data from LLMs:

import json

tools = [
    {
        "name": "extract_entities",
        "description": "Extract named entities from text",
        "input_schema": {
            "type": "object",
            "properties": {
                "people": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Names of people mentioned"
                },
                "organizations": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Organization names mentioned"
                },
                "locations": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Location names mentioned"
                }
            },
            "required": ["people", "organizations", "locations"]
        }
    }
]

def extract_entities(text: str) -> dict:
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        tools=tools,
        tool_choice={"type": "tool", "name": "extract_entities"},
        messages=[{
            "role": "user",
            "content": f"Extract entities from this text:\n\n{text}"
        }]
    )
    
    # With tool_choice forced, the response should contain a tool_use block,
    # but scan for it rather than assuming it is first
    for block in response.content:
        if block.type == "tool_use":
            return block.input
    raise ValueError("No tool_use block in response")

# Usage
text = "Tim Cook announced that Apple will open a new office in Austin, Texas."
entities = extract_entities(text)
# {"people": ["Tim Cook"], "organizations": ["Apple"], "locations": ["Austin", "Texas"]}

Caching Responses

Avoid redundant API calls:

import hashlib
import json
from pathlib import Path

class ResponseCache:
    def __init__(self, cache_dir: str = ".llm_cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
    
    def _key(self, model: str, messages: list, system: str = None) -> str:
        data = json.dumps({"model": model, "messages": messages, "system": system}, sort_keys=True)
        return hashlib.sha256(data.encode()).hexdigest()
    
    def get(self, model: str, messages: list, system: str = None) -> str | None:
        key = self._key(model, messages, system)
        cache_file = self.cache_dir / f"{key}.json"
        if cache_file.exists():
            return json.loads(cache_file.read_text())["response"]
        return None
    
    def set(self, model: str, messages: list, response: str, system: str = None):
        key = self._key(model, messages, system)
        cache_file = self.cache_dir / f"{key}.json"
        cache_file.write_text(json.dumps({"response": response}))

cache = ResponseCache()

def chat_cached(message: str) -> str:
    messages = [{"role": "user", "content": message}]
    
    # Check cache
    cached = cache.get("claude-sonnet-4-20250514", messages)
    if cached is not None:
        return cached
    
    # Make API call
    response = chat(message)
    
    # Cache response
    cache.set("claude-sonnet-4-20250514", messages, response)
    
    return response

Parallel Requests with Rate Limiting

Process multiple items without hitting rate limits:

import asyncio
from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()
semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests

async def chat_async(message: str) -> str:
    async with semaphore:
        response = await async_client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            messages=[{"role": "user", "content": message}],
        )
        return response.content[0].text

async def process_batch(items: list[str]) -> list[str]:
    tasks = [chat_async(f"Summarize: {item}") for item in items]
    return await asyncio.gather(*tasks)

# Usage
items = ["Article 1...", "Article 2...", "Article 3..."]
summaries = asyncio.run(process_batch(items))
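One caveat: by default `asyncio.gather` cancels nothing but raises on the first failed task, so one bad item loses the whole batch. For batch jobs it is usually better to capture per-item errors. A generic helper, sketched here with an arbitrary worker rather than the API client:

```python
import asyncio

async def gather_limited(coros, limit: int = 5) -> list:
    """Run coroutines with bounded concurrency, capturing per-item exceptions."""
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:
            return await coro

    # return_exceptions=True keeps one failure from sinking the rest:
    # failed items come back as exception objects in the results list
    return await asyncio.gather(*(run(c) for c in coros), return_exceptions=True)
```

Callers then check each result with `isinstance(result, Exception)` and decide whether to retry, log, or skip that item.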

Error Handling Patterns

Graceful degradation for production systems:

from anthropic import APIError, AuthenticationError, RateLimitError

def safe_chat(message: str, fallback: str = "I'm unable to process that request.") -> str:
    try:
        return chat(message)
    
    except AuthenticationError:
        # Log and alert - this is a configuration issue
        print("ERROR: Invalid API key")
        return fallback
    
    except RateLimitError:
        # Could queue for later or return cached response
        print("Rate limited, returning fallback")
        return fallback
    
    except APIError as e:
        # Log the error details (status_code only exists on APIStatusError subclasses)
        print(f"API error: {e}")
        return fallback
    
    except Exception as e:
        # Unexpected error
        print(f"Unexpected error: {e}")
        return fallback

Best Practices Summary

  1. Always set timeouts — Don’t let requests hang forever
  2. Implement retries — Transient failures are normal
  3. Track token usage — Costs add up quickly
  4. Stream for UX — Users prefer seeing progress
  5. Cache when possible — Same input = same output
  6. Use structured outputs — Tools/functions beat parsing prose
  7. Limit concurrency — Respect rate limits
  8. Graceful degradation — Have fallbacks for failures

LLM APIs are powerful but require the same production patterns as any external service: retry logic, error handling, observability, and cost awareness.

Start with the simple patterns, add complexity as needed, and always keep an eye on that token counter.