Policy as Code: Enforcing Standards with OPA and Gatekeeper

Manual policy enforcement doesn’t scale. Security reviews become bottlenecks. Compliance audits are painful. Policy as code solves this—define policies once, enforce them everywhere, automatically. Open Policy Agent Basics OPA uses Rego, a declarative language for expressing policies. Simple Policy 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 # policy/authz.rego package authz default allow = false # Allow if user is admin allow { input.user.role == "admin" } # Allow if user owns the resource allow { input.user.id == input.resource.owner_id } # Allow read access to public resources allow { input.action == "read" input.resource.public == true } Test the Policy 1 2 3 4 5 6 7 8 9 10 # input.json { "user": {"id": "user-123", "role": "member"}, "resource": {"owner_id": "user-123", "public": false}, "action": "read" } # Run OPA opa eval -i input.json -d policy/ "data.authz.allow" # Result: true (user owns the resource) Policy Testing 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 # policy/authz_test.rego package authz test_admin_allowed { allow with input as { "user": {"role": "admin"}, "action": "delete", "resource": {"owner_id": "other"} } } test_owner_allowed { allow with input as { "user": {"id": "user-1", "role": "member"}, "action": "update", "resource": {"owner_id": "user-1"} } } test_non_owner_denied { not allow with input as { "user": {"id": "user-1", "role": "member"}, "action": "update", "resource": {"owner_id": "user-2", "public": false} } } 1 2 # Run tests opa test policy/ -v Kubernetes Gatekeeper Enforce policies on Kubernetes resources at admission time. ...

February 12, 2026 Â· 7 min Â· 1333 words Â· Rob Washington

Documentation as Code: Keeping Docs in Sync with Your Systems

Documentation that lives outside your codebase gets stale. Documentation that isn’t tested breaks. Let’s treat docs like code—versioned, automated, and verified. Docs Live with Code Repository Structure p ├ ├ │ │ │ │ │ │ │ ├ └ r ─ ─ ─ ─ o ─ ─ ─ ─ j e s d ├ ├ ├ │ └ m R c r o ─ ─ ─ ─ k E t c c ─ ─ ─ ─ d A / / s o D / g a a └ r ├ └ c M e r p ─ u ─ ─ s E t c i ─ n ─ ─ . . t h / b y m i i o o d i m d n t p o e n l g e e k p c - c n s l i s t a / o d t u p y e a r i m n r e . e t t . y n - e m a t r d d m . e . l m s m d p d o n s e . m d MkDocs Configuration 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 # mkdocs.yml site_name: My Project site_url: https://docs.example.com repo_url: https://github.com/org/project theme: name: material features: - navigation.tabs - navigation.sections - search.highlight - content.code.copy plugins: - search - git-revision-date-localized - macros markdown_extensions: - admonition - codehilite - toc: permalink: true - pymdownx.superfences: custom_fences: - name: mermaid class: mermaid format: !!python/name:pymdownx.superfences.fence_code_format nav: - Home: index.md - Getting Started: getting-started.md - Architecture: architecture.md - API Reference: api/ - Runbooks: - Deployment: runbooks/deployment.md - Incidents: runbooks/incident-response.md Auto-Generated API Docs From OpenAPI Spec 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 # docs/api/openapi.yaml openapi: 3.0.3 info: title: My API version: 1.0.0 description: | API for managing resources. ## Authentication All endpoints require Bearer token authentication. paths: /users: get: summary: List users tags: [Users] parameters: - name: limit in: query schema: type: integer default: 20 responses: '200': description: List of users content: application/json: schema: type: array items: $ref: '#/components/schemas/User' components: schemas: User: type: object properties: id: type: string format: uuid email: type: string format: email created_at: type: string format: date-time Generate from Code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 # generate_docs.py from fastapi import FastAPI from fastapi.openapi.utils import get_openapi import json import yaml app = FastAPI() # ... your routes ... def export_openapi(): """Export OpenAPI spec to file.""" openapi_schema = get_openapi( title=app.title, version=app.version, description=app.description, routes=app.routes, ) # Write JSON with open('docs/api/openapi.json', 'w') as f: json.dump(openapi_schema, f, indent=2) # Write YAML with open('docs/api/openapi.yaml', 'w') as f: yaml.dump(openapi_schema, f, default_flow_style=False) if __name__ == "__main__": export_openapi() Terraform Docs Generation 1 2 3 4 5 # Install terraform-docs brew install terraform-docs # Generate markdown from module terraform-docs markdown table ./modules/vpc > ./modules/vpc/README.md 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 # .terraform-docs.yml formatter: markdown table sections: show: - requirements - providers - inputs - outputs - resources output: file: README.md mode: inject template: |- <!-- BEGIN_TF_DOCS --> {{ .Content }} <!-- END_TF_DOCS --> Diagrams as Code Mermaid in Markdown 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 # Architecture ```mermaid graph TB subgraph "Public" LB[Load Balancer] end subgraph "Application Tier" API1[API Server 1] API2[API Server 2] end subgraph "Data Tier" DB[(PostgreSQL)] Cache[(Redis)] end LB --> API1 LB --> API2 API1 --> DB API2 --> DB API1 --> Cache API2 --> Cache # ` # f f f f w # ` r r r r i # ` d o o o o t p i m m m m h P y a d l w w d a a y t g d d d d D n b i i n p p t h r i i i i i s t t s i i h o a a a a a a = h h o n m g g g g g = a d c n s r r r r r A C p C b a / a a a a a R L l i l c D a m m m m m o B u u = h l d c i r s s s s ( u ( s = s e b b a a c . . . " t " t t R c g h i a a a P e L e [ e D = h r i m w w w r 5 o r E r S e a t p s s s o 3 a ( C ( ( E m e o . . . d ( d " S " " l a s c r c d n u " A ( D P a p t t o a e c D B p " a o s i u m t t t N a p A t s t r D p a w i S l l P a t i e i u b o o " a i I " g C . a t a r n ) n c ) r a p g e s k c a 1 : e c y r e A e t " S h a i i r r i ) Q e m m i m c " o , L ( , p m p h ) n " " o p o i " E ) R C r o r t ) C e l t r t e : S d u t c ( i s E A t " s t C R L u A " e S D B r P ) r S , e I , " R , 2 E o " l u s ) a t h , s e o t 5 w E i 3 = C C F S a a ( c l " h s A e e P , I f 3 i " l ) e ] n a m e = " d o c s / i m a g e s / a r c h i t e c t u r e " ) : CI/CD for Documentation GitHub Actions 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 # .github/workflows/docs.yml name: Documentation on: push: branches: [main] paths: - 'docs/**' - 'mkdocs.yml' - 'src/**/*.py' # Regenerate if code changes pull_request: paths: - 'docs/**' jobs: build: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 with: fetch-depth: 0 # For git-revision-date plugin - uses: actions/setup-python@v5 with: python-version: '3.11' - name: Install dependencies run: | pip install mkdocs-material mkdocs-macros-plugin pip install -e . # Install project for API doc generation - name: Generate API docs run: python generate_docs.py - name: Generate Terraform docs run: | terraform-docs markdown table ./terraform > ./docs/terraform.md - name: Build documentation run: mkdocs build --strict - name: Upload artifact uses: actions/upload-pages-artifact@v3 with: path: site/ deploy: if: github.ref == 'refs/heads/main' needs: build runs-on: ubuntu-latest permissions: pages: write id-token: write environment: name: github-pages steps: - uses: actions/deploy-pages@v4 Documentation Testing Link Checking 1 2 3 4 5 6 # In CI - name: Check links uses: lycheeverse/lychee-action@v1 with: args: --verbose --no-progress './docs/**/*.md' fail: true Code Example Testing 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 # test_docs.py import subprocess import re from pathlib import Path def extract_code_blocks(markdown_file: Path) -> list: """Extract fenced code blocks from markdown.""" content = markdown_file.read_text() pattern = r'```(\w+)\n(.*?)```' return re.findall(pattern, content, re.DOTALL) def test_python_examples(): """Test that Python code examples are valid.""" for doc in Path('docs').rglob('*.md'): for lang, code in extract_code_blocks(doc): if lang == 'python': # Check syntax try: compile(code, doc.name, 'exec') except SyntaxError as e: raise AssertionError(f"Syntax error in {doc}: {e}") def test_bash_examples(): """Test that bash commands are valid syntax.""" for doc in Path('docs').rglob('*.md'): for lang, code in extract_code_blocks(doc): if lang in ('bash', 'shell', 'sh'): result = subprocess.run( ['bash', '-n', '-c', code], capture_output=True ) if result.returncode != 0: raise AssertionError( f"Bash syntax error in {doc}: {result.stderr.decode()}" ) OpenAPI Validation 1 2 3 4 # In CI - name: Validate OpenAPI spec run: | npx @apidevtools/swagger-cli validate docs/api/openapi.yaml README Generation From Template 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 # generate_readme.py from jinja2 import Template import subprocess def get_version(): return subprocess.check_output( ['git', 'describe', '--tags', '--always'] ).decode().strip() def get_contributors(): output = subprocess.check_output( ['git', 'shortlog', '-sn', 'HEAD'] ).decode() return [line.split('\t')[1] for line in output.strip().split('\n')] template = Template(''' # {{ name }} {{ description }} ## Installation ```bash pip install {{ package_name }} Quick Start 1 2 3 4 from {{ package_name }} import Client client = Client(api_key="your-key") result = client.do_something() Documentation Full documentation at [{{ docs_url }}]({{ docs_url }}) ...

February 12, 2026 Â· 13 min Â· 2615 words Â· Rob Washington

DNS Management: Infrastructure as Code for Your Domain Records

DNS is the foundation everything else depends on. A misconfigured record can take down your entire infrastructure. Yet DNS is often managed through web consoles with no version control, no review process, and no automation. Let’s fix that. Terraform for DNS Route53 Basics 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 # dns.tf resource "aws_route53_zone" "main" { name = "example.com" tags = { Environment = "production" } } # A record resource "aws_route53_record" "www" { zone_id = aws_route53_zone.main.zone_id name = "www.example.com" type = "A" ttl = 300 records = ["203.0.113.10"] } # CNAME record resource "aws_route53_record" "app" { zone_id = aws_route53_zone.main.zone_id name = "app.example.com" type = "CNAME" ttl = 300 records = ["app-lb-123456.us-east-1.elb.amazonaws.com"] } # Alias to ALB (no TTL, resolved at edge) resource "aws_route53_record" "api" { zone_id = aws_route53_zone.main.zone_id name = "api.example.com" type = "A" alias { name = aws_lb.api.dns_name zone_id = aws_lb.api.zone_id evaluate_target_health = true } } # MX records resource "aws_route53_record" "mx" { zone_id = aws_route53_zone.main.zone_id name = "example.com" type = "MX" ttl = 3600 records = [ "10 mail1.example.com", "20 mail2.example.com" ] } # TXT for SPF resource "aws_route53_record" "spf" { zone_id = aws_route53_zone.main.zone_id name = "example.com" type = "TXT" ttl = 3600 records = ["v=spf1 include:_spf.google.com ~all"] } Dynamic Records from Infrastructure 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 # Generate records from other resources locals { services = { "api" = aws_lb.api.dns_name "admin" = aws_lb.admin.dns_name "docs" = aws_cloudfront_distribution.docs.domain_name } } resource "aws_route53_record" "services" { for_each = local.services zone_id = aws_route53_zone.main.zone_id name = "${each.key}.example.com" type = "CNAME" ttl = 300 records = [each.value] } # From Kubernetes ingresses data "kubernetes_ingress_v1" "all" { for_each = toset(["api", "web", "admin"]) metadata { name = each.key namespace = "production" } } resource "aws_route53_record" "k8s_services" { for_each = data.kubernetes_ingress_v1.all zone_id = aws_route53_zone.main.zone_id name = "${each.key}.example.com" type = "CNAME" ttl = 300 records = [each.value.status[0].load_balancer[0].ingress[0].hostname] } DNS Failover Health Check Based Routing 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 # Health check for primary resource "aws_route53_health_check" "primary" { fqdn = "primary-api.example.com" port = 443 type = "HTTPS" resource_path = "/health" failure_threshold = 3 request_interval = 30 tags = { Name = "primary-api-health" } } # Health check for secondary resource "aws_route53_health_check" "secondary" { fqdn = "secondary-api.example.com" port = 443 type = "HTTPS" resource_path = "/health" failure_threshold = 3 request_interval = 30 tags = { Name = "secondary-api-health" } } # Primary record with failover resource "aws_route53_record" "api_primary" { zone_id = aws_route53_zone.main.zone_id name = "api.example.com" type = "A" ttl = 60 records = ["203.0.113.10"] set_identifier = "primary" health_check_id = aws_route53_health_check.primary.id failover_routing_policy { type = "PRIMARY" } } # Secondary record (used when primary fails) resource "aws_route53_record" "api_secondary" { zone_id = aws_route53_zone.main.zone_id name = "api.example.com" type = "A" ttl = 60 records = ["203.0.113.20"] set_identifier = "secondary" health_check_id = aws_route53_health_check.secondary.id failover_routing_policy { type = "SECONDARY" } } Weighted Routing for Gradual Migration 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 # 90% to current, 10% to new resource "aws_route53_record" "api_current" { zone_id = aws_route53_zone.main.zone_id name = "api.example.com" type = "A" ttl = 60 records = ["203.0.113.10"] set_identifier = "current" weighted_routing_policy { weight = 90 } } resource "aws_route53_record" "api_new" { zone_id = aws_route53_zone.main.zone_id name = "api.example.com" type = "A" ttl = 60 records = ["203.0.113.20"] set_identifier = "new" weighted_routing_policy { weight = 10 } } External DNS for Kubernetes Automatically create DNS records from Kubernetes resources. ...

February 12, 2026 Â· 9 min Â· 1841 words Â· Rob Washington

SSL/TLS Automation: Never Manually Renew a Certificate Again

Manual certificate management is a reliability incident waiting to happen. A forgotten renewal, an expired cert at 3 AM, angry customers. Let’s automate this problem away. Certbot: The Foundation Basic Setup 1 2 3 4 5 6 7 8 9 # Install certbot sudo apt install certbot python3-certbot-nginx # Get certificate for nginx sudo certbot --nginx -d example.com -d www.example.com # Auto-renewal is configured automatically # Test it: sudo certbot renew --dry-run Standalone Mode (No Web Server) 1 2 3 4 5 # Stop web server, get cert, restart sudo certbot certonly --standalone -d example.com # Or use DNS challenge (no downtime) sudo certbot certonly --manual --preferred-challenges dns -d example.com Automated Renewal with Hooks 1 2 3 4 5 6 7 8 # /etc/letsencrypt/renewal-hooks/deploy/reload-nginx.sh #!/bin/bash systemctl reload nginx # /etc/letsencrypt/renewal-hooks/post/notify.sh #!/bin/bash curl -X POST https://slack.com/webhook \ -d '{"text":"SSL certificate renewed for '$RENEWED_DOMAINS'"}' cert-manager for Kubernetes The standard for Kubernetes certificate automation. ...

February 12, 2026 Â· 7 min Â· 1454 words Â· Rob Washington

Infrastructure Drift Detection: Keeping Your Terraform in Sync with Reality

You define infrastructure in Terraform. Someone clicks around in the AWS console. Now your code says one thing and reality says another. This is drift, and it will bite you when you least expect it. What Causes Drift? Manual changes — “Just this once” console modifications Emergency fixes — Hotfixes applied directly to production Other tools — Scripts, CLI commands, other IaC tools AWS itself — Auto-scaling, managed service updates Incomplete imports — Resources created outside Terraform Detecting Drift Basic: Terraform Plan 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #!/bin/bash # drift-check.sh set -e # Initialize without backend changes terraform init -input=false # Generate plan terraform plan -detailed-exitcode -out=tfplan 2>&1 | tee plan.txt EXIT_CODE=$? if [ $EXIT_CODE -eq 0 ]; then echo "✅ No drift detected" elif [ $EXIT_CODE -eq 2 ]; then echo "⚠️ Drift detected! Changes needed:" terraform show tfplan exit 1 else echo "❌ Error running plan" exit 1 fi Automated Drift Detection 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 # drift_detector.py import subprocess import json import os from datetime import datetime from typing import List, Dict class DriftDetector: def __init__(self, workspace_dir: str): self.workspace_dir = workspace_dir def check_drift(self) -> Dict: """Run terraform plan and parse results.""" os.chdir(self.workspace_dir) # Initialize subprocess.run( ["terraform", "init", "-input=false"], capture_output=True, check=True ) # Plan with JSON output result = subprocess.run( ["terraform", "plan", "-json", "-out=tfplan"], capture_output=True, text=True ) changes = self._parse_plan_output(result.stdout) return { "workspace": self.workspace_dir, "timestamp": datetime.utcnow().isoformat(), "has_drift": len(changes) > 0, "changes": changes } def _parse_plan_output(self, output: str) -> List[Dict]: """Parse terraform plan JSON output.""" changes = [] for line in output.strip().split('\n'): try: entry = json.loads(line) if entry.get('@level') == 'info': msg = entry.get('@message', '') # Look for resource changes if 'will be' in msg: changes.append({ 'message': msg, 'type': entry.get('type', 'unknown'), 'change': entry.get('change', {}) }) except json.JSONDecodeError: continue return changes def check_all_workspaces(workspaces: List[str]) -> Dict: """Check drift across multiple Terraform workspaces.""" results = [] for workspace in workspaces: detector = DriftDetector(workspace) try: result = detector.check_drift() results.append(result) except Exception as e: results.append({ "workspace": workspace, "error": str(e), "has_drift": None }) drifted = [r for r in results if r.get('has_drift')] return { "total_workspaces": len(workspaces), "drifted_count": len(drifted), "results": results } # Usage workspaces = [ "/infrastructure/terraform/networking", "/infrastructure/terraform/compute", "/infrastructure/terraform/databases", ] report = check_all_workspaces(workspaces) print(json.dumps(report, indent=2)) CI Pipeline for Drift Detection 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 # .github/workflows/drift-detection.yml name: Drift Detection on: schedule: - cron: '0 */6 * * *' # Every 6 hours workflow_dispatch: jobs: detect-drift: runs-on: ubuntu-latest strategy: matrix: workspace: - networking - compute - databases - monitoring steps: - uses: actions/checkout@v4 - uses: hashicorp/setup-terraform@v3 with: terraform_version: 1.7.0 - name: Configure AWS uses: aws-actions/configure-aws-credentials@v4 with: aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} aws-region: us-east-1 - name: Check Drift id: drift working-directory: terraform/${{ matrix.workspace }} run: | terraform init -input=false set +e terraform plan -detailed-exitcode -out=tfplan 2>&1 | tee plan.txt EXIT_CODE=$? set -e if [ $EXIT_CODE -eq 2 ]; then echo "drift_detected=true" >> $GITHUB_OUTPUT echo "### ⚠️ Drift Detected in ${{ matrix.workspace }}" >> $GITHUB_STEP_SUMMARY echo '```' >> $GITHUB_STEP_SUMMARY terraform show -no-color tfplan >> $GITHUB_STEP_SUMMARY echo '```' >> $GITHUB_STEP_SUMMARY else echo "drift_detected=false" >> $GITHUB_OUTPUT echo "### ✅ No drift in ${{ matrix.workspace }}" >> $GITHUB_STEP_SUMMARY fi - name: Alert on Drift if: steps.drift.outputs.drift_detected == 'true' uses: slackapi/slack-github-action@v1 with: payload: | { "text": "⚠️ Infrastructure drift detected in ${{ matrix.workspace }}", "blocks": [ { "type": "section", "text": { "type": "mrkdwn", "text": "*Infrastructure Drift Detected*\n\nWorkspace: `${{ matrix.workspace }}`\nRun: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" } } ] } env: SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }} Continuous Reconciliation Atlantis for GitOps 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 # atlantis.yaml version: 3 projects: - name: networking dir: terraform/networking autoplan: when_modified: - "*.tf" - "*.tfvars" enabled: true apply_requirements: - approved - mergeable - name: compute dir: terraform/compute autoplan: enabled: true apply_requirements: - approved Drift Auto-Remediation 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 # drift_remediation.py import subprocess import json from enum import Enum from typing import Optional class RemediationStrategy(Enum): ALERT_ONLY = "alert" # Just notify AUTO_PLAN = "plan" # Create PR with plan AUTO_APPLY = "apply" # Automatically apply (dangerous!) REFRESH_ONLY = "refresh" # Update state to match reality class DriftRemediator: def __init__(self, workspace: str, strategy: RemediationStrategy): self.workspace = workspace self.strategy = strategy def remediate(self, drift_report: dict) -> dict: """Remediate detected drift based on strategy.""" if not drift_report.get('has_drift'): return {"action": "none", "reason": "no drift"} if self.strategy == RemediationStrategy.ALERT_ONLY: return self._alert(drift_report) elif self.strategy == RemediationStrategy.REFRESH_ONLY: return self._refresh_state() elif self.strategy == RemediationStrategy.AUTO_PLAN: return self._create_remediation_pr(drift_report) elif self.strategy == RemediationStrategy.AUTO_APPLY: return self._auto_apply() return {"action": "unknown", "error": "invalid strategy"} def _alert(self, drift_report: dict) -> dict: """Send alert without taking action.""" # Send to Slack, PagerDuty, etc. return { "action": "alert", "changes": len(drift_report['changes']) } def _refresh_state(self) -> dict: """Refresh state to match reality (accept drift).""" result = subprocess.run( ["terraform", "apply", "-refresh-only", "-auto-approve"], capture_output=True, text=True, cwd=self.workspace ) return { "action": "refresh", "success": result.returncode == 0, "output": result.stdout } def _create_remediation_pr(self, drift_report: dict) -> dict: """Create a PR to remediate drift.""" # Generate plan subprocess.run( ["terraform", "plan", "-out=remediation.tfplan"], cwd=self.workspace ) # Create branch and PR (pseudo-code) branch = f"fix/drift-{self.workspace}-{datetime.now().strftime('%Y%m%d')}" # In practice, you'd use GitHub API or similar return { "action": "pr_created", "branch": branch, "changes": len(drift_report['changes']) } def _auto_apply(self) -> dict: """Automatically apply to fix drift. USE WITH CAUTION.""" result = subprocess.run( ["terraform", "apply", "-auto-approve"], capture_output=True, text=True, cwd=self.workspace ) return { "action": "auto_apply", "success": result.returncode == 0, "output": result.stdout if result.returncode == 0 else result.stderr } # Configuration per workspace WORKSPACE_STRATEGIES = { "networking": RemediationStrategy.ALERT_ONLY, # Critical, manual only "compute": RemediationStrategy.AUTO_PLAN, # Create PRs "monitoring": RemediationStrategy.REFRESH_ONLY, # Accept drift "dev-sandbox": RemediationStrategy.AUTO_APPLY, # Auto-fix okay } State Locking and Protection Prevent Manual Changes 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # terraform.tf terraform { backend "s3" { bucket = "terraform-state-prod" key = "networking/terraform.tfstate" region = "us-east-1" dynamodb_table = "terraform-locks" encrypt = true } } # Prevent destroy of critical resources resource "aws_db_instance" "main" { # ... lifecycle { prevent_destroy = true } } AWS Service Control Policies 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 { "Version": "2012-10-17", "Statement": [ { "Sid": "RequireTerraformTag", "Effect": "Deny", "Action": [ "ec2:RunInstances", "ec2:CreateVpc", "rds:CreateDBInstance" ], "Resource": "*", "Condition": { "Null": { "aws:RequestTag/ManagedBy": "true" } } }, { "Sid": "PreventManualModification", "Effect": "Deny", "Action": [ "ec2:ModifyInstanceAttribute", "ec2:ModifyVpcAttribute" ], "Resource": "*", "Condition": { "StringEquals": { "aws:ResourceTag/ManagedBy": "terraform" }, "StringNotEquals": { "aws:PrincipalTag/Role": "terraform-automation" } } } ] } Drift Metrics 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 # drift_metrics.py from prometheus_client import Counter, Gauge, Histogram # Metrics drift_detected = Counter( 'terraform_drift_detected_total', 'Number of drift detection events', ['workspace', 'severity'] ) drift_resources = Gauge( 'terraform_drift_resources', 'Number of resources with drift', ['workspace'] ) drift_check_duration = Histogram( 'terraform_drift_check_seconds', 'Time to check for drift', ['workspace'] ) def record_drift_check(workspace: str, result: dict, duration: float): """Record drift check metrics.""" drift_check_duration.labels(workspace=workspace).observe(duration) if result['has_drift']: drift_detected.labels( workspace=workspace, severity='warning' if len(result['changes']) < 5 else 'critical' ).inc() drift_resources.labels(workspace=workspace).set(len(result['changes'])) else: drift_resources.labels(workspace=workspace).set(0) Best Practices Check drift frequently — At least daily, ideally every few hours Alert immediately — Drift compounds; catch it early Lock down production — Use SCPs to prevent manual changes Tag everything — ManagedBy: terraform enables enforcement Use workspaces — Isolate environments, check each independently Automate remediation — PRs for drift fixes, not manual applies Track metrics — Know your drift frequency and sources Quick Start Checklist Set up scheduled drift detection (GitHub Actions, Jenkins, etc.) Configure alerts for detected drift Add ManagedBy tags to all Terraform resources Implement SCPs to prevent manual changes in production Create runbook for drift remediation Track drift metrics over time Drift isn’t just annoying—it’s a reliability risk. Your next terraform apply might have unintended consequences because the state doesn’t match reality. Detect it early, fix it fast, and prevent it where possible. ...

February 11, 2026 Â· 8 min Â· 1606 words Â· Rob Washington

Cloud Cost Optimization: Cutting AWS Bills Without Cutting Corners

Cloud bills grow faster than you’d expect. A few forgotten instances here, oversized databases there, and suddenly you’re spending more on infrastructure than engineering salaries. Let’s fix that. Quick Wins: Find the Waste Identify Unused Resources 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 # find_waste.py import boto3 from datetime import datetime, timedelta ec2 = boto3.client('ec2') cloudwatch = boto3.client('cloudwatch') def find_idle_instances(): """Find EC2 instances with <5% CPU over 7 days.""" instances = ec2.describe_instances( Filters=[{'Name': 'instance-state-name', 'Values': ['running']}] ) idle = [] for reservation in instances['Reservations']: for instance in reservation['Instances']: instance_id = instance['InstanceId'] # Get average CPU over 7 days response = cloudwatch.get_metric_statistics( Namespace='AWS/EC2', MetricName='CPUUtilization', Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}], StartTime=datetime.utcnow() - timedelta(days=7), EndTime=datetime.utcnow(), Period=86400, # 1 day Statistics=['Average'] ) if response['Datapoints']: avg_cpu = sum(d['Average'] for d in response['Datapoints']) / len(response['Datapoints']) if avg_cpu < 5: idle.append({ 'instance_id': instance_id, 'type': instance['InstanceType'], 'avg_cpu': round(avg_cpu, 2), 'name': next((t['Value'] for t in instance.get('Tags', []) if t['Key'] == 'Name'), 'unnamed') }) return idle def find_unattached_volumes(): """Find EBS volumes not attached to any instance.""" volumes = ec2.describe_volumes( Filters=[{'Name': 'status', 'Values': ['available']}] ) return [{ 'volume_id': v['VolumeId'], 'size_gb': v['Size'], 'monthly_cost': v['Size'] * 0.10 # gp2 pricing estimate } for v in volumes['Volumes']] def find_old_snapshots(): """Find snapshots older than 90 days.""" snapshots = ec2.describe_snapshots(OwnerIds=['self']) cutoff = datetime.utcnow() - timedelta(days=90) old = [] for snap in snapshots['Snapshots']: if snap['StartTime'].replace(tzinfo=None) < cutoff: old.append({ 'snapshot_id': snap['SnapshotId'], 'size_gb': snap['VolumeSize'], 'age_days': (datetime.utcnow() - snap['StartTime'].replace(tzinfo=None)).days }) return old # Run analysis print("=== Idle Instances ===") for i in find_idle_instances(): print(f" {i['instance_id']} ({i['type']}): {i['avg_cpu']}% avg CPU - {i['name']}") print("\n=== Unattached Volumes ===") for v in find_unattached_volumes(): print(f" {v['volume_id']}: {v['size_gb']}GB = ${v['monthly_cost']:.2f}/mo") print("\n=== Old Snapshots ===") for s in find_old_snapshots(): print(f" {s['snapshot_id']}: {s['size_gb']}GB, {s['age_days']} days old") Automated Cleanup 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 # cleanup_automation.py import boto3 from datetime import datetime, timedelta def cleanup_old_amis(days_old: int = 180, dry_run: bool = True): """Deregister AMIs older than threshold and delete their snapshots.""" ec2 = boto3.client('ec2') images = ec2.describe_images(Owners=['self']) cutoff = datetime.utcnow() - timedelta(days=days_old) for image in images['Images']: creation_date = datetime.strptime(image['CreationDate'][:10], '%Y-%m-%d') if creation_date < cutoff: ami_id = image['ImageId'] snapshot_ids = [ bdm['Ebs']['SnapshotId'] for bdm in image.get('BlockDeviceMappings', []) if 'Ebs' in bdm ] if dry_run: print(f"Would delete AMI {ami_id} and snapshots {snapshot_ids}") else: ec2.deregister_image(ImageId=ami_id) for snap_id in snapshot_ids: ec2.delete_snapshot(SnapshotId=snap_id) print(f"Deleted AMI {ami_id} and {len(snapshot_ids)} snapshots") def stop_dev_instances_at_night(): """Stop non-production instances outside business hours.""" ec2 = boto3.resource('ec2') # Find instances tagged Environment=dev|staging instances = ec2.instances.filter( Filters=[ {'Name': 'tag:Environment', 'Values': ['dev', 'staging']}, {'Name': 'instance-state-name', 'Values': ['running']} ] ) instance_ids = [i.id for i in instances] if instance_ids: ec2.instances.filter(InstanceIds=instance_ids).stop() print(f"Stopped {len(instance_ids)} dev/staging instances") 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 # Lambda + EventBridge for scheduled cleanup AWSTemplateFormatVersion: '2010-09-09' Resources: StopDevInstancesFunction: Type: AWS::Lambda::Function Properties: FunctionName: stop-dev-instances Runtime: python3.11 Handler: index.handler Timeout: 60 Role: !GetAtt LambdaRole.Arn Code: ZipFile: | import boto3 def handler(event, context): ec2 = boto3.resource('ec2') instances = ec2.instances.filter( Filters=[ {'Name': 'tag:AutoStop', 'Values': ['true']}, {'Name': 'instance-state-name', 'Values': ['running']} ] ) ids = [i.id for i in instances] if ids: ec2.instances.filter(InstanceIds=ids).stop() return {'stopped': ids} StopSchedule: Type: AWS::Events::Rule Properties: ScheduleExpression: 'cron(0 22 ? * MON-FRI *)' # 10 PM weekdays Targets: - Id: StopDevInstances Arn: !GetAtt StopDevInstancesFunction.Arn Right-Sizing Analyze and Recommend 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 # rightsizing.py import boto3 def get_rightsizing_recommendations(): """Get AWS Compute Optimizer recommendations.""" optimizer = boto3.client('compute-optimizer') response = optimizer.get_ec2_instance_recommendations() savings = [] for rec in response.get('instanceRecommendations', []): current = rec['currentInstanceType'] for option in rec.get('recommendationOptions', []): if option.get('rank') == 1: # Top recommendation recommended = option['instanceType'] monthly_savings = ( rec.get('utilizationMetrics', [{}])[0].get('value', 0) * option.get('projectedUtilizationMetrics', [{}])[0].get('value', 1) ) savings.append({ 'instance_id': rec['instanceArn'].split('/')[-1], 'current': current, 'recommended': recommended, 'finding': rec['finding'], 'estimated_savings': option.get('savingsOpportunity', {}) }) return savings # Generate report for rec in get_rightsizing_recommendations(): print(f"{rec['instance_id']}: {rec['current']} → {rec['recommended']}") print(f" Finding: {rec['finding']}") if rec['estimated_savings']: print(f" Potential savings: ${rec['estimated_savings'].get('estimatedMonthlySavings', {}).get('value', 0):.2f}/mo") Reserved Instances & Savings Plans Analyze Coverage 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 # reservation_analysis.py import boto3 from collections import defaultdict def analyze_reservation_coverage(): """Analyze current RI/SP coverage and recommend purchases.""" ce = boto3.client('ce') # Get coverage report response = ce.get_reservation_coverage( TimePeriod={ 'Start': '2026-01-01', 'End': '2026-02-01' }, Granularity='MONTHLY', GroupBy=[ {'Type': 'DIMENSION', 'Key': 'INSTANCE_TYPE'} ] ) recommendations = [] for group in response.get('CoveragesByTime', [{}])[0].get('Groups', []): instance_type = group['Attributes']['instanceType'] coverage = float(group['Coverage']['CoverageHours']['CoverageHoursPercentage']) on_demand_hours = float(group['Coverage']['CoverageHours']['OnDemandHours']) if coverage < 70 and on_demand_hours > 500: recommendations.append({ 'instance_type': instance_type, 'current_coverage': coverage, 'on_demand_hours': on_demand_hours, 'recommendation': 'Consider Reserved Instances' }) return recommendations def get_savings_plan_recommendations(): """Get Savings Plan purchase recommendations.""" ce = boto3.client('ce') response = ce.get_savings_plans_purchase_recommendation( SavingsPlansType='COMPUTE_SP', TermInYears='ONE_YEAR', PaymentOption='NO_UPFRONT', LookbackPeriodInDays='THIRTY_DAYS' ) rec = response.get('SavingsPlansPurchaseRecommendation', {}) return { 'recommended_hourly_commitment': rec.get('SavingsPlansPurchaseRecommendationSummary', {}).get('RecommendedHourlyCommitment'), 'estimated_monthly_savings': rec.get('SavingsPlansPurchaseRecommendationSummary', {}).get('EstimatedMonthlySavingsAmount'), 'estimated_savings_percentage': rec.get('SavingsPlansPurchaseRecommendationSummary', {}).get('EstimatedSavingsPercentage') } Spot Instances Spot for Stateless Workloads 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 # Kubernetes with Spot instances via Karpenter apiVersion: karpenter.sh/v1alpha5 kind: Provisioner metadata: name: spot-provisioner spec: requirements: - key: karpenter.sh/capacity-type operator: In values: ["spot"] - key: kubernetes.io/arch operator: In values: ["amd64"] - key: node.kubernetes.io/instance-type operator: In values: ["m5.large", "m5.xlarge", "m5a.large", "m5a.xlarge", "m6i.large"] # Spread across instance types for availability limits: resources: cpu: 1000 # Handle interruptions gracefully ttlSecondsAfterEmpty: 30 ttlSecondsUntilExpired: 2592000 # 30 days --- # Deployment using spot nodes apiVersion: apps/v1 kind: Deployment metadata: name: worker spec: replicas: 10 template: spec: nodeSelector: karpenter.sh/capacity-type: spot # Handle spot interruptions terminationGracePeriodSeconds: 120 containers: - name: worker lifecycle: preStop: exec: command: ["/bin/sh", "-c", "sleep 90"] # Drain gracefully Spot Interruption Handling 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 # spot_interruption_handler.py import requests import time import signal import sys METADATA_URL = "http://169.254.169.254/latest/meta-data" def check_spot_interruption(): """Check if this spot instance is being interrupted.""" try: response = requests.get( f"{METADATA_URL}/spot/instance-action", timeout=1 ) if response.status_code == 200: return response.json() except: pass return None def graceful_shutdown(): """Handle graceful shutdown on interruption.""" print("Spot interruption detected, starting graceful shutdown...") # Stop accepting new work # Finish current tasks # Checkpoint state # Clean up sys.exit(0) def monitor_interruption(): """Monitor for spot interruption (2 minute warning).""" while True: interruption = check_spot_interruption() if interruption: print(f"Interruption notice: {interruption}") graceful_shutdown() time.sleep(5) # Run in background thread import threading threading.Thread(target=monitor_interruption, daemon=True).start() Storage Optimization S3 Lifecycle Policies 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 # s3_lifecycle.py import boto3 def configure_lifecycle_rules(bucket_name: str): """Configure intelligent tiering and expiration.""" s3 = boto3.client('s3') lifecycle_config = { 'Rules': [ { 'ID': 'intelligent-tiering', 'Status': 'Enabled', 'Filter': {'Prefix': ''}, 'Transitions': [ { 'Days': 0, 'StorageClass': 'INTELLIGENT_TIERING' } ] }, { 'ID': 'archive-old-logs', 'Status': 'Enabled', 'Filter': {'Prefix': 'logs/'}, 'Transitions': [ {'Days': 30, 'StorageClass': 'STANDARD_IA'}, {'Days': 90, 'StorageClass': 'GLACIER'}, ], 'Expiration': {'Days': 365} }, { 'ID': 'cleanup-incomplete-uploads', 'Status': 'Enabled', 'Filter': {'Prefix': ''}, 'AbortIncompleteMultipartUpload': { 'DaysAfterInitiation': 7 } } ] } s3.put_bucket_lifecycle_configuration( Bucket=bucket_name, LifecycleConfiguration=lifecycle_config ) Cost Monitoring Budget Alerts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 # CloudFormation budget AWSTemplateFormatVersion: '2010-09-09' Resources: MonthlyBudget: Type: AWS::Budgets::Budget Properties: Budget: BudgetName: monthly-infrastructure BudgetLimit: Amount: 10000 Unit: USD TimeUnit: MONTHLY BudgetType: COST NotificationsWithSubscribers: - Notification: NotificationType: ACTUAL ComparisonOperator: GREATER_THAN Threshold: 80 Subscribers: - SubscriptionType: EMAIL Address: ops@example.com - Notification: NotificationType: FORECASTED ComparisonOperator: GREATER_THAN Threshold: 100 Subscribers: - SubscriptionType: SNS Address: !Ref AlertTopic Daily Cost Report 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 # daily_cost_report.py import boto3 from datetime import datetime, timedelta def get_daily_costs(): """Get costs broken down by service.""" ce = boto3.client('ce') end = datetime.utcnow().date() start = end - timedelta(days=7) response = ce.get_cost_and_usage( TimePeriod={ 'Start': start.isoformat(), 'End': end.isoformat() }, Granularity='DAILY', Metrics=['UnblendedCost'], GroupBy=[ {'Type': 'DIMENSION', 'Key': 'SERVICE'} ] ) report = [] for day in response['ResultsByTime']: date = day['TimePeriod']['Start'] for group in day['Groups']: service = group['Keys'][0] cost = float(group['Metrics']['UnblendedCost']['Amount']) if cost > 1: # Only show significant costs report.append({ 'date': date, 'service': service, 'cost': cost }) return report # Generate and send report for item in sorted(get_daily_costs(), key=lambda x: -x['cost'])[:10]: print(f"{item['date']} | {item['service']}: ${item['cost']:.2f}") Checklist Enable Cost Explorer and set up budgets Tag all resources for cost allocation Review and act on right-sizing recommendations monthly Implement lifecycle policies for S3 and EBS snapshots Use Spot for fault-tolerant workloads Purchase Savings Plans for baseline compute Stop dev/staging resources outside business hours Clean up unused resources weekly Review Reserved Instance coverage quarterly Cloud cost optimization isn’t a one-time project—it’s an ongoing practice. Automate the easy wins, review monthly, and treat your cloud bill like any other metric that matters. ...

February 11, 2026 Â· 9 min Â· 1709 words Â· Rob Washington

Chaos Engineering: Breaking Your Systems to Make Them Stronger

You don’t know if your system handles failures gracefully until failures happen. Chaos engineering lets you find out on your terms—in controlled conditions, during business hours, with engineers ready to respond. The Principles Define steady state — What does “healthy” look like? Hypothesize — “The system will continue serving traffic if one pod dies” Inject failure — Kill the pod Observe — Did steady state hold? Learn — Fix what broke, update runbooks Start Simple: Kill Things Random Pod Termination 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 # chaos_pod_killer.py import random from kubernetes import client, config from datetime import datetime import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class ChaosPodKiller: def __init__(self, namespace: str, label_selector: str): config.load_incluster_config() # or load_kube_config() for local self.v1 = client.CoreV1Api() self.namespace = namespace self.label_selector = label_selector def get_targets(self) -> list: """Get pods matching selector.""" pods = self.v1.list_namespaced_pod( namespace=self.namespace, label_selector=self.label_selector ) return [pod.metadata.name for pod in pods.items if pod.status.phase == "Running"] def kill_random_pod(self, dry_run: bool = True) -> str: """Kill a random pod from targets.""" targets = self.get_targets() if len(targets) <= 1: logger.warning("Only one pod running, skipping kill") return None victim = random.choice(targets) if dry_run: logger.info(f"DRY RUN: Would delete pod {victim}") else: logger.info(f"Deleting pod {victim}") self.v1.delete_namespaced_pod( name=victim, namespace=self.namespace, grace_period_seconds=0 ) return victim def run_experiment(self, duration_minutes: int = 30, interval_seconds: int = 300): """Run chaos experiment for duration.""" import time end_time = datetime.now().timestamp() + (duration_minutes * 60) logger.info(f"Starting chaos experiment for {duration_minutes} minutes") while datetime.now().timestamp() < end_time: victim = self.kill_random_pod(dry_run=False) if victim: logger.info(f"Killed {victim}, waiting {interval_seconds}s") time.sleep(interval_seconds) logger.info("Chaos experiment completed") # Usage if __name__ == "__main__": killer = ChaosPodKiller( namespace="production", label_selector="app=api-service" ) killer.run_experiment(duration_minutes=30, interval_seconds=300) Kubernetes CronJob for Continuous Chaos 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 apiVersion: batch/v1 kind: CronJob metadata: name: chaos-pod-killer namespace: chaos-system spec: schedule: "*/10 9-17 * * 1-5" # Every 10 min, 9-5, weekdays only jobTemplate: spec: template: spec: serviceAccountName: chaos-runner containers: - name: chaos image: chaos-toolkit:latest command: - python - /scripts/chaos_pod_killer.py env: - name: TARGET_NAMESPACE value: "production" - name: LABEL_SELECTOR value: "chaos-enabled=true" restartPolicy: Never Network Chaos Introduce Latency 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # Using Chaos Mesh apiVersion: chaos-mesh.org/v1alpha1 kind: NetworkChaos metadata: name: network-delay namespace: chaos-testing spec: action: delay mode: all selector: namespaces: - production labelSelectors: app: api-service delay: latency: "100ms" jitter: "20ms" correlation: "50" duration: "5m" scheduler: cron: "@every 1h" Packet Loss 1 2 3 4 5 6 7 8 9 10 11 12 13 14 apiVersion: chaos-mesh.org/v1alpha1 kind: NetworkChaos metadata: name: network-loss spec: action: loss mode: one selector: labelSelectors: app: payment-service loss: loss: "10" # 10% packet loss correlation: "50" duration: "2m" Network Partition 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 apiVersion: chaos-mesh.org/v1alpha1 kind: NetworkChaos metadata: name: network-partition spec: action: partition mode: all selector: namespaces: - production labelSelectors: app: api-service direction: both target: selector: namespaces: - production labelSelectors: app: database duration: "30s" Resource Stress CPU Stress 1 2 3 4 5 6 7 8 9 10 11 12 13 14 apiVersion: chaos-mesh.org/v1alpha1 kind: StressChaos metadata: name: cpu-stress spec: mode: one selector: labelSelectors: app: api-service stressors: cpu: workers: 2 load: 80 # 80% CPU usage duration: "5m" Memory Stress 1 2 3 4 5 6 7 8 9 10 11 12 13 14 apiVersion: chaos-mesh.org/v1alpha1 kind: StressChaos metadata: name: memory-stress spec: mode: one selector: labelSelectors: app: api-service stressors: memory: workers: 1 size: "512MB" # Allocate 512MB duration: "3m" Application-Level Chaos HTTP Fault Injection with Istio 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 apiVersion: networking.istio.io/v1beta1 kind: VirtualService metadata: name: api-chaos spec: hosts: - api-service http: - fault: abort: percentage: value: 5 httpStatus: 503 delay: percentage: value: 10 fixedDelay: 2s route: - destination: host: api-service Custom Failure Injection 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 # chaos_middleware.py import random import time import os from functools import wraps CHAOS_ENABLED = os.getenv("CHAOS_ENABLED", "false").lower() == "true" CHAOS_FAILURE_RATE = float(os.getenv("CHAOS_FAILURE_RATE", "0.0")) CHAOS_LATENCY_MS = int(os.getenv("CHAOS_LATENCY_MS", "0")) def chaos_middleware(f): """Inject chaos into function calls.""" @wraps(f) def wrapper(*args, **kwargs): if not CHAOS_ENABLED: return f(*args, **kwargs) # Random failure if random.random() < CHAOS_FAILURE_RATE: raise Exception("Chaos injection: random failure") # Random latency if CHAOS_LATENCY_MS > 0: delay = random.randint(0, CHAOS_LATENCY_MS) / 1000 time.sleep(delay) return f(*args, **kwargs) return wrapper # Usage @chaos_middleware def call_payment_service(order): return requests.post(PAYMENT_URL, json=order.to_dict()) Experiment Framework 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 # chaos_experiment.py from dataclasses import dataclass from typing import Callable, List import time @dataclass class SteadyStateCheck: name: str check: Callable[[], bool] @dataclass class ChaosExperiment: name: str hypothesis: str steady_state_checks: List[SteadyStateCheck] inject_failure: Callable[[], None] rollback: Callable[[], None] duration_seconds: int = 60 def run_experiment(experiment: ChaosExperiment) -> dict: """Run a chaos experiment with proper controls.""" result = { "name": experiment.name, "hypothesis": experiment.hypothesis, "success": False, "steady_state_before": {}, "steady_state_after": {}, "errors": [] } # 1. Verify steady state before print(f"Checking steady state before experiment...") for check in experiment.steady_state_checks: try: passed = check.check() result["steady_state_before"][check.name] = passed if not passed: result["errors"].append(f"Pre-check failed: {check.name}") return result except Exception as e: result["errors"].append(f"Pre-check error: {check.name}: {e}") return result # 2. Inject failure print(f"Injecting failure: {experiment.name}") try: experiment.inject_failure() except Exception as e: result["errors"].append(f"Injection failed: {e}") experiment.rollback() return result # 3. Wait for duration print(f"Waiting {experiment.duration_seconds}s...") time.sleep(experiment.duration_seconds) # 4. Check steady state during/after print(f"Checking steady state after experiment...") for check in experiment.steady_state_checks: try: passed = check.check() result["steady_state_after"][check.name] = passed except Exception as e: result["steady_state_after"][check.name] = False result["errors"].append(f"Post-check error: {check.name}: {e}") # 5. Rollback print(f"Rolling back...") experiment.rollback() # 6. Evaluate hypothesis result["success"] = all(result["steady_state_after"].values()) return result # Example experiment def check_api_responding(): response = requests.get("http://api-service/health", timeout=5) return response.status_code == 200 def check_error_rate_low(): # Query Prometheus result = prometheus.query('sum(rate(http_requests_total{status=~"5.."}[1m])) / sum(rate(http_requests_total[1m]))') return float(result) < 0.01 def kill_one_api_pod(): killer = ChaosPodKiller("production", "app=api-service") killer.kill_random_pod(dry_run=False) def noop_rollback(): pass # Kubernetes will restart the pod experiment = ChaosExperiment( name="api-pod-failure", hypothesis="API continues serving traffic when one pod dies", steady_state_checks=[ SteadyStateCheck("api_responding", check_api_responding), SteadyStateCheck("error_rate_low", check_error_rate_low), ], inject_failure=kill_one_api_pod, rollback=noop_rollback, duration_seconds=60 ) result = run_experiment(experiment) print(f"Experiment {'PASSED' if result['success'] else 'FAILED'}") Game Days Schedule regular chaos exercises: ...

February 11, 2026 Â· 8 min Â· 1552 words Â· Rob Washington

Incident Response: On-Call Practices That Don't Burn Out Your Team

On-call doesn’t have to mean sleepless nights and burnout. With the right practices, you can maintain reliable systems while keeping your team healthy. Here’s how. Alert Design Only Alert on Actionable Items 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 # Bad: Noisy, leads to alert fatigue - alert: HighCPU expr: cpu_usage > 80 for: 1m # Good: Actionable, symptom-based - alert: HighErrorRate expr: | sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) > 0.01 for: 5m labels: severity: warning runbook: "https://runbooks.example.com/high-error-rate" annotations: summary: "Error rate {{ $value | humanizePercentage }} exceeds 1%" impact: "Users experiencing failures" action: "Check application logs and recent deployments" Severity Levels 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 # severity-definitions.yaml severities: critical: description: "Service completely down, immediate action required" response_time: "5 minutes" notification: "page + phone call" examples: - "Total service outage" - "Data loss occurring" - "Security breach detected" warning: description: "Degraded service, action needed soon" response_time: "30 minutes" notification: "page" examples: - "Error rate elevated but service functional" - "Approaching resource limits" - "Single replica down" info: description: "Notable event, no immediate action" response_time: "Next business day" notification: "Slack" examples: - "Deployment completed" - "Scheduled maintenance starting" Runbooks Every alert needs a runbook: ...

February 11, 2026 Â· 16 min Â· 3302 words Â· Rob Washington

Observability: Beyond Monitoring with Metrics, Logs, and Traces

Monitoring tells you when something is wrong. Observability helps you understand why. In distributed systems, you can’t predict every failure mode—you need systems that let you ask arbitrary questions about their behavior. The Three Pillars Metrics: What’s Happening Now Numeric time-series data. Fast to query, cheap to store. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 from prometheus_client import Counter, Histogram, Gauge, start_http_server # Counter - only goes up requests_total = Counter( 'http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'] ) # Histogram - distribution of values request_duration = Histogram( 'http_request_duration_seconds', 'Request duration in seconds', ['method', 'endpoint'], buckets=[.01, .05, .1, .25, .5, 1, 2.5, 5, 10] ) # Gauge - can go up or down active_connections = Gauge( 'active_connections', 'Number of active connections' ) # Usage @app.route("/api/<endpoint>") def handle_request(endpoint): active_connections.inc() with request_duration.labels( method=request.method, endpoint=endpoint ).time(): result = process_request() requests_total.labels( method=request.method, endpoint=endpoint, status=200 ).inc() active_connections.dec() return result # Expose metrics endpoint start_http_server(9090) Logs: What Happened Discrete events with context. Rich detail, expensive at scale. ...

February 11, 2026 Â· 7 min Â· 1291 words Â· Rob Washington

The Twelve-Factor App: Building Cloud-Native Applications That Scale

The twelve-factor methodology emerged from Heroku’s experience running millions of apps. These principles create applications that deploy cleanly, scale effortlessly, and minimize divergence between development and production. Let’s walk through each factor with practical examples. 1. Codebase: One Repo, Many Deploys One codebase tracked in version control, many deploys (dev, staging, prod). 1 2 3 4 5 6 7 8 9 # Good: Single repo, branch-based environments main → production staging → staging feature/* → development # Bad: Separate repos for each environment myapp-dev/ myapp-staging/ myapp-prod/ 1 2 3 4 5 # config.py - Same code, different configs import os ENVIRONMENT = os.getenv("ENVIRONMENT", "development") DATABASE_URL = os.getenv("DATABASE_URL") 2. Dependencies: Explicitly Declare and Isolate Never rely on system-wide packages. Declare everything. ...

February 11, 2026 Â· 6 min Â· 1237 words Â· Rob Washington