Integrating LLM APIs into production systems is harder than the tutorials suggest. The API call works in development. Then you hit rate limits, latency spikes, context limits, and costs that scale faster than your revenue.

Here’s how to build LLM integrations that actually work.

The Basics Nobody Mentions

Always Stream

Non-streaming API calls block until complete. For a 500-token response, that’s 5-15 seconds of your user staring at nothing.

# Bad: User waits forever
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": prompt}]
)
print(response.choices[0].message.content)

# Good: Tokens appear as generated
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": prompt}],
    stream=True
)
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

Streaming also lets you abort early if the response is going off-rails.
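A minimal sketch of that abort pattern. The `collect_stream` helper and the guard predicate are illustrative names, not part of any SDK; with the OpenAI client, each `text` piece would come from `chunk.choices[0].delta.content`.

```python
def collect_stream(chunks, should_abort, max_chars=4000):
    """Accumulate streamed text, stopping as soon as the guard trips.

    `chunks` is any iterable of text pieces; `should_abort` receives the
    text accumulated so far and returns True to cut the stream short.
    """
    parts = []
    for text in chunks:
        parts.append(text)
        so_far = "".join(parts)
        if should_abort(so_far) or len(so_far) >= max_chars:
            break  # stop consuming the stream early
    return "".join(parts)
```

A guard might check for forbidden phrases, runaway length, or repeated output; once it fires, you stop iterating and the remaining tokens are never rendered to the user.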

Set Timeouts

LLM APIs hang sometimes. Always set timeouts.

from openai import OpenAI
import httpx

client = OpenAI(
    timeout=httpx.Timeout(60.0, connect=5.0)  # 60s total, 5s connect
)

For streaming, implement your own per-chunk timeout:

import asyncio

async def stream_with_timeout(stream, chunk_timeout=10):
    # Pull each chunk through wait_for so waiting on the stream has a deadline
    iterator = stream.__aiter__()
    while True:
        try:
            chunk = await asyncio.wait_for(iterator.__anext__(), timeout=chunk_timeout)
        except StopAsyncIteration:
            break
        except asyncio.TimeoutError:
            raise TimeoutError("Stream stalled: no chunk within timeout")
        yield chunk

Handle Rate Limits

Every LLM API has rate limits. Build retry logic from day one.

import random
import time

from openai import RateLimitError

def call_with_retry(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return func()
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            wait_time = (2 ** attempt) + random.random()  # exponential backoff + jitter
            time.sleep(wait_time)

Better: use a queue with rate limiting.

import asyncio
from asyncio import Semaphore

from openai import AsyncOpenAI

class RateLimitedClient:
    def __init__(self, requests_per_minute=60):
        self.requests_per_minute = requests_per_minute
        self.semaphore = Semaphore(requests_per_minute)
        self.client = AsyncOpenAI()  # async client, so the create call can be awaited

    async def complete(self, messages):
        async with self.semaphore:
            response = await self.client.chat.completions.create(
                model="gpt-4",
                messages=messages
            )
            # Hold the slot long enough to keep throughput under the limit
            await asyncio.sleep(60 / self.requests_per_minute)
            return response

Prompt Engineering Patterns

Structured Output

Don’t pray for JSON. Enforce it.

from pydantic import BaseModel

class ExtractedData(BaseModel):
    name: str
    email: str
    sentiment: str

response = client.beta.chat.completions.parse(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "Extract contact info and sentiment."},
        {"role": "user", "content": email_text}
    ],
    response_format=ExtractedData
)

data = response.choices[0].message.parsed
print(data.name, data.email, data.sentiment)

For APIs without native structured output, use function calling:

tools = [{
    "type": "function",
    "function": {
        "name": "save_contact",
        "parameters": {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "email": {"type": "string"},
                "sentiment": {"enum": ["positive", "neutral", "negative"]}
            },
            "required": ["name", "email", "sentiment"]
        }
    }
}]

response = client.chat.completions.create(
    model="gpt-4",
    messages=messages,
    tools=tools,
    tool_choice={"type": "function", "function": {"name": "save_contact"}}
)

Few-Shot Examples

Show, don’t tell.

messages = [
    {"role": "system", "content": "Classify support tickets by urgency."},
    {"role": "user", "content": "My production database is down!"},
    {"role": "assistant", "content": "critical"},
    {"role": "user", "content": "How do I change my password?"},
    {"role": "assistant", "content": "low"},
    {"role": "user", "content": "Payment processing is failing for 50% of users"},
    {"role": "assistant", "content": "critical"},
    {"role": "user", "content": actual_ticket}
]

Three examples often outperform pages of instructions.

Chain of Thought for Complex Tasks

For reasoning tasks, ask for the thinking:

prompt = """
Analyze this code for security vulnerabilities.

First, list each potential vulnerability you see.
Then, for each one, explain why it's a risk.
Finally, provide your severity rating (low/medium/high/critical).

Code:
{code}
"""

The intermediate steps improve accuracy and give you debuggable output.

Context Management

Token Counting

Know your limits before you hit them.

import tiktoken

def count_tokens(text, model="gpt-4"):
    enc = tiktoken.encoding_for_model(model)
    return len(enc.encode(text))

def truncate_to_tokens(text, max_tokens, model="gpt-4"):
    enc = tiktoken.encoding_for_model(model)
    tokens = enc.encode(text)
    if len(tokens) <= max_tokens:
        return text
    return enc.decode(tokens[:max_tokens])

Conversation Summarization

Long conversations overflow context. Summarize older messages.

def manage_conversation(messages, max_tokens=6000):
    total = sum(count_tokens(m["content"]) for m in messages)
    
    if total <= max_tokens:
        return messages
    
    # Keep system prompt and recent messages
    system = messages[0]
    recent = messages[-4:]
    older = messages[1:-4]
    
    # Summarize older messages (get_summary wraps a cheap summarization call)
    transcript = "\n".join(f"{m['role']}: {m['content']}" for m in older)
    summary = get_summary(f"Summarize this conversation:\n\n{transcript}")
    
    return [
        system,
        {"role": "system", "content": f"Previous conversation summary: {summary}"},
        *recent
    ]

RAG: Retrieval Augmented Generation

For knowledge-heavy applications, don’t stuff everything in context. Retrieve relevant chunks.

from openai import OpenAI
import numpy as np

client = OpenAI()

def get_embedding(text):
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

def find_relevant_chunks(query, chunks, top_k=5):
    query_embedding = get_embedding(query)
    
    similarities = []
    for chunk in chunks:
        # OpenAI embeddings are unit-length, so the dot product is cosine similarity
        similarity = np.dot(query_embedding, chunk["embedding"])
        similarities.append((similarity, chunk["text"]))
    
    similarities.sort(key=lambda pair: pair[0], reverse=True)
    return [text for _, text in similarities[:top_k]]

def answer_with_context(question, knowledge_base):
    relevant = find_relevant_chunks(question, knowledge_base)
    context = "\n\n".join(relevant)
    
    messages = [
        {"role": "system", "content": f"Answer based on this context:\n\n{context}"},
        {"role": "user", "content": question}
    ]
    
    return client.chat.completions.create(
        model="gpt-4",
        messages=messages
    )

Cost Control

Model Selection

Not everything needs GPT-4.

def select_model(task_complexity):
    if task_complexity == "simple":
        return "gpt-4o-mini"    # $0.15/1M input tokens
    elif task_complexity == "medium":
        return "gpt-3.5-turbo"  # $0.50/1M input tokens
    else:
        return "gpt-4o"         # $2.50/1M input tokens

Route classification tasks to cheap models. Reserve expensive models for generation.

Caching

Same prompt = same response. Cache aggressively.

import hashlib
import json
import redis

cache = redis.Redis()

def cached_completion(messages, **kwargs):
    # Create a stable cache key from the inputs (sort_keys so dict order can't change the key)
    key = hashlib.sha256(
        json.dumps({"messages": messages, **kwargs}, sort_keys=True).encode()
    ).hexdigest()
    
    # Check cache (note: hits return a plain dict, not a response object)
    cached = cache.get(key)
    if cached:
        return json.loads(cached)
    
    # Call API
    response = client.chat.completions.create(
        messages=messages,
        **kwargs
    )
    
    # Cache for 24 hours
    cache.setex(key, 86400, json.dumps(response.to_dict()))
    
    return response

For semantic caching (similar questions → same answer), embed the query and check cosine similarity.
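A minimal sketch of that idea. The `SemanticCache` class, its threshold, and the linear scan are illustrative assumptions; embeddings would come from a function like `get_embedding` above, and a real deployment would use a vector index instead of scanning every entry.

```python
import numpy as np

class SemanticCache:
    """Reuse a cached answer when a new query embeds close to an old one."""

    def __init__(self, threshold=0.95):
        self.threshold = threshold
        self.entries = []  # list of (unit-normalized embedding, response)

    def _normalize(self, embedding):
        e = np.asarray(embedding, dtype=float)
        return e / np.linalg.norm(e)

    def get(self, query_embedding):
        q = self._normalize(query_embedding)
        for emb, response in self.entries:
            # Dot product of unit vectors = cosine similarity
            if float(np.dot(q, emb)) >= self.threshold:
                return response
        return None  # cache miss: caller hits the API, then calls put()

    def put(self, query_embedding, response):
        self.entries.append((self._normalize(query_embedding), response))
```

The threshold is the knob to tune: too low and dissimilar questions get stale answers; too high and you never hit the cache.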

Batch Processing

For bulk operations, use the batch API (50% cheaper).

import json
import time

# Build the batch requests
requests = [
    {"custom_id": f"req-{i}", "method": "POST", "url": "/v1/chat/completions",
     "body": {"model": "gpt-4", "messages": msgs}}
    for i, msgs in enumerate(all_messages)
]

# Serialize to JSONL and submit the batch
jsonl_content = "\n".join(json.dumps(r) for r in requests).encode()
batch_file = client.files.create(file=("batch.jsonl", jsonl_content), purpose="batch")
batch = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h"
)

# Poll for completion (or use webhooks)
while batch.status != "completed":
    time.sleep(60)
    batch = client.batches.retrieve(batch.id)

Observability

Log Everything

import logging
import time

logger = logging.getLogger(__name__)

def logged_completion(messages, **kwargs):
    start = time.time()
    
    try:
        response = client.chat.completions.create(messages=messages, **kwargs)
        
        logger.info("llm_completion", extra={
            "model": kwargs.get("model"),
            "input_tokens": response.usage.prompt_tokens,
            "output_tokens": response.usage.completion_tokens,
            "latency_ms": (time.time() - start) * 1000,
            "success": True
        })
        
        return response
        
    except Exception as e:
        logger.error("llm_completion_failed", extra={
            "error": str(e),
            "latency_ms": (time.time() - start) * 1000
        })
        raise

Track: token usage, latency, error rates, cost per request.
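Cost per request falls straight out of the usage numbers in the log above. A sketch with illustrative per-million-token prices; the figures below are assumptions for the example, so check your provider's current price sheet before relying on them.

```python
# Illustrative prices in dollars per 1M tokens -- NOT authoritative
PRICES = {
    "gpt-4o":      {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def request_cost(model, input_tokens, output_tokens):
    """Dollar cost of one request, computed from its token usage."""
    p = PRICES[model]
    return (input_tokens * p["input"] + output_tokens * p["output"]) / 1_000_000
```

Feed it `response.usage.prompt_tokens` and `response.usage.completion_tokens` from the logging wrapper, and emit the result as one more field in the structured log.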

Evaluation

How do you know if your prompts are working?

def evaluate_extraction(test_cases):
    results = []
    
    for case in test_cases:
        response = extract_data(case["input"])
        
        results.append({
            "input": case["input"],
            "expected": case["expected"],
            "actual": response,
            "correct": response == case["expected"]
        })
    
    accuracy = sum(r["correct"] for r in results) / len(results)
    return accuracy, results

Build a test suite. Run it on prompt changes. Track accuracy over time.

Start Here

  1. Today: Add timeouts and retry logic
  2. This week: Implement response caching
  3. This month: Add token counting and context management
  4. This quarter: Build evaluation pipelines

LLM APIs are powerful but unreliable. The patterns above turn “works in demo” into “works in production.”


The best LLM integration is one where users never think about the AI — they just get answers that work.