Unstructured logs are a liability. When your application writes User 12345 logged in from 192.168.1.1, you’re creating text that’s easy to read but impossible to query at scale. Structured logging changes the game: logs become data you can filter, aggregate, and analyze.

The Problem with Text Logs

# Traditional logging
import logging
logging.info(f"User {user_id} logged in from {ip_address}")
# Output: INFO:root:User 12345 logged in from 192.168.1.1

Want to find all logins from a specific IP? You need regex. Want to count logins per user? Good luck. Want to correlate with other events? Hope your timestamp parsing is solid.
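To make that pain concrete, here is a sketch of what mining text logs looks like — the log lines and regex are illustrative, and the pattern silently breaks the moment someone rewords the message:

```python
# Counting logins per user from unstructured text logs: fragile regex scraping
import re
from collections import Counter

# This pattern is coupled to the exact wording of the log message
LOGIN_RE = re.compile(r"User (?P<user_id>\d+) logged in from (?P<ip>[\d.]+)")

lines = [
    "INFO:root:User 12345 logged in from 192.168.1.1",
    "INFO:root:User 67890 logged in from 10.0.0.7",
    "INFO:root:User 12345 logged in from 192.168.1.1",
]

logins_per_user = Counter()
for line in lines:
    m = LOGIN_RE.search(line)
    if m:  # any line whose wording drifted is silently skipped
        logins_per_user[m.group("user_id")] += 1

print(logins_per_user)  # Counter({'12345': 2, '67890': 1})
```

With structured logs, the same question is a field lookup instead of a regex that has to be kept in sync with every message format.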

Structured Logging with JSON

import logging
import json
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        
        # Add extra fields
        if hasattr(record, "extra"):
            log_data.update(record.extra)
        
        # Add exception info if present
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        
        return json.dumps(log_data)


# Configure logging
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.root.addHandler(handler)
logging.root.setLevel(logging.INFO)

# Usage
logger = logging.getLogger(__name__)

def log_with_context(logger, level, message, **kwargs):
    record = logger.makeRecord(
        logger.name, level, "", 0, message, (), None
    )
    record.extra = kwargs
    logger.handle(record)

# Log structured data
log_with_context(
    logger, 
    logging.INFO, 
    "User logged in",
    user_id=12345,
    ip_address="192.168.1.1",
    auth_method="oauth",
    session_id="abc-123"
)

Output:

{
  "timestamp": "2026-02-11T16:00:00.000Z",
  "level": "INFO",
  "logger": "__main__",
  "message": "User logged in",
  "user_id": 12345,
  "ip_address": "192.168.1.1",
  "auth_method": "oauth",
  "session_id": "abc-123"
}

Now you can query: jq 'select(.user_id == 12345)' or aggregate in your log platform.

The structlog library makes this much cleaner:

import structlog

# Configure structlog
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer()
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

# Simple usage
logger.info("user_logged_in", user_id=12345, ip="192.168.1.1")

# Bind context that persists across calls
logger = logger.bind(request_id="req-abc-123")
logger.info("processing_started")
logger.info("database_query", table="users", duration_ms=45)
logger.info("processing_complete")
# All three logs include request_id

Context Propagation

The real power comes from maintaining context across your application:

import structlog
from contextvars import ContextVar
from functools import wraps

# Thread-safe context
request_context: ContextVar[dict] = ContextVar("request_context", default={})

def with_logging_context(**kwargs):
    """Decorator to add context to all logs within a function."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **func_kwargs):
            current = request_context.get().copy()
            current.update(kwargs)
            token = request_context.set(current)
            try:
                return func(*args, **func_kwargs)
            finally:
                request_context.reset(token)
        return wrapper
    return decorator

class ContextProcessor:
    """Structlog processor that adds context vars to every log."""
    def __call__(self, logger, method_name, event_dict):
        event_dict.update(request_context.get())
        return event_dict

# Add to structlog processors
structlog.configure(
    processors=[
        ContextProcessor(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ],
)

# Usage in FastAPI
from fastapi import FastAPI, Request
import uuid

app = FastAPI()

@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    request_id = str(uuid.uuid4())
    context = {
        "request_id": request_id,
        "method": request.method,
        "path": request.url.path,
        "client_ip": request.client.host,
    }
    token = request_context.set(context)
    
    logger = structlog.get_logger()
    logger.info("request_started")
    
    try:
        response = await call_next(request)
        logger.info("request_completed", status_code=response.status_code)
        return response
    except Exception as e:
        logger.exception("request_failed", error=str(e))
        raise
    finally:
        request_context.reset(token)

What to Log

Always Include

# Request context
logger.info("event_name",
    request_id="uuid",        # Correlation ID
    user_id=123,              # Who
    timestamp="iso8601",      # When (usually automatic)
    service="api-server",     # What service
    environment="production", # Where
)

For Operations

# Database queries
logger.info("db_query",
    query_type="select",
    table="users",
    duration_ms=45,
    rows_affected=1,
)

# External API calls
logger.info("external_api_call",
    service="stripe",
    endpoint="/charges",
    method="POST",
    status_code=200,
    duration_ms=230,
)

# Business events
logger.info("order_placed",
    order_id="ord-123",
    customer_id="cust-456",
    total_amount=99.99,
    items_count=3,
)

For Errors

try:
    process_payment(order)
except PaymentError as e:
    logger.error("payment_failed",
        order_id=order.id,
        error_type=type(e).__name__,
        error_message=str(e),
        payment_provider="stripe",
        amount=order.total,
        exc_info=True,  # Include stack trace
    )

Log Levels Done Right

# DEBUG: Detailed diagnostic info (disabled in production)
logger.debug("cache_lookup", key="user:123", hit=True)

# INFO: Normal operations, business events
logger.info("order_shipped", order_id="123", carrier="ups")

# WARNING: Something unexpected but handled
logger.warning("rate_limit_approaching", 
    current=95, limit=100, user_id=123)

# ERROR: Something failed, needs attention
logger.error("payment_declined", order_id="123", reason="insufficient_funds")

# CRITICAL: System is broken, wake someone up
logger.critical("database_connection_lost", 
    host="db.example.com", retry_count=5)

Querying Structured Logs

With jq (local)

# Find all errors
cat app.log | jq 'select(.level == "ERROR")'

# Find slow database queries
cat app.log | jq 'select(.duration_ms > 1000)'

# Count events by type
cat app.log | jq -s 'group_by(.message) | map({event: .[0].message, count: length})'

With CloudWatch Logs Insights

fields @timestamp, @message
| filter level = "ERROR"
| filter user_id = 12345
| sort @timestamp desc
| limit 100

With Elasticsearch/OpenSearch

{
  "query": {
    "bool": {
      "must": [
        { "match": { "level": "ERROR" } },
        { "range": { "duration_ms": { "gte": 1000 } } }
      ],
      "filter": [
        { "range": { "@timestamp": { "gte": "now-1h" } } }
      ]
    }
  }
}

Performance Considerations

# Avoid expensive operations in log calls
# BAD: the f-string (and expensive_serialize) runs even when DEBUG is disabled
logger.debug(f"Large object: {expensive_serialize(obj)}")

# GOOD: check the level first so the expensive work only happens when needed
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("large_object", data=expensive_serialize(obj))

# Note: passing a lambda (data=lambda: ...) does NOT defer the work —
# neither stdlib logging nor structlog calls it for you by default

Best Practices

  1. Use consistent field names — user_id everywhere, not sometimes userId
  2. Log at boundaries — requests in/out, function entry/exit for critical paths
  3. Include correlation IDs — trace requests across services
  4. Don’t log sensitive data — no passwords, tokens, PII
  5. Use appropriate levels — ERROR means something’s broken
  6. Keep messages machine-readable — user_logged_in, not User has logged in!
  7. Add units to numeric fields — duration_ms, size_bytes
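Point 4 is easy to enforce mechanically rather than by convention. A structlog processor is just a callable taking (logger, method_name, event_dict) and returning the event dict, so a redaction step can be tested on a plain dict and then added to the processors list in structlog.configure. A minimal sketch, with an assumed set of sensitive key names you would extend for your own data:

```python
# A minimal redaction processor sketch — the key list is an assumption, extend it
SENSITIVE_KEYS = {"password", "token", "authorization", "ssn"}

def redact_sensitive(logger, method_name, event_dict):
    """Replace values of sensitive keys before the event is rendered."""
    for key in event_dict:
        if key.lower() in SENSITIVE_KEYS:
            event_dict[key] = "[REDACTED]"
    return event_dict

# Works on any event dict; in production you would place it early in
# structlog.configure(processors=[redact_sensitive, ...])
event = {"event": "login_attempt", "user_id": 123, "password": "hunter2"}
print(redact_sensitive(None, "info", event))
# {'event': 'login_attempt', 'user_id': 123, 'password': '[REDACTED]'}
```

Putting the redaction in the pipeline means a forgotten field in one call site gets scrubbed anyway, instead of leaking until someone notices.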

Structured logging transforms debugging from archaeology into data analysis. The upfront investment pays off the first time you need to find a needle in a haystack of logs.