Integrating LLM APIs into production systems is harder than the tutorials suggest. The API call works in development. Then you hit rate limits, latency spikes, context limits, and costs that scale faster than your revenue.
Here’s how to build LLM integrations that actually work.
## The Basics Nobody Mentions

### Always Stream
Non-streaming API calls block until complete. For a 500-token response, that’s 5-15 seconds of your user staring at nothing.
```python
# Bad: User waits forever
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": prompt}]
)
print(response.choices[0].message.content)

# Good: Tokens appear as generated
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": prompt}],
    stream=True
)
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
```
Streaming also lets you abort early if the response is going off-rails.
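Aborting early can be as simple as breaking out of the chunk loop. A minimal sketch of the accumulation logic (the function name and thresholds are illustrative, not part of any SDK):

```python
def collect_until(deltas, max_chars=2000, stop_marker=None):
    """Accumulate streamed text deltas, stopping early when the response
    runs away in length or emits an unwanted marker."""
    parts = []
    total = 0
    for delta in deltas:
        parts.append(delta)
        total += len(delta)
        if total >= max_chars:
            break  # runaway response: stop paying for tokens
        if stop_marker is not None and stop_marker in delta:
            break
    return "".join(parts)
```

In a real loop each delta comes from `chunk.choices[0].delta.content`, and breaking out of `for chunk in stream` closes the connection.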
### Set Timeouts
LLM APIs hang sometimes. Always set timeouts.
```python
from openai import OpenAI
import httpx

client = OpenAI(
    timeout=httpx.Timeout(60.0, connect=5.0)  # 60s total, 5s connect
)
```
For streaming, implement your own per-chunk timeout:
```python
import asyncio

async def stream_with_timeout(stream, chunk_timeout=10):
    """Yield chunks, raising if the stream stalls for more than
    chunk_timeout seconds between chunks (requires the async client)."""
    iterator = stream.__aiter__()
    while True:
        try:
            chunk = await asyncio.wait_for(iterator.__anext__(), timeout=chunk_timeout)
        except StopAsyncIteration:
            return
        except asyncio.TimeoutError:
            raise TimeoutError(f"Stream stalled: no chunk for {chunk_timeout}s")
        yield chunk
```
### Handle Rate Limits
Every LLM API has rate limits. Build retry logic from day one.
```python
import random
import time
from openai import RateLimitError

def call_with_retry(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return func()
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            wait_time = (2 ** attempt) + random.random()  # exponential backoff + jitter
            time.sleep(wait_time)
```
Better: wrap the client so concurrency and pacing are enforced in one place.
```python
import asyncio
from openai import AsyncOpenAI

class RateLimitedClient:
    def __init__(self, requests_per_minute=60, max_concurrent=10):
        self.requests_per_minute = requests_per_minute
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.client = AsyncOpenAI()

    async def complete(self, messages):
        async with self.semaphore:
            response = await self.client.chat.completions.create(
                model="gpt-4",
                messages=messages
            )
            # Crude pacing: hold the slot long enough to stay under the limit
            await asyncio.sleep(60 / self.requests_per_minute)
            return response
```
## Prompt Engineering Patterns

### Structured Output
Don’t pray for JSON. Enforce it.
```python
from pydantic import BaseModel

class ExtractedData(BaseModel):
    name: str
    email: str
    sentiment: str

response = client.beta.chat.completions.parse(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "Extract contact info and sentiment."},
        {"role": "user", "content": email_text}
    ],
    response_format=ExtractedData
)

data = response.choices[0].message.parsed
print(data.name, data.email, data.sentiment)
```
For APIs without native structured output, use function calling:
```python
tools = [{
    "type": "function",
    "function": {
        "name": "save_contact",
        "parameters": {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "email": {"type": "string"},
                "sentiment": {"enum": ["positive", "neutral", "negative"]}
            },
            "required": ["name", "email", "sentiment"]
        }
    }
}]

response = client.chat.completions.create(
    model="gpt-4",
    messages=messages,
    tools=tools,
    tool_choice={"type": "function", "function": {"name": "save_contact"}}
)
```
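Whichever route you take, validate what comes back: the model can still emit malformed JSON or drop a required field. A defensive sketch (field names follow the `save_contact` schema above):

```python
import json

def parse_contact_args(arguments_json):
    """Parse and validate tool-call arguments before trusting them."""
    data = json.loads(arguments_json)  # raises on malformed JSON
    for field in ("name", "email", "sentiment"):
        if field not in data:
            raise ValueError(f"missing field: {field}")
    if data["sentiment"] not in ("positive", "neutral", "negative"):
        raise ValueError(f"invalid sentiment: {data['sentiment']!r}")
    return data
```

With the OpenAI SDK, the raw JSON string lives at `response.choices[0].message.tool_calls[0].function.arguments`.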
### Few-Shot Examples
Show, don’t tell.
```python
messages = [
    {"role": "system", "content": "Classify support tickets by urgency."},
    {"role": "user", "content": "My production database is down!"},
    {"role": "assistant", "content": "critical"},
    {"role": "user", "content": "How do I change my password?"},
    {"role": "assistant", "content": "low"},
    {"role": "user", "content": "Payment processing is failing for 50% of users"},
    {"role": "assistant", "content": "critical"},
    {"role": "user", "content": actual_ticket}
]
```
Three examples often outperform pages of instructions.
### Chain of Thought for Complex Tasks
For reasoning tasks, ask for the thinking:
```python
prompt_template = """
Analyze this code for security vulnerabilities.
First, list each potential vulnerability you see.
Then, for each one, explain why it's a risk.
Finally, provide your severity rating (low/medium/high/critical).
Code:
{code}
"""

prompt = prompt_template.format(code=code)
```
The intermediate steps improve accuracy and give you debuggable output.
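Debuggable also means parseable. One way to recover the verdict (an assumption about the output shape, not part of any API) is to take the last rating the model mentions, since the prompt asks for the severity at the end:

```python
import re

def extract_severity(cot_output):
    """Pull the final severity rating out of a chain-of-thought response.
    Take the last match so intermediate mentions of a level don't win."""
    ratings = re.findall(r"\b(low|medium|high|critical)\b", cot_output.lower())
    return ratings[-1] if ratings else None
```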
## Context Management

### Token Counting
Know your limits before you hit them.
```python
import tiktoken

def count_tokens(text, model="gpt-4"):
    enc = tiktoken.encoding_for_model(model)
    return len(enc.encode(text))

def truncate_to_tokens(text, max_tokens, model="gpt-4"):
    enc = tiktoken.encoding_for_model(model)
    tokens = enc.encode(text)
    if len(tokens) <= max_tokens:
        return text
    return enc.decode(tokens[:max_tokens])
```
### Conversation Summarization
Long conversations overflow context. Summarize older messages.
```python
def manage_conversation(messages, max_tokens=6000):
    total = sum(count_tokens(m["content"]) for m in messages)
    if total <= max_tokens:
        return messages

    # Keep the system prompt and the most recent turns verbatim
    system = messages[0]
    recent = messages[-4:]
    older = messages[1:-4]

    # Summarize everything in between (as text, not raw dicts)
    transcript = "\n".join(f"{m['role']}: {m['content']}" for m in older)
    summary = get_summary(f"Summarize this conversation:\n\n{transcript}")

    return [
        system,
        {"role": "system", "content": f"Previous conversation summary: {summary}"},
        *recent
    ]
```
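`get_summary` is whatever completion call you like. A sketch that defaults to a cheap model, since a summary only needs to preserve facts and decisions; the injectable `client`, model choice, and token cap are assumptions, the injection is there so it can be unit-tested:

```python
def get_summary(prompt, client=None, model="gpt-4o-mini"):
    """Summarize with a cheap model; inject a client for testing."""
    if client is None:
        from openai import OpenAI  # only constructed when nothing is injected
        client = OpenAI()
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=300,  # cap the summary so it can't bloat the context itself
    )
    return response.choices[0].message.content
```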
## RAG: Retrieval-Augmented Generation
For knowledge-heavy applications, don’t stuff everything in context. Retrieve relevant chunks.
```python
from openai import OpenAI
import numpy as np

client = OpenAI()

def get_embedding(text):
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

def find_relevant_chunks(query, chunks, top_k=5):
    # OpenAI embeddings are unit-normalized, so dot product = cosine similarity
    query_embedding = get_embedding(query)
    similarities = []
    for chunk in chunks:
        similarity = np.dot(query_embedding, chunk["embedding"])
        similarities.append((similarity, chunk["text"]))
    similarities.sort(reverse=True)
    return [text for _, text in similarities[:top_k]]

def answer_with_context(question, knowledge_base):
    relevant = find_relevant_chunks(question, knowledge_base)
    context = "\n\n".join(relevant)
    messages = [
        {"role": "system", "content": f"Answer based on this context:\n\n{context}"},
        {"role": "user", "content": question}
    ]
    return client.chat.completions.create(
        model="gpt-4",
        messages=messages
    )
```
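The chunks have to come from somewhere. A minimal character-based splitter with overlap, so a fact straddling a boundary still lands whole in at least one chunk (the sizes are assumptions to tune; a token-based variant would use `tiktoken` the same way):

```python
def chunk_text(text, chunk_size=800, overlap=200):
    """Split text into overlapping windows for embedding and retrieval."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    chunks = []
    step = chunk_size - overlap
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break
    return chunks
```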
## Cost Control

### Model Selection
Not everything needs GPT-4.
```python
def select_model(task_complexity):
    if task_complexity == "simple":
        return "gpt-4o-mini"    # $0.15/1M input tokens
    elif task_complexity == "medium":
        return "gpt-3.5-turbo"  # $0.50/1M input tokens
    else:
        return "gpt-4o"         # $2.50/1M input tokens
```
Route classification tasks to cheap models. Reserve expensive models for generation.
### Caching
Same prompt, same response: when you don't need sampling variety, cache aggressively.
```python
import hashlib
import json
import redis

cache = redis.Redis()

def cached_completion(messages, **kwargs):
    # Stable cache key: sort_keys so identical inputs always hash the same
    key = hashlib.sha256(
        json.dumps({"messages": messages, **kwargs}, sort_keys=True).encode()
    ).hexdigest()

    # Check cache
    cached = cache.get(key)
    if cached:
        return json.loads(cached)

    # Call API, then return a plain dict so both paths have the same type
    response = client.chat.completions.create(messages=messages, **kwargs)
    data = response.to_dict()

    # Cache for 24 hours
    cache.setex(key, 86400, json.dumps(data))
    return data
```
For semantic caching (similar questions → same answer), embed the query and check cosine similarity.
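A minimal in-memory sketch of that idea; the 0.92 threshold is an assumption to tune on real traffic (too low and you serve wrong answers), and `embed` is injected so it can be any embeddings call:

```python
import math

def cosine(a, b):
    """Cosine similarity between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0

class SemanticCache:
    """Return a cached answer when a new query embeds close to an old one."""

    def __init__(self, embed, threshold=0.92):
        self.embed = embed
        self.threshold = threshold
        self.entries = []  # list of (embedding, answer)

    def get(self, query):
        q = self.embed(query)
        best_score, best_answer = 0.0, None
        for emb, answer in self.entries:
            score = cosine(q, emb)
            if score > best_score:
                best_score, best_answer = score, answer
        return best_answer if best_score >= self.threshold else None

    def put(self, query, answer):
        self.entries.append((self.embed(query), answer))
```

A production version would store entries in a vector index rather than scanning a list.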
### Batch Processing
For bulk operations, use the batch API (50% cheaper).
```python
import json
import time

# Build the JSONL batch file
requests = [
    {"custom_id": f"req-{i}", "method": "POST", "url": "/v1/chat/completions",
     "body": {"model": "gpt-4", "messages": msgs}}
    for i, msgs in enumerate(all_messages)
]
jsonl_content = "\n".join(json.dumps(r) for r in requests).encode()

# Submit batch
batch_file = client.files.create(file=("batch.jsonl", jsonl_content), purpose="batch")
batch = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h"
)

# Poll for a terminal state (or use webhooks)
while batch.status not in ("completed", "failed", "expired", "cancelled"):
    time.sleep(60)
    batch = client.batches.retrieve(batch.id)
```
## Observability

### Log Everything
```python
import logging
import time

logger = logging.getLogger(__name__)

def logged_completion(messages, **kwargs):
    start = time.time()
    try:
        response = client.chat.completions.create(messages=messages, **kwargs)
        logger.info("llm_completion", extra={
            "model": kwargs.get("model"),
            "input_tokens": response.usage.prompt_tokens,
            "output_tokens": response.usage.completion_tokens,
            "latency_ms": (time.time() - start) * 1000,
            "success": True
        })
        return response
    except Exception as e:
        logger.error("llm_completion_failed", extra={
            "error": str(e),
            "latency_ms": (time.time() - start) * 1000
        })
        raise
```
Track: token usage, latency, error rates, cost per request.
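Cost per request falls straight out of the usage block. A sketch with assumed per-million-token prices; verify against your provider's current pricing page before trusting these numbers:

```python
# (input, output) dollars per 1M tokens -- assumed, check current pricing
PRICING = {
    "gpt-4o": (2.50, 10.00),
    "gpt-4o-mini": (0.15, 0.60),
}

def request_cost(model, input_tokens, output_tokens):
    """Dollar cost of one request, from response.usage token counts."""
    input_price, output_price = PRICING[model]
    return (input_tokens * input_price + output_tokens * output_price) / 1_000_000
```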
### Evaluation
How do you know if your prompts are working?
```python
def evaluate_extraction(test_cases):
    results = []
    for case in test_cases:
        response = extract_data(case["input"])
        results.append({
            "input": case["input"],
            "expected": case["expected"],
            "actual": response,
            "correct": response == case["expected"]
        })
    accuracy = sum(r["correct"] for r in results) / len(results)
    return accuracy, results
```
Build a test suite. Run it on prompt changes. Track accuracy over time.
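Wired into CI, that suite becomes a regression gate. A sketch (the 90% threshold and the `extract_fn` signature are assumptions to fit your pipeline):

```python
def eval_gate(extract_fn, test_cases, min_accuracy=0.9):
    """Fail loudly when a prompt change drops accuracy below the gate."""
    correct = sum(
        1 for case in test_cases if extract_fn(case["input"]) == case["expected"]
    )
    accuracy = correct / len(test_cases)
    if accuracy < min_accuracy:
        raise AssertionError(
            f"accuracy {accuracy:.2%} is below the {min_accuracy:.0%} gate"
        )
    return accuracy
```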
## Start Here
- Today: Add timeouts and retry logic
- This week: Implement response caching
- This month: Add token counting and context management
- This quarter: Build evaluation pipelines
LLM APIs are powerful but unreliable. The patterns above turn “works in demo” into “works in production.”
The best LLM integration is one where users never think about the AI — they just get answers that work.