Feature Flags: The Art of Shipping Code Without Shipping Features

There’s a subtle but powerful distinction in modern software delivery: deployment is not release. Deployment means your code is running in production. Release means your users can see it. Feature flags are the bridge between these two concepts, and mastering them changes how you think about shipping software.

The Problem with Traditional Deployment

In the old model, deploying code meant releasing features:

```bash
# Old way: deploy = release
git push origin main
# Boom, everyone sees the new feature immediately
```

This creates pressure. You can’t deploy partially complete work. You can’t test in production with real traffic. And if something breaks, your only option is another deploy to roll back. ...
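The deploy/release split described above can be sketched in a few lines. This is only an illustration under stated assumptions, not the post's implementation: the in-memory `FLAGS` store, the flag name `new_checkout`, and the `checkout` function are all hypothetical, and a real system would usually load flags from a config service.

```python
import hashlib

# Hypothetical flag store: flag name -> rollout percentage (0-100).
FLAGS = {"new_checkout": 25}


def is_enabled(flag: str, user_id: str) -> bool:
    """Return True if the flag is on for this user.

    Users are bucketed deterministically by hashing flag + user ID,
    so the same user always gets the same answer for a given rollout.
    """
    rollout = FLAGS.get(flag, 0)  # unknown flags are off
    digest = hashlib.sha256(f"{flag}:{user_id}".encode()).hexdigest()
    bucket = int(digest, 16) % 100  # 0..99
    return bucket < rollout


def checkout(user_id: str) -> str:
    """Both code paths are deployed; the flag decides which one is released."""
    if is_enabled("new_checkout", user_id):
        return "new checkout flow"
    return "old checkout flow"
```

Setting the rollout to 0 hides the feature without a deploy, and raising it gradually releases it to more users.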

February 12, 2026 · 6 min · 1269 words · Rob Washington

Documentation as Code: Keeping Docs in Sync with Your Systems

Documentation that lives outside your codebase gets stale. Documentation that isn’t tested breaks. Let’s treat docs like code: versioned, automated, and verified.

Docs Live with Code

Repository Structure

```
project/
├── src/
├── docs/
│   ├── getting-started.md
│   ├── architecture.md
│   ├── api/
│   │   └── openapi.yaml
│   └── runbooks/
│       ├── deployment.md
│       └── incident-response.md
├── mkdocs.yml
└── README.md
```

MkDocs Configuration

```yaml
# mkdocs.yml
site_name: My Project
site_url: https://docs.example.com
repo_url: https://github.com/org/project

theme:
  name: material
  features:
    - navigation.tabs
    - navigation.sections
    - search.highlight
    - content.code.copy

plugins:
  - search
  - git-revision-date-localized
  - macros

markdown_extensions:
  - admonition
  - codehilite
  - toc:
      permalink: true
  - pymdownx.superfences:
      custom_fences:
        - name: mermaid
          class: mermaid
          format: !!python/name:pymdownx.superfences.fence_code_format

nav:
  - Home: index.md
  - Getting Started: getting-started.md
  - Architecture: architecture.md
  - API Reference: api/
  - Runbooks:
    - Deployment: runbooks/deployment.md
    - Incidents: runbooks/incident-response.md
```

Auto-Generated API Docs

From OpenAPI Spec

```yaml
# docs/api/openapi.yaml
openapi: 3.0.3
info:
  title: My API
  version: 1.0.0
  description: |
    API for managing resources.

    ## Authentication
    All endpoints require Bearer token authentication.

paths:
  /users:
    get:
      summary: List users
      tags: [Users]
      parameters:
        - name: limit
          in: query
          schema:
            type: integer
            default: 20
      responses:
        '200':
          description: List of users
          content:
            application/json:
              schema:
                type: array
                items:
                  $ref: '#/components/schemas/User'

components:
  schemas:
    User:
      type: object
      properties:
        id:
          type: string
          format: uuid
        email:
          type: string
          format: email
        created_at:
          type: string
          format: date-time
```

Generate from Code

```python
# generate_docs.py
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
import json
import yaml

app = FastAPI()

# ... your routes ...

def export_openapi():
    """Export OpenAPI spec to file."""
    openapi_schema = get_openapi(
        title=app.title,
        version=app.version,
        description=app.description,
        routes=app.routes,
    )

    # Write JSON
    with open('docs/api/openapi.json', 'w') as f:
        json.dump(openapi_schema, f, indent=2)

    # Write YAML
    with open('docs/api/openapi.yaml', 'w') as f:
        yaml.dump(openapi_schema, f, default_flow_style=False)

if __name__ == "__main__":
    export_openapi()
```

Terraform Docs Generation

```bash
# Install terraform-docs
brew install terraform-docs

# Generate markdown from module
terraform-docs markdown table ./modules/vpc > ./modules/vpc/README.md
```

```yaml
# .terraform-docs.yml
formatter: markdown table

sections:
  show:
    - requirements
    - providers
    - inputs
    - outputs
    - resources

output:
  file: README.md
  mode: inject
  template: |-
    <!-- BEGIN_TF_DOCS -->
    {{ .Content }}
    <!-- END_TF_DOCS -->
```

Diagrams as Code

Mermaid in Markdown

````markdown
# Architecture

```mermaid
graph TB
    subgraph "Public"
        LB[Load Balancer]
    end

    subgraph "Application Tier"
        API1[API Server 1]
        API2[API Server 2]
    end

    subgraph "Data Tier"
        DB[(PostgreSQL)]
        Cache[(Redis)]
    end

    LB --> API1
    LB --> API2
    API1 --> DB
    API2 --> DB
    API1 --> Cache
    API2 --> Cache
```
````

Python Diagrams

```python
# diagrams/architecture.py
from diagrams import Diagram, Cluster
from diagrams.aws.compute import ECS
from diagrams.aws.database import RDS, ElastiCache
from diagrams.aws.network import ALB, Route53

with Diagram("Production Architecture", show=False, filename="docs/images/architecture"):
    dns = Route53("DNS")
    lb = ALB("Load Balancer")

    with Cluster("Application"):
        api = [ECS("API 1"), ECS("API 2"), ECS("API 3")]

    with Cluster("Data"):
        db = RDS("PostgreSQL")
        cache = ElastiCache("Redis")

    dns >> lb >> api
    api >> db
    api >> cache
```

CI/CD for Documentation

GitHub Actions

```yaml
# .github/workflows/docs.yml
name: Documentation

on:
  push:
    branches: [main]
    paths:
      - 'docs/**'
      - 'mkdocs.yml'
      - 'src/**/*.py'  # Regenerate if code changes
  pull_request:
    paths:
      - 'docs/**'

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0  # For git-revision-date plugin

      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          # Every plugin listed in mkdocs.yml must be installed, or the strict build fails
          pip install mkdocs-material mkdocs-macros-plugin mkdocs-git-revision-date-localized-plugin
          pip install -e .  # Install project for API doc generation

      - name: Generate API docs
        run: python generate_docs.py

      - name: Generate Terraform docs
        # Assumes terraform-docs is already available on the runner
        run: |
          terraform-docs markdown table ./terraform > ./docs/terraform.md

      - name: Build documentation
        run: mkdocs build --strict

      - name: Upload artifact
        uses: actions/upload-pages-artifact@v3
        with:
          path: site/

  deploy:
    if: github.ref == 'refs/heads/main'
    needs: build
    runs-on: ubuntu-latest
    permissions:
      pages: write
      id-token: write
    environment:
      name: github-pages
    steps:
      - uses: actions/deploy-pages@v4
```

Documentation Testing

Link Checking

```yaml
# In CI
- name: Check links
  uses: lycheeverse/lychee-action@v1
  with:
    args: --verbose --no-progress './docs/**/*.md'
    fail: true
```

Code Example Testing

```python
# test_docs.py
import subprocess
import re
from pathlib import Path

def extract_code_blocks(markdown_file: Path) -> list:
    """Extract fenced code blocks from markdown as (language, code) pairs."""
    content = markdown_file.read_text()
    pattern = r'```(\w+)\n(.*?)```'
    return re.findall(pattern, content, re.DOTALL)

def test_python_examples():
    """Test that Python code examples are valid."""
    for doc in Path('docs').rglob('*.md'):
        for lang, code in extract_code_blocks(doc):
            if lang == 'python':
                # Check syntax only; the example is compiled, not executed
                try:
                    compile(code, doc.name, 'exec')
                except SyntaxError as e:
                    raise AssertionError(f"Syntax error in {doc}: {e}")

def test_bash_examples():
    """Test that bash commands are valid syntax."""
    for doc in Path('docs').rglob('*.md'):
        for lang, code in extract_code_blocks(doc):
            if lang in ('bash', 'shell', 'sh'):
                result = subprocess.run(
                    ['bash', '-n', '-c', code],
                    capture_output=True
                )
                if result.returncode != 0:
                    raise AssertionError(
                        f"Bash syntax error in {doc}: {result.stderr.decode()}"
                    )
```

OpenAPI Validation

```yaml
# In CI
- name: Validate OpenAPI spec
  run: |
    npx @apidevtools/swagger-cli validate docs/api/openapi.yaml
```

README Generation

From Template

````python
# generate_readme.py
from jinja2 import Template
import subprocess

def get_version():
    return subprocess.check_output(
        ['git', 'describe', '--tags', '--always']
    ).decode().strip()

def get_contributors():
    output = subprocess.check_output(
        ['git', 'shortlog', '-sn', 'HEAD']
    ).decode()
    return [line.split('\t')[1] for line in output.strip().split('\n')]

template = Template('''
# {{ name }}

{{ description }}

## Installation

```bash
pip install {{ package_name }}
```

## Quick Start

```python
from {{ package_name }} import Client

client = Client(api_key="your-key")
result = client.do_something()
```

## Documentation

Full documentation at [{{ docs_url }}]({{ docs_url }})
````

...
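One way to enforce the "docs stay in sync" idea for the exported OpenAPI file is a staleness check in CI: regenerate the spec and fail if it differs from what is committed. The sketch below is an assumption-laden illustration; `spec_is_stale` is a hypothetical helper, and it assumes the freshly generated schema is available as a plain dict (for example, the value returned by `get_openapi`).

```python
import json
from pathlib import Path


def spec_is_stale(generated: dict, committed_path: Path) -> bool:
    """Return True when the committed spec differs from the generated one.

    A missing file counts as stale, so a first run also flags the problem.
    """
    if not committed_path.exists():
        return True
    committed = json.loads(committed_path.read_text())
    # Compare parsed content, so key order and indentation do not matter.
    return committed != generated
```

In a pipeline, a `True` result could fail the build with a message asking the author to re-run the export script and commit the result.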

February 12, 2026 · 13 min · 2615 words · Rob Washington

Infrastructure Drift Detection: Keeping Your Terraform in Sync with Reality

You define infrastructure in Terraform. Someone clicks around in the AWS console. Now your code says one thing and reality says another. This is drift, and it will bite you when you least expect it.

What Causes Drift?

- Manual changes: “Just this once” console modifications
- Emergency fixes: Hotfixes applied directly to production
- Other tools: Scripts, CLI commands, other IaC tools
- AWS itself: Auto-scaling, managed service updates
- Incomplete imports: Resources created outside Terraform

Detecting Drift

Basic: Terraform Plan

```bash
#!/bin/bash
# drift-check.sh

set -e

# Initialize without backend changes
terraform init -input=false

# Generate plan. With -detailed-exitcode: 0 = no changes, 1 = error, 2 = changes.
terraform plan -detailed-exitcode -out=tfplan 2>&1 | tee plan.txt
# $? would report tee's exit code; PIPESTATUS[0] holds terraform's
EXIT_CODE=${PIPESTATUS[0]}

if [ $EXIT_CODE -eq 0 ]; then
    echo "✅ No drift detected"
elif [ $EXIT_CODE -eq 2 ]; then
    echo "⚠️ Drift detected! Changes needed:"
    terraform show tfplan
    exit 1
else
    echo "❌ Error running plan"
    exit 1
fi
```

Automated Drift Detection

```python
# drift_detector.py
import subprocess
import json
from datetime import datetime, timezone
from typing import List, Dict

# Message types in terraform's -json plan stream that describe resource changes
CHANGE_TYPES = {"planned_change", "resource_drift"}

class DriftDetector:
    def __init__(self, workspace_dir: str):
        self.workspace_dir = workspace_dir

    def check_drift(self) -> Dict:
        """Run terraform plan and parse results."""
        # Run inside the workspace via cwd= instead of os.chdir(),
        # so the process-wide working directory is left untouched.
        subprocess.run(
            ["terraform", "init", "-input=false"],
            capture_output=True,
            check=True,
            cwd=self.workspace_dir
        )

        # Plan with JSON output; a failed plan raises instead of reporting "no drift"
        result = subprocess.run(
            ["terraform", "plan", "-json", "-out=tfplan"],
            capture_output=True,
            text=True,
            check=True,
            cwd=self.workspace_dir
        )

        changes = self._parse_plan_output(result.stdout)

        return {
            "workspace": self.workspace_dir,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "has_drift": len(changes) > 0,
            "changes": changes
        }

    def _parse_plan_output(self, output: str) -> List[Dict]:
        """Parse terraform plan JSON output (one JSON object per line)."""
        changes = []

        for line in output.strip().split('\n'):
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                continue

            # Keep only the entries that describe a resource change
            if entry.get('@level') == 'info' and entry.get('type') in CHANGE_TYPES:
                changes.append({
                    'message': entry.get('@message', ''),
                    'type': entry.get('type', 'unknown'),
                    'change': entry.get('change', {})
                })

        return changes

def check_all_workspaces(workspaces: List[str]) -> Dict:
    """Check drift across multiple Terraform workspaces."""
    results = []

    for workspace in workspaces:
        detector = DriftDetector(workspace)
        try:
            result = detector.check_drift()
            results.append(result)
        except Exception as e:
            results.append({
                "workspace": workspace,
                "error": str(e),
                "has_drift": None
            })

    drifted = [r for r in results if r.get('has_drift')]

    return {
        "total_workspaces": len(workspaces),
        "drifted_count": len(drifted),
        "results": results
    }

# Usage
workspaces = [
    "/infrastructure/terraform/networking",
    "/infrastructure/terraform/compute",
    "/infrastructure/terraform/databases",
]

report = check_all_workspaces(workspaces)
print(json.dumps(report, indent=2))
```

CI Pipeline for Drift Detection

```yaml
# .github/workflows/drift-detection.yml
name: Drift Detection

on:
  schedule:
    - cron: '0 */6 * * *'  # Every 6 hours
  workflow_dispatch:

jobs:
  detect-drift:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        workspace:
          - networking
          - compute
          - databases
          - monitoring

    steps:
      - uses: actions/checkout@v4

      - uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: 1.7.0

      - name: Configure AWS
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1

      - name: Check Drift
        id: drift
        working-directory: terraform/${{ matrix.workspace }}
        run: |
          terraform init -input=false

          set +e
          terraform plan -detailed-exitcode -out=tfplan 2>&1 | tee plan.txt
          EXIT_CODE=${PIPESTATUS[0]}  # terraform's exit code, not tee's
          set -e

          if [ $EXIT_CODE -eq 2 ]; then
            echo "drift_detected=true" >> $GITHUB_OUTPUT
            echo "### ⚠️ Drift Detected in ${{ matrix.workspace }}" >> $GITHUB_STEP_SUMMARY
            echo '```' >> $GITHUB_STEP_SUMMARY
            terraform show -no-color tfplan >> $GITHUB_STEP_SUMMARY
            echo '```' >> $GITHUB_STEP_SUMMARY
          elif [ $EXIT_CODE -eq 0 ]; then
            echo "drift_detected=false" >> $GITHUB_OUTPUT
            echo "### ✅ No drift in ${{ matrix.workspace }}" >> $GITHUB_STEP_SUMMARY
          else
            # Exit code 1 means the plan itself failed; do not report it as "no drift"
            echo "### ❌ Plan failed in ${{ matrix.workspace }}" >> $GITHUB_STEP_SUMMARY
            exit 1
          fi

      - name: Alert on Drift
        if: steps.drift.outputs.drift_detected == 'true'
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": "⚠️ Infrastructure drift detected in ${{ matrix.workspace }}",
              "blocks": [
                {
                  "type": "section",
                  "text": {
                    "type": "mrkdwn",
                    "text": "*Infrastructure Drift Detected*\n\nWorkspace: `${{ matrix.workspace }}`\nRun: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
                  }
                }
              ]
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
```

Continuous Reconciliation

Atlantis for GitOps

```yaml
# atlantis.yaml
version: 3
projects:
  - name: networking
    dir: terraform/networking
    autoplan:
      when_modified:
        - "*.tf"
        - "*.tfvars"
      enabled: true
    apply_requirements:
      - approved
      - mergeable

  - name: compute
    dir: terraform/compute
    autoplan:
      enabled: true
    apply_requirements:
      - approved
```

Drift Auto-Remediation

```python
# drift_remediation.py
import subprocess
from datetime import datetime
from enum import Enum

class RemediationStrategy(Enum):
    ALERT_ONLY = "alert"        # Just notify
    AUTO_PLAN = "plan"          # Create PR with plan
    AUTO_APPLY = "apply"        # Automatically apply (dangerous!)
    REFRESH_ONLY = "refresh"    # Update state to match reality

class DriftRemediator:
    def __init__(self, workspace: str, strategy: RemediationStrategy):
        self.workspace = workspace
        self.strategy = strategy

    def remediate(self, drift_report: dict) -> dict:
        """Remediate detected drift based on strategy."""

        if not drift_report.get('has_drift'):
            return {"action": "none", "reason": "no drift"}

        if self.strategy == RemediationStrategy.ALERT_ONLY:
            return self._alert(drift_report)

        elif self.strategy == RemediationStrategy.REFRESH_ONLY:
            return self._refresh_state()

        elif self.strategy == RemediationStrategy.AUTO_PLAN:
            return self._create_remediation_pr(drift_report)

        elif self.strategy == RemediationStrategy.AUTO_APPLY:
            return self._auto_apply()

        return {"action": "unknown", "error": "invalid strategy"}

    def _alert(self, drift_report: dict) -> dict:
        """Send alert without taking action."""
        # Send to Slack, PagerDuty, etc.
        return {
            "action": "alert",
            "changes": len(drift_report['changes'])
        }

    def _refresh_state(self) -> dict:
        """Refresh state to match reality (accept drift)."""
        result = subprocess.run(
            ["terraform", "apply", "-refresh-only", "-auto-approve"],
            capture_output=True,
            text=True,
            cwd=self.workspace
        )

        return {
            "action": "refresh",
            "success": result.returncode == 0,
            "output": result.stdout
        }

    def _create_remediation_pr(self, drift_report: dict) -> dict:
        """Create a PR to remediate drift."""
        # Generate plan
        subprocess.run(
            ["terraform", "plan", "-out=remediation.tfplan"],
            cwd=self.workspace
        )

        # Create branch and PR (pseudo-code)
        branch = f"fix/drift-{self.workspace}-{datetime.now().strftime('%Y%m%d')}"

        # In practice, you'd use GitHub API or similar
        return {
            "action": "pr_created",
            "branch": branch,
            "changes": len(drift_report['changes'])
        }

    def _auto_apply(self) -> dict:
        """Automatically apply to fix drift. USE WITH CAUTION."""
        result = subprocess.run(
            ["terraform", "apply", "-auto-approve"],
            capture_output=True,
            text=True,
            cwd=self.workspace
        )

        return {
            "action": "auto_apply",
            "success": result.returncode == 0,
            "output": result.stdout if result.returncode == 0 else result.stderr
        }

# Configuration per workspace
WORKSPACE_STRATEGIES = {
    "networking": RemediationStrategy.ALERT_ONLY,    # Critical, manual only
    "compute": RemediationStrategy.AUTO_PLAN,        # Create PRs
    "monitoring": RemediationStrategy.REFRESH_ONLY,  # Accept drift
    "dev-sandbox": RemediationStrategy.AUTO_APPLY,   # Auto-fix okay
}
```

State Locking and Protection

Prevent Manual Changes

```hcl
# terraform.tf
terraform {
  backend "s3" {
    bucket         = "terraform-state-prod"
    key            = "networking/terraform.tfstate"
    region         = "us-east-1"
    dynamodb_table = "terraform-locks"
    encrypt        = true
  }
}

# Prevent destroy of critical resources
resource "aws_db_instance" "main" {
  # ...

  lifecycle {
    prevent_destroy = true
  }
}
```

AWS Service Control Policies

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "RequireTerraformTag",
      "Effect": "Deny",
      "Action": [
        "ec2:RunInstances",
        "ec2:CreateVpc",
        "rds:CreateDBInstance"
      ],
      "Resource": "*",
      "Condition": {
        "Null": {
          "aws:RequestTag/ManagedBy": "true"
        }
      }
    },
    {
      "Sid": "PreventManualModification",
      "Effect": "Deny",
      "Action": [
        "ec2:ModifyInstanceAttribute",
        "ec2:ModifyVpcAttribute"
      ],
      "Resource": "*",
      "Condition": {
        "StringEquals": {
          "aws:ResourceTag/ManagedBy": "terraform"
        },
        "StringNotEquals": {
          "aws:PrincipalTag/Role": "terraform-automation"
        }
      }
    }
  ]
}
```

Drift Metrics

```python
# drift_metrics.py
from prometheus_client import Counter, Gauge, Histogram

# Metrics
drift_detected = Counter(
    'terraform_drift_detected_total',
    'Number of drift detection events',
    ['workspace', 'severity']
)

drift_resources = Gauge(
    'terraform_drift_resources',
    'Number of resources with drift',
    ['workspace']
)

drift_check_duration = Histogram(
    'terraform_drift_check_seconds',
    'Time to check for drift',
    ['workspace']
)

def record_drift_check(workspace: str, result: dict, duration: float):
    """Record drift check metrics."""
    drift_check_duration.labels(workspace=workspace).observe(duration)

    if result['has_drift']:
        # Fewer than 5 changed resources counts as a warning, otherwise critical
        drift_detected.labels(
            workspace=workspace,
            severity='warning' if len(result['changes']) < 5 else 'critical'
        ).inc()

        drift_resources.labels(workspace=workspace).set(len(result['changes']))
    else:
        drift_resources.labels(workspace=workspace).set(0)
```

Best Practices

- Check drift frequently: At least daily, ideally every few hours
- Alert immediately: Drift compounds; catch it early
- Lock down production: Use SCPs to prevent manual changes
- Tag everything: ManagedBy: terraform enables enforcement
- Use workspaces: Isolate environments, check each independently
- Automate remediation: PRs for drift fixes, not manual applies
- Track metrics: Know your drift frequency and sources

Quick Start Checklist

- Set up scheduled drift detection (GitHub Actions, Jenkins, etc.)
- Configure alerts for detected drift
- Add ManagedBy tags to all Terraform resources
- Implement SCPs to prevent manual changes in production
- Create runbook for drift remediation
- Track drift metrics over time

Drift isn’t just annoying; it’s a reliability risk. Your next terraform apply might have unintended consequences because the state doesn’t match reality. Detect it early, fix it fast, and prevent it where possible. ...
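For alerts and metrics it can help to know which resources drifted and how, not just that something changed. A minimal sketch, assuming the saved plan has been converted to a dict with `terraform show -json tfplan` and that it carries the usual `resource_changes` list; `summarize_plan` and the sample data are hypothetical:

```python
from collections import defaultdict


def summarize_plan(plan: dict) -> dict:
    """Group resource addresses by planned action, skipping no-ops and reads."""
    summary = defaultdict(list)
    for rc in plan.get("resource_changes", []):
        actions = rc.get("change", {}).get("actions", [])
        if actions in (["no-op"], ["read"]):
            continue
        # A replacement shows up as two actions, e.g. ["delete", "create"]
        summary["/".join(actions)].append(rc["address"])
    return dict(summary)


# Hand-written sample in the shape of `terraform show -json` output
sample_plan = {
    "resource_changes": [
        {"address": "aws_instance.web", "change": {"actions": ["update"]}},
        {"address": "aws_s3_bucket.logs", "change": {"actions": ["no-op"]}},
        {"address": "aws_db_instance.main", "change": {"actions": ["delete", "create"]}},
    ]
}
print(summarize_plan(sample_plan))
```

The per-action counts could feed the severity label in the metrics, so that a planned replacement of a database weighs more than a tag update.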

February 11, 2026 · 8 min · 1606 words · Rob Washington

Cloud Cost Optimization: Cutting AWS Bills Without Cutting Corners

Cloud bills grow faster than you’d expect. A few forgotten instances here, oversized databases there, and suddenly you’re spending more on infrastructure than engineering salaries. Let’s fix that. Quick Wins: Find the Waste Identify Unused Resources 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 # find_waste.py import boto3 from datetime import datetime, timedelta ec2 = boto3.client('ec2') cloudwatch = boto3.client('cloudwatch') def find_idle_instances(): """Find EC2 instances with <5% CPU over 7 days.""" instances = ec2.describe_instances( Filters=[{'Name': 'instance-state-name', 'Values': ['running']}] ) idle = [] for reservation in instances['Reservations']: for instance in reservation['Instances']: instance_id = instance['InstanceId'] # Get average CPU over 7 days response = cloudwatch.get_metric_statistics( Namespace='AWS/EC2', MetricName='CPUUtilization', Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}], StartTime=datetime.utcnow() - timedelta(days=7), EndTime=datetime.utcnow(), Period=86400, # 1 day Statistics=['Average'] ) if response['Datapoints']: avg_cpu = sum(d['Average'] for d in response['Datapoints']) / len(response['Datapoints']) if avg_cpu < 5: idle.append({ 'instance_id': instance_id, 'type': instance['InstanceType'], 'avg_cpu': round(avg_cpu, 2), 'name': next((t['Value'] for t in instance.get('Tags', []) if t['Key'] == 'Name'), 'unnamed') }) return idle def find_unattached_volumes(): """Find EBS volumes not attached to any instance.""" volumes = ec2.describe_volumes( Filters=[{'Name': 'status', 'Values': ['available']}] ) return [{ 'volume_id': v['VolumeId'], 'size_gb': v['Size'], 'monthly_cost': v['Size'] * 0.10 # gp2 pricing estimate } for v in volumes['Volumes']] def find_old_snapshots(): """Find snapshots older than 90 days.""" snapshots = 
ec2.describe_snapshots(OwnerIds=['self']) cutoff = datetime.utcnow() - timedelta(days=90) old = [] for snap in snapshots['Snapshots']: if snap['StartTime'].replace(tzinfo=None) < cutoff: old.append({ 'snapshot_id': snap['SnapshotId'], 'size_gb': snap['VolumeSize'], 'age_days': (datetime.utcnow() - snap['StartTime'].replace(tzinfo=None)).days }) return old # Run analysis print("=== Idle Instances ===") for i in find_idle_instances(): print(f" {i['instance_id']} ({i['type']}): {i['avg_cpu']}% avg CPU - {i['name']}") print("\n=== Unattached Volumes ===") for v in find_unattached_volumes(): print(f" {v['volume_id']}: {v['size_gb']}GB = ${v['monthly_cost']:.2f}/mo") print("\n=== Old Snapshots ===") for s in find_old_snapshots(): print(f" {s['snapshot_id']}: {s['size_gb']}GB, {s['age_days']} days old") Automated Cleanup 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 # cleanup_automation.py import boto3 from datetime import datetime, timedelta def cleanup_old_amis(days_old: int = 180, dry_run: bool = True): """Deregister AMIs older than threshold and delete their snapshots.""" ec2 = boto3.client('ec2') images = ec2.describe_images(Owners=['self']) cutoff = datetime.utcnow() - timedelta(days=days_old) for image in images['Images']: creation_date = datetime.strptime(image['CreationDate'][:10], '%Y-%m-%d') if creation_date < cutoff: ami_id = image['ImageId'] snapshot_ids = [ bdm['Ebs']['SnapshotId'] for bdm in image.get('BlockDeviceMappings', []) if 'Ebs' in bdm ] if dry_run: print(f"Would delete AMI {ami_id} and snapshots {snapshot_ids}") else: ec2.deregister_image(ImageId=ami_id) for snap_id in snapshot_ids: ec2.delete_snapshot(SnapshotId=snap_id) print(f"Deleted AMI {ami_id} and {len(snapshot_ids)} snapshots") def stop_dev_instances_at_night(): """Stop non-production instances outside business hours.""" ec2 = boto3.resource('ec2') # Find instances tagged Environment=dev|staging instances 
= ec2.instances.filter( Filters=[ {'Name': 'tag:Environment', 'Values': ['dev', 'staging']}, {'Name': 'instance-state-name', 'Values': ['running']} ] ) instance_ids = [i.id for i in instances] if instance_ids: ec2.instances.filter(InstanceIds=instance_ids).stop() print(f"Stopped {len(instance_ids)} dev/staging instances") 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 # Lambda + EventBridge for scheduled cleanup AWSTemplateFormatVersion: '2010-09-09' Resources: StopDevInstancesFunction: Type: AWS::Lambda::Function Properties: FunctionName: stop-dev-instances Runtime: python3.11 Handler: index.handler Timeout: 60 Role: !GetAtt LambdaRole.Arn Code: ZipFile: | import boto3 def handler(event, context): ec2 = boto3.resource('ec2') instances = ec2.instances.filter( Filters=[ {'Name': 'tag:AutoStop', 'Values': ['true']}, {'Name': 'instance-state-name', 'Values': ['running']} ] ) ids = [i.id for i in instances] if ids: ec2.instances.filter(InstanceIds=ids).stop() return {'stopped': ids} StopSchedule: Type: AWS::Events::Rule Properties: ScheduleExpression: 'cron(0 22 ? 
* MON-FRI *)' # 10 PM weekdays Targets: - Id: StopDevInstances Arn: !GetAtt StopDevInstancesFunction.Arn Right-Sizing Analyze and Recommend 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 # rightsizing.py import boto3 def get_rightsizing_recommendations(): """Get AWS Compute Optimizer recommendations.""" optimizer = boto3.client('compute-optimizer') response = optimizer.get_ec2_instance_recommendations() savings = [] for rec in response.get('instanceRecommendations', []): current = rec['currentInstanceType'] for option in rec.get('recommendationOptions', []): if option.get('rank') == 1: # Top recommendation recommended = option['instanceType'] monthly_savings = ( rec.get('utilizationMetrics', [{}])[0].get('value', 0) * option.get('projectedUtilizationMetrics', [{}])[0].get('value', 1) ) savings.append({ 'instance_id': rec['instanceArn'].split('/')[-1], 'current': current, 'recommended': recommended, 'finding': rec['finding'], 'estimated_savings': option.get('savingsOpportunity', {}) }) return savings # Generate report for rec in get_rightsizing_recommendations(): print(f"{rec['instance_id']}: {rec['current']} → {rec['recommended']}") print(f" Finding: {rec['finding']}") if rec['estimated_savings']: print(f" Potential savings: ${rec['estimated_savings'].get('estimatedMonthlySavings', {}).get('value', 0):.2f}/mo") Reserved Instances & Savings Plans Analyze Coverage 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 # reservation_analysis.py import boto3 from collections import defaultdict def analyze_reservation_coverage(): """Analyze current RI/SP coverage and recommend purchases.""" ce = boto3.client('ce') # Get coverage report response = ce.get_reservation_coverage( TimePeriod={ 'Start': '2026-01-01', 'End': '2026-02-01' }, Granularity='MONTHLY', GroupBy=[ {'Type': 'DIMENSION', 'Key': 'INSTANCE_TYPE'} ] ) 
recommendations = [] for group in response.get('CoveragesByTime', [{}])[0].get('Groups', []): instance_type = group['Attributes']['instanceType'] coverage = float(group['Coverage']['CoverageHours']['CoverageHoursPercentage']) on_demand_hours = float(group['Coverage']['CoverageHours']['OnDemandHours']) if coverage < 70 and on_demand_hours > 500: recommendations.append({ 'instance_type': instance_type, 'current_coverage': coverage, 'on_demand_hours': on_demand_hours, 'recommendation': 'Consider Reserved Instances' }) return recommendations def get_savings_plan_recommendations(): """Get Savings Plan purchase recommendations.""" ce = boto3.client('ce') response = ce.get_savings_plans_purchase_recommendation( SavingsPlansType='COMPUTE_SP', TermInYears='ONE_YEAR', PaymentOption='NO_UPFRONT', LookbackPeriodInDays='THIRTY_DAYS' ) rec = response.get('SavingsPlansPurchaseRecommendation', {}) return { 'recommended_hourly_commitment': rec.get('SavingsPlansPurchaseRecommendationSummary', {}).get('RecommendedHourlyCommitment'), 'estimated_monthly_savings': rec.get('SavingsPlansPurchaseRecommendationSummary', {}).get('EstimatedMonthlySavingsAmount'), 'estimated_savings_percentage': rec.get('SavingsPlansPurchaseRecommendationSummary', {}).get('EstimatedSavingsPercentage') } Spot Instances Spot for Stateless Workloads 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 # Kubernetes with Spot instances via Karpenter apiVersion: karpenter.sh/v1alpha5 kind: Provisioner metadata: name: spot-provisioner spec: requirements: - key: karpenter.sh/capacity-type operator: In values: ["spot"] - key: kubernetes.io/arch operator: In values: ["amd64"] - key: node.kubernetes.io/instance-type operator: In values: ["m5.large", "m5.xlarge", "m5a.large", "m5a.xlarge", "m6i.large"] # Spread across instance types for availability limits: resources: cpu: 1000 # Handle interruptions gracefully ttlSecondsAfterEmpty: 30 
      ttlSecondsUntilExpired: 2592000  # 30 days
    ---
    # Deployment using spot nodes
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: worker
    spec:
      replicas: 10
      template:
        spec:
          nodeSelector:
            karpenter.sh/capacity-type: spot
          # Handle spot interruptions
          terminationGracePeriodSeconds: 120
          containers:
            - name: worker
              lifecycle:
                preStop:
                  exec:
                    command: ["/bin/sh", "-c", "sleep 90"]  # Drain gracefully

Spot Interruption Handling

    # spot_interruption_handler.py
    import os
    import signal
    import threading
    import time

    import requests

    # Assumes IMDSv1 is reachable; IMDSv2-only instances also need a session token header
    METADATA_URL = "http://169.254.169.254/latest/meta-data"


    def check_spot_interruption():
        """Check if this spot instance is being interrupted."""
        try:
            response = requests.get(
                f"{METADATA_URL}/spot/instance-action",
                timeout=1,
            )
            if response.status_code == 200:
                return response.json()
        except (requests.RequestException, ValueError):
            # Metadata service unreachable or body not JSON: treat as "no notice"
            pass
        return None


    def graceful_shutdown():
        """Handle graceful shutdown on interruption."""
        print("Spot interruption detected, starting graceful shutdown...")
        # Stop accepting new work
        # Finish current tasks
        # Checkpoint state
        # Clean up
        # sys.exit() in a worker thread would only end that thread,
        # so signal the whole process instead.
        os.kill(os.getpid(), signal.SIGTERM)


    def monitor_interruption():
        """Monitor for spot interruption (2 minute warning)."""
        while True:
            interruption = check_spot_interruption()
            if interruption:
                print(f"Interruption notice: {interruption}")
                graceful_shutdown()
                return
            time.sleep(5)


    # Run in background thread
    threading.Thread(target=monitor_interruption, daemon=True).start()

Storage Optimization

S3 Lifecycle Policies

    # s3_lifecycle.py
    import boto3


    def configure_lifecycle_rules(bucket_name: str):
        """Configure intelligent tiering and expiration."""
        s3 = boto3.client('s3')

        lifecycle_config = {
            'Rules': [
                {
                    'ID': 'intelligent-tiering',
                    'Status': 'Enabled',
                    'Filter': {'Prefix': ''},
                    'Transitions': [
                        {'Days': 0, 'StorageClass': 'INTELLIGENT_TIERING'}
                    ]
                },
                {
                    'ID': 'archive-old-logs',
                    'Status': 'Enabled',
                    'Filter': {'Prefix': 'logs/'},
                    'Transitions': [
                        {'Days': 30, 'StorageClass': 'STANDARD_IA'},
                        {'Days': 90, 'StorageClass': 'GLACIER'},
                    ],
                    'Expiration': {'Days': 365}
                },
                {
                    'ID': 'cleanup-incomplete-uploads',
                    'Status': 'Enabled',
                    'Filter': {'Prefix': ''},
                    'AbortIncompleteMultipartUpload': {
                        'DaysAfterInitiation': 7
                    }
                }
            ]
        }

        s3.put_bucket_lifecycle_configuration(
            Bucket=bucket_name,
            LifecycleConfiguration=lifecycle_config
        )

Cost Monitoring

Budget Alerts

    # CloudFormation budget
    AWSTemplateFormatVersion: '2010-09-09'
    Resources:
      MonthlyBudget:
        Type: AWS::Budgets::Budget
        Properties:
          Budget:
            BudgetName: monthly-infrastructure
            BudgetLimit:
              Amount: 10000
              Unit: USD
            TimeUnit: MONTHLY
            BudgetType: COST
          NotificationsWithSubscribers:
            - Notification:
                NotificationType: ACTUAL
                ComparisonOperator: GREATER_THAN
                Threshold: 80
              Subscribers:
                - SubscriptionType: EMAIL
                  Address: ops@example.com
            - Notification:
                NotificationType: FORECASTED
                ComparisonOperator: GREATER_THAN
                Threshold: 100
              Subscribers:
                - SubscriptionType: SNS
                  Address: !Ref AlertTopic  # SNS topic resource, not defined in this snippet

Daily Cost Report

    # daily_cost_report.py
    import boto3
    from datetime import datetime, timedelta, timezone


    def get_daily_costs():
        """Get costs broken down by service."""
        ce = boto3.client('ce')

        # datetime.utcnow() is deprecated since Python 3.12
        end = datetime.now(timezone.utc).date()
        start = end - timedelta(days=7)

        response = ce.get_cost_and_usage(
            TimePeriod={
                'Start': start.isoformat(),
                'End': end.isoformat()
            },
            Granularity='DAILY',
            Metrics=['UnblendedCost'],
            GroupBy=[
                {'Type': 'DIMENSION', 'Key': 'SERVICE'}
            ]
        )

        report = []
        for day in response['ResultsByTime']:
            date = day['TimePeriod']['Start']
            for group in day['Groups']:
                service = group['Keys'][0]
                cost = float(group['Metrics']['UnblendedCost']['Amount'])
                if cost > 1:  # Only show significant costs
                    report.append({
                        'date': date,
                        'service': service,
                        'cost': cost
                    })

        return report


    # Generate and send report
    for item in sorted(get_daily_costs(), key=lambda x: -x['cost'])[:10]:
        print(f"{item['date']} | {item['service']}: ${item['cost']:.2f}")

Checklist

- Enable Cost Explorer and set up budgets
- Tag all resources for cost allocation
- Review and act on right-sizing recommendations monthly
- Implement lifecycle policies for S3 and EBS snapshots
- Use Spot for fault-tolerant workloads
- Purchase Savings Plans for baseline compute
- Stop dev/staging resources outside business hours
- Clean up unused resources weekly
- Review Reserved Instance coverage quarterly

Cloud cost optimization isn’t a one-time project; it’s an ongoing practice. Automate the easy wins, review monthly, and treat your cloud bill like any other metric that matters. ...
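
One checklist item, stopping dev/staging resources outside business hours, comes down to a schedule check that a scheduled job could call before starting or stopping instances. A minimal sketch, assuming a weekday 08:00 to 19:00 window; the function name and the hours are illustrative and not from the original post:

```python
from datetime import datetime, time

# Assumed schedule (hypothetical values): weekdays, 08:00-19:00 local time
WORK_START = time(8, 0)
WORK_END = time(19, 0)


def should_be_running(now: datetime) -> bool:
    """Return True if dev/staging resources should be up at `now`."""
    is_weekday = now.weekday() < 5  # Monday=0 ... Friday=4
    return is_weekday and WORK_START <= now.time() < WORK_END
```

Keeping the decision in a pure function makes the schedule easy to test separately from whatever API call actually stops the resources.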

February 11, 2026 · 9 min · 1709 words · Rob Washington

Incident Response: On-Call Practices That Don't Burn Out Your Team

On-call doesn’t have to mean sleepless nights and burnout. With the right practices, you can maintain reliable systems while keeping your team healthy. Here’s how.

Alert Design

Only Alert on Actionable Items

    # Bad: Noisy, leads to alert fatigue
    - alert: HighCPU
      expr: cpu_usage > 80
      for: 1m

    # Good: Actionable, symptom-based
    - alert: HighErrorRate
      expr: |
        sum(rate(http_requests_total{status=~"5.."}[5m]))
        / sum(rate(http_requests_total[5m])) > 0.01
      for: 5m
      labels:
        severity: warning
        runbook: "https://runbooks.example.com/high-error-rate"
      annotations:
        summary: "Error rate {{ $value | humanizePercentage }} exceeds 1%"
        impact: "Users experiencing failures"
        action: "Check application logs and recent deployments"

Severity Levels

    # severity-definitions.yaml
    severities:
      critical:
        description: "Service completely down, immediate action required"
        response_time: "5 minutes"
        notification: "page + phone call"
        examples:
          - "Total service outage"
          - "Data loss occurring"
          - "Security breach detected"

      warning:
        description: "Degraded service, action needed soon"
        response_time: "30 minutes"
        notification: "page"
        examples:
          - "Error rate elevated but service functional"
          - "Approaching resource limits"
          - "Single replica down"

      info:
        description: "Notable event, no immediate action"
        response_time: "Next business day"
        notification: "Slack"
        examples:
          - "Deployment completed"
          - "Scheduled maintenance starting"

Runbooks

Every alert needs a runbook: ...
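
The HighErrorRate rule above combines two ideas: a ratio of 5xx responses to all responses, and a `for:` clause that only fires when the condition holds across the whole window. A rough Python sketch of that logic, assuming one ratio sample per rule evaluation; the function names are illustrative and this is not how Prometheus itself is implemented:

```python
def error_ratio(total_requests: float, server_errors: float) -> float:
    """Fraction of requests that failed with a 5xx status."""
    if total_requests <= 0:
        return 0.0  # No traffic means nothing to alert on
    return server_errors / total_requests


def should_alert(ratios: list[float], threshold: float = 0.01) -> bool:
    """Mimic `for: 5m`: fire only if every sample in the window breaches."""
    return bool(ratios) and all(r > threshold for r in ratios)
```

A single spike does not page anyone under this rule; the error rate has to stay above 1% for every evaluation in the window.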

February 11, 2026 · 16 min · 3302 words · Rob Washington

Observability: Beyond Monitoring with Metrics, Logs, and Traces

Monitoring tells you when something is wrong. Observability helps you understand why. In distributed systems, you can’t predict every failure mode, so you need systems that let you ask arbitrary questions about their behavior.

The Three Pillars

Metrics: What’s Happening Now

Numeric time-series data. Fast to query, cheap to store.

    from flask import Flask, request
    from prometheus_client import Counter, Histogram, Gauge, start_http_server

    app = Flask(__name__)

    # Counter - only goes up
    requests_total = Counter(
        'http_requests_total',
        'Total HTTP requests',
        ['method', 'endpoint', 'status']
    )

    # Histogram - distribution of values
    request_duration = Histogram(
        'http_request_duration_seconds',
        'Request duration in seconds',
        ['method', 'endpoint'],
        buckets=[.01, .05, .1, .25, .5, 1, 2.5, 5, 10]
    )

    # Gauge - can go up or down
    active_connections = Gauge(
        'active_connections',
        'Number of active connections'
    )

    # Usage
    @app.route("/api/<endpoint>")
    def handle_request(endpoint):
        active_connections.inc()
        try:
            with request_duration.labels(
                method=request.method,
                endpoint=endpoint
            ).time():
                result = process_request()  # Application handler, defined elsewhere

            requests_total.labels(
                method=request.method,
                endpoint=endpoint,
                status=200
            ).inc()
            return result
        finally:
            # Decrement even if the handler raises, so the gauge cannot drift upward
            active_connections.dec()

    # Expose metrics endpoint
    start_http_server(9090)

Logs: What Happened

Discrete events with context. Rich detail, expensive at scale. ...
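
The `buckets` list in the histogram above is easier to reason about once you see that Prometheus histogram buckets are cumulative: each bucket counts every observation less than or equal to its upper bound, with a final `+Inf` bucket that counts everything. A small pure-Python sketch of that counting, reusing the bucket bounds from the example; the function name is illustrative and this does not use the client library:

```python
import math

# Same upper bounds as the Histogram defined above
BUCKETS = [.01, .05, .1, .25, .5, 1, 2.5, 5, 10]


def cumulative_bucket_counts(durations, buckets=BUCKETS):
    """Count observations <= each upper bound, plus a +Inf catch-all."""
    bounds = sorted(buckets) + [math.inf]
    return {le: sum(1 for d in durations if d <= le) for le in bounds}
```

Because the counts are cumulative, the `+Inf` bucket always equals the total number of observations, which is what quantile estimation on the server side relies on.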

February 11, 2026 · 7 min · 1291 words · Rob Washington

The Twelve-Factor App: Building Cloud-Native Applications That Scale

The twelve-factor methodology emerged from Heroku’s experience running millions of apps. These principles create applications that deploy cleanly, scale effortlessly, and minimize divergence between development and production. Let’s walk through each factor with practical examples.

1. Codebase: One Repo, Many Deploys

One codebase tracked in version control, many deploys (dev, staging, prod).

    # Good: Single repo, branch-based environments
    main → production
    staging → staging
    feature/* → development

    # Bad: Separate repos for each environment
    myapp-dev/
    myapp-staging/
    myapp-prod/

    # config.py - Same code, different configs
    import os

    ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
    DATABASE_URL = os.getenv("DATABASE_URL")

2. Dependencies: Explicitly Declare and Isolate

Never rely on system-wide packages. Declare everything. ...
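
The config.py snippet above leaves `DATABASE_URL` as `None` when the variable is unset, which tends to surface later as a confusing connection error. One possible extension, sketched here with a hypothetical `Config` dataclass and `load_config` helper that are not part of the original post, is to fail at startup instead:

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Config:
    environment: str
    database_url: str


def load_config(env: Mapping[str, str] = os.environ) -> Config:
    """Build config from environment variables, failing fast if incomplete."""
    database_url = env.get("DATABASE_URL")
    if not database_url:
        raise RuntimeError("DATABASE_URL must be set")
    return Config(
        environment=env.get("ENVIRONMENT", "development"),
        database_url=database_url,
    )
```

Passing the environment in as a mapping keeps the same code path for every deploy while letting tests supply a plain dict.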

February 11, 2026 · 6 min · 1237 words · Rob Washington

Testing Infrastructure: From Terraform Plans to Production Validation

Infrastructure code deserves the same testing rigor as application code. A typo in Terraform can delete a database. An untested Ansible role can break production. Let’s build confidence with proper testing.

The Testing Pyramid for Infrastructure

Top to bottom:

- E2E Tests (Full stack deployment)
- Integration Tests (Real cloud resources)
- Unit Tests (Static analysis, plan validation)

Unit Testing: Static Analysis

Terraform Validation

    # Built-in validation
    terraform init
    terraform validate
    terraform fmt -check

    # Custom validation rules
    terraform plan -out=tfplan
    terraform show -json tfplan > plan.json

    # tests/test_terraform_plan.py
    import json
    import pytest


    @pytest.fixture
    def plan():
        with open('plan.json') as f:
            return json.load(f)


    def test_no_resources_destroyed(plan):
        """Ensure no resources are being destroyed."""
        changes = plan.get('resource_changes', [])
        destroyed = [c for c in changes
                     if 'delete' in c.get('change', {}).get('actions', [])]
        assert len(destroyed) == 0, \
            f"Resources being destroyed: {[d['address'] for d in destroyed]}"


    def test_no_public_s3_buckets(plan):
        """Ensure S3 buckets aren't public."""
        changes = plan.get('resource_changes', [])
        for change in changes:
            if change['type'] == 'aws_s3_bucket':
                # 'after' is null for deleted resources, so fall back to {}
                after = change.get('change', {}).get('after') or {}
                acl = after.get('acl') or 'private'
                assert acl == 'private', \
                    f"Bucket {change['address']} has public ACL: {acl}"


    def test_instances_have_tags(plan):
        """Ensure EC2 instances have required tags."""
        required_tags = {'Environment', 'Owner', 'Project'}
        changes = plan.get('resource_changes', [])
        for change in changes:
            if change['type'] == 'aws_instance':
                after = change.get('change', {}).get('after') or {}
                # 'tags' can be null when no tags are set
                tags = set((after.get('tags') or {}).keys())
                missing = required_tags - tags
                assert not missing, \
                    f"Instance {change['address']} missing tags: {missing}"

Policy as Code with OPA

    # policy/terraform.rego
    package terraform

    deny[msg] {
        resource := input.resource_changes[_]
        resource.type == "aws_security_group_rule"
        resource.change.after.cidr_blocks[_] == "0.0.0.0/0"
        resource.change.after.from_port == 22
        msg := sprintf("SSH open to world in %v", [resource.address])
    }

    deny[msg] {
        resource := input.resource_changes[_]
        resource.type == "aws_db_instance"
        resource.change.after.publicly_accessible == true
        msg := sprintf("RDS instance %v is publicly accessible", [resource.address])
    }

    # Run OPA checks against the plan.json produced earlier
    opa eval -i plan.json -d policy/ "data.terraform.deny"

Integration Testing with Terratest

Terratest deploys real infrastructure, validates it, then tears it down: ...
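
The plan tests above all walk the same `resource_changes` list from `terraform show -json`. To see what they look for without running Terraform, here is a minimal sketch against a hand-written plan fragment; the helper name `destroyed_addresses` and the sample resources are made up for illustration:

```python
def destroyed_addresses(plan: dict) -> list[str]:
    """Return addresses of resources whose planned actions include a delete."""
    return [
        change["address"]
        for change in plan.get("resource_changes", [])
        if "delete" in change.get("change", {}).get("actions", [])
    ]


# Hypothetical plan fragment in the shape `terraform show -json` emits
sample_plan = {
    "resource_changes": [
        {"address": "aws_instance.web", "type": "aws_instance",
         "change": {"actions": ["update"]}},
        {"address": "aws_db_instance.main", "type": "aws_db_instance",
         "change": {"actions": ["delete", "create"]}},
    ]
}

print(destroyed_addresses(sample_plan))
```

Note that a replacement shows up as a delete followed by a create, so a check like this flags replaced resources as well as removed ones.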

February 11, 2026 · 7 min · 1358 words · Rob Washington

Container Security: Hardening Your Docker Images and Kubernetes Deployments

Containers aren’t inherently secure. A default Docker image runs as root, includes unnecessary packages, and exposes more attack surface than needed. Let’s fix that.

Secure Dockerfile Practices

Use Minimal Base Images

    # BAD: Full OS with unnecessary packages
    FROM ubuntu:latest

    # BETTER: Slim variant
    FROM python:3.11-slim

    # BEST: Distroless (no shell, no package manager)
    FROM gcr.io/distroless/python3-debian11

Size comparison:

- ubuntu:latest: ~77MB
- python:3.11-slim: ~45MB
- distroless/python3: ~16MB

Smaller image = smaller attack surface. ...

February 11, 2026 · 5 min · 1030 words · Rob Washington

Structured Logging: Stop Grepping, Start Querying

Unstructured logs are a liability. When your application writes "User 12345 logged in from 192.168.1.1", you’re creating text that’s easy to read but impossible to query at scale. Structured logging changes the game: logs become data you can filter, aggregate, and analyze.

The Problem with Text Logs

    # Traditional logging
    import logging

    logging.basicConfig(level=logging.INFO)  # Root logger defaults to WARNING
    user_id, ip_address = 12345, "192.168.1.1"
    logging.info(f"User {user_id} logged in from {ip_address}")
    # Output: INFO:root:User 12345 logged in from 192.168.1.1

Want to find all logins from a specific IP? You need regex. Want to count logins per user? Good luck. Want to correlate with other events? Hope your timestamp parsing is solid. ...
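
For contrast with the text log above, here is what the same login event could look like as structured data. This is a standard-library-only sketch: the `JsonFormatter` class, the `fields` key passed through `extra=`, and the `auth` logger name are all assumptions made for illustration, and the original post may well use a dedicated library instead.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line (hypothetical helper)."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "event": record.getMessage(),
        }
        # Values passed via `extra={"fields": {...}}` become record attributes
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload)


def make_logger(name: str = "auth") -> logging.Logger:
    """Create a logger that writes JSON lines to stderr."""
    logger = logging.getLogger(name)
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False  # Avoid duplicate output via the root logger
    return logger


logger = make_logger()
logger.info(
    "user_login",
    extra={"fields": {"user_id": 12345, "ip_address": "192.168.1.1"}},
)
```

With the user ID and IP address as separate keys, "all logins from this IP" becomes a field filter in the log store instead of a regex over free text.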

February 11, 2026 · 6 min · 1083 words · Rob Washington