DNS Management: Infrastructure as Code for Your Domain Records

DNS is the foundation everything else depends on. A misconfigured record can take down your entire infrastructure. Yet DNS is often managed through web consoles with no version control, no review process, and no automation. Let’s fix that. Terraform for DNS Route53 Basics 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 # dns.tf resource "aws_route53_zone" "main" { name = "example.com" tags = { Environment = "production" } } # A record resource "aws_route53_record" "www" { zone_id = aws_route53_zone.main.zone_id name = "www.example.com" type = "A" ttl = 300 records = ["203.0.113.10"] } # CNAME record resource "aws_route53_record" "app" { zone_id = aws_route53_zone.main.zone_id name = "app.example.com" type = "CNAME" ttl = 300 records = ["app-lb-123456.us-east-1.elb.amazonaws.com"] } # Alias to ALB (no TTL, resolved at edge) resource "aws_route53_record" "api" { zone_id = aws_route53_zone.main.zone_id name = "api.example.com" type = "A" alias { name = aws_lb.api.dns_name zone_id = aws_lb.api.zone_id evaluate_target_health = true } } # MX records resource "aws_route53_record" "mx" { zone_id = aws_route53_zone.main.zone_id name = "example.com" type = "MX" ttl = 3600 records = [ "10 mail1.example.com", "20 mail2.example.com" ] } # TXT for SPF resource "aws_route53_record" "spf" { zone_id = aws_route53_zone.main.zone_id name = "example.com" type = "TXT" ttl = 3600 records = ["v=spf1 include:_spf.google.com ~all"] } Dynamic Records from Infrastructure 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 # Generate records from other resources locals { services = { "api" = aws_lb.api.dns_name "admin" = aws_lb.admin.dns_name "docs" = aws_cloudfront_distribution.docs.domain_name } } resource "aws_route53_record" "services" { for_each = local.services zone_id = aws_route53_zone.main.zone_id name = "${each.key}.example.com" type = "CNAME" ttl = 300 records = [each.value] } # From Kubernetes ingresses data "kubernetes_ingress_v1" "all" { for_each = toset(["api", "web", "admin"]) metadata { name = each.key namespace = "production" } } resource "aws_route53_record" "k8s_services" { for_each = data.kubernetes_ingress_v1.all zone_id = aws_route53_zone.main.zone_id name = "${each.key}.example.com" type = "CNAME" ttl = 300 records = [each.value.status[0].load_balancer[0].ingress[0].hostname] } DNS Failover Health Check Based Routing 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 # Health check for primary resource "aws_route53_health_check" "primary" { fqdn = "primary-api.example.com" port = 443 type = "HTTPS" resource_path = "/health" failure_threshold = 3 request_interval = 30 tags = { Name = "primary-api-health" } } # Health check for secondary resource "aws_route53_health_check" "secondary" { fqdn = "secondary-api.example.com" port = 443 type = "HTTPS" resource_path = "/health" failure_threshold = 3 request_interval = 30 tags = { Name = "secondary-api-health" } } # Primary record with failover resource "aws_route53_record" "api_primary" { zone_id = aws_route53_zone.main.zone_id name = "api.example.com" type = "A" ttl = 60 records = ["203.0.113.10"] set_identifier = "primary" health_check_id = aws_route53_health_check.primary.id failover_routing_policy { type = "PRIMARY" } } # Secondary record (used when primary fails) resource "aws_route53_record" "api_secondary" { zone_id = aws_route53_zone.main.zone_id name = "api.example.com" type = "A" ttl = 60 records = ["203.0.113.20"] set_identifier = "secondary" health_check_id = aws_route53_health_check.secondary.id failover_routing_policy { type = "SECONDARY" } } Weighted Routing for Gradual Migration 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 # 90% to current, 10% to new resource "aws_route53_record" "api_current" { zone_id = aws_route53_zone.main.zone_id name = "api.example.com" type = "A" ttl = 60 records = ["203.0.113.10"] set_identifier = "current" weighted_routing_policy { weight = 90 } } resource "aws_route53_record" "api_new" { zone_id = aws_route53_zone.main.zone_id name = "api.example.com" type = "A" ttl = 60 records = ["203.0.113.20"] set_identifier = "new" weighted_routing_policy { weight = 10 } } External DNS for Kubernetes Automatically create DNS records from Kubernetes resources. ...

February 12, 2026 Â· 9 min Â· 1841 words Â· Rob Washington

SSL/TLS Automation: Never Manually Renew a Certificate Again

Manual certificate management is a reliability incident waiting to happen. A forgotten renewal, an expired cert at 3 AM, angry customers. Let’s automate this problem away. Certbot: The Foundation Basic Setup 1 2 3 4 5 6 7 8 9 # Install certbot sudo apt install certbot python3-certbot-nginx # Get certificate for nginx sudo certbot --nginx -d example.com -d www.example.com # Auto-renewal is configured automatically # Test it: sudo certbot renew --dry-run Standalone Mode (No Web Server) 1 2 3 4 5 # Stop web server, get cert, restart sudo certbot certonly --standalone -d example.com # Or use DNS challenge (no downtime) sudo certbot certonly --manual --preferred-challenges dns -d example.com Automated Renewal with Hooks 1 2 3 4 5 6 7 8 # /etc/letsencrypt/renewal-hooks/deploy/reload-nginx.sh #!/bin/bash systemctl reload nginx # /etc/letsencrypt/renewal-hooks/post/notify.sh #!/bin/bash curl -X POST https://slack.com/webhook \ -d '{"text":"SSL certificate renewed for '$RENEWED_DOMAINS'"}' cert-manager for Kubernetes The standard for Kubernetes certificate automation. ...

February 12, 2026 Â· 7 min Â· 1454 words Â· Rob Washington

Infrastructure Drift Detection: Keeping Your Terraform in Sync with Reality

You define infrastructure in Terraform. Someone clicks around in the AWS console. Now your code says one thing and reality says another. This is drift, and it will bite you when you least expect it. What Causes Drift? Manual changes — “Just this once” console modifications Emergency fixes — Hotfixes applied directly to production Other tools — Scripts, CLI commands, other IaC tools AWS itself — Auto-scaling, managed service updates Incomplete imports — Resources created outside Terraform Detecting Drift Basic: Terraform Plan 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #!/bin/bash # drift-check.sh set -e # Initialize without backend changes terraform init -input=false # Generate plan terraform plan -detailed-exitcode -out=tfplan 2>&1 | tee plan.txt EXIT_CODE=$? if [ $EXIT_CODE -eq 0 ]; then echo "✅ No drift detected" elif [ $EXIT_CODE -eq 2 ]; then echo "⚠️ Drift detected! Changes needed:" terraform show tfplan exit 1 else echo "❌ Error running plan" exit 1 fi Automated Drift Detection 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 # drift_detector.py import subprocess import json import os from datetime import datetime from typing import List, Dict class DriftDetector: def __init__(self, workspace_dir: str): self.workspace_dir = workspace_dir def check_drift(self) -> Dict: """Run terraform plan and parse results.""" os.chdir(self.workspace_dir) # Initialize subprocess.run( ["terraform", "init", "-input=false"], capture_output=True, check=True ) # Plan with JSON output result = subprocess.run( ["terraform", "plan", "-json", "-out=tfplan"], capture_output=True, text=True ) changes = self._parse_plan_output(result.stdout) return { "workspace": self.workspace_dir, "timestamp": datetime.utcnow().isoformat(), "has_drift": len(changes) > 0, "changes": changes } def _parse_plan_output(self, output: str) -> List[Dict]: """Parse terraform plan JSON output.""" changes = [] for line in output.strip().split('\n'): try: entry = json.loads(line) if entry.get('@level') == 'info': msg = entry.get('@message', '') # Look for resource changes if 'will be' in msg: changes.append({ 'message': msg, 'type': entry.get('type', 'unknown'), 'change': entry.get('change', {}) }) except json.JSONDecodeError: continue return changes def check_all_workspaces(workspaces: List[str]) -> Dict: """Check drift across multiple Terraform workspaces.""" results = [] for workspace in workspaces: detector = DriftDetector(workspace) try: result = detector.check_drift() results.append(result) except Exception as e: results.append({ "workspace": workspace, "error": str(e), "has_drift": None }) drifted = [r for r in results if r.get('has_drift')] return { "total_workspaces": len(workspaces), "drifted_count": len(drifted), "results": results } # Usage workspaces = [ "/infrastructure/terraform/networking", "/infrastructure/terraform/compute", "/infrastructure/terraform/databases", ] report = check_all_workspaces(workspaces) print(json.dumps(report, indent=2)) CI Pipeline for Drift Detection 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 # .github/workflows/drift-detection.yml name: Drift Detection on: schedule: - cron: '0 */6 * * *' # Every 6 hours workflow_dispatch: jobs: detect-drift: runs-on: ubuntu-latest strategy: matrix: workspace: - networking - compute - databases - monitoring steps: - uses: actions/checkout@v4 - uses: hashicorp/setup-terraform@v3 with: terraform_version: 1.7.0 - name: Configure AWS uses: aws-actions/configure-aws-credentials@v4 with: aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} aws-region: us-east-1 - name: Check Drift id: drift working-directory: terraform/${{ matrix.workspace }} run: | terraform init -input=false set +e terraform plan -detailed-exitcode -out=tfplan 2>&1 | tee plan.txt EXIT_CODE=$? set -e if [ $EXIT_CODE -eq 2 ]; then echo "drift_detected=true" >> $GITHUB_OUTPUT echo "### ⚠️ Drift Detected in ${{ matrix.workspace }}" >> $GITHUB_STEP_SUMMARY echo '```' >> $GITHUB_STEP_SUMMARY terraform show -no-color tfplan >> $GITHUB_STEP_SUMMARY echo '```' >> $GITHUB_STEP_SUMMARY else echo "drift_detected=false" >> $GITHUB_OUTPUT echo "### ✅ No drift in ${{ matrix.workspace }}" >> $GITHUB_STEP_SUMMARY fi - name: Alert on Drift if: steps.drift.outputs.drift_detected == 'true' uses: slackapi/slack-github-action@v1 with: payload: | { "text": "⚠️ Infrastructure drift detected in ${{ matrix.workspace }}", "blocks": [ { "type": "section", "text": { "type": "mrkdwn", "text": "*Infrastructure Drift Detected*\n\nWorkspace: `${{ matrix.workspace }}`\nRun: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" } } ] } env: SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }} Continuous Reconciliation Atlantis for GitOps 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 # atlantis.yaml version: 3 projects: - name: networking dir: terraform/networking autoplan: when_modified: - "*.tf" - "*.tfvars" enabled: true apply_requirements: - approved - mergeable - name: compute dir: terraform/compute autoplan: enabled: true apply_requirements: - approved Drift Auto-Remediation 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 # drift_remediation.py import subprocess import json from enum import Enum from typing import Optional class RemediationStrategy(Enum): ALERT_ONLY = "alert" # Just notify AUTO_PLAN = "plan" # Create PR with plan AUTO_APPLY = "apply" # Automatically apply (dangerous!) REFRESH_ONLY = "refresh" # Update state to match reality class DriftRemediator: def __init__(self, workspace: str, strategy: RemediationStrategy): self.workspace = workspace self.strategy = strategy def remediate(self, drift_report: dict) -> dict: """Remediate detected drift based on strategy.""" if not drift_report.get('has_drift'): return {"action": "none", "reason": "no drift"} if self.strategy == RemediationStrategy.ALERT_ONLY: return self._alert(drift_report) elif self.strategy == RemediationStrategy.REFRESH_ONLY: return self._refresh_state() elif self.strategy == RemediationStrategy.AUTO_PLAN: return self._create_remediation_pr(drift_report) elif self.strategy == RemediationStrategy.AUTO_APPLY: return self._auto_apply() return {"action": "unknown", "error": "invalid strategy"} def _alert(self, drift_report: dict) -> dict: """Send alert without taking action.""" # Send to Slack, PagerDuty, etc. return { "action": "alert", "changes": len(drift_report['changes']) } def _refresh_state(self) -> dict: """Refresh state to match reality (accept drift).""" result = subprocess.run( ["terraform", "apply", "-refresh-only", "-auto-approve"], capture_output=True, text=True, cwd=self.workspace ) return { "action": "refresh", "success": result.returncode == 0, "output": result.stdout } def _create_remediation_pr(self, drift_report: dict) -> dict: """Create a PR to remediate drift.""" # Generate plan subprocess.run( ["terraform", "plan", "-out=remediation.tfplan"], cwd=self.workspace ) # Create branch and PR (pseudo-code) branch = f"fix/drift-{self.workspace}-{datetime.now().strftime('%Y%m%d')}" # In practice, you'd use GitHub API or similar return { "action": "pr_created", "branch": branch, "changes": len(drift_report['changes']) } def _auto_apply(self) -> dict: """Automatically apply to fix drift. USE WITH CAUTION.""" result = subprocess.run( ["terraform", "apply", "-auto-approve"], capture_output=True, text=True, cwd=self.workspace ) return { "action": "auto_apply", "success": result.returncode == 0, "output": result.stdout if result.returncode == 0 else result.stderr } # Configuration per workspace WORKSPACE_STRATEGIES = { "networking": RemediationStrategy.ALERT_ONLY, # Critical, manual only "compute": RemediationStrategy.AUTO_PLAN, # Create PRs "monitoring": RemediationStrategy.REFRESH_ONLY, # Accept drift "dev-sandbox": RemediationStrategy.AUTO_APPLY, # Auto-fix okay } State Locking and Protection Prevent Manual Changes 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # terraform.tf terraform { backend "s3" { bucket = "terraform-state-prod" key = "networking/terraform.tfstate" region = "us-east-1" dynamodb_table = "terraform-locks" encrypt = true } } # Prevent destroy of critical resources resource "aws_db_instance" "main" { # ... lifecycle { prevent_destroy = true } } AWS Service Control Policies 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 { "Version": "2012-10-17", "Statement": [ { "Sid": "RequireTerraformTag", "Effect": "Deny", "Action": [ "ec2:RunInstances", "ec2:CreateVpc", "rds:CreateDBInstance" ], "Resource": "*", "Condition": { "Null": { "aws:RequestTag/ManagedBy": "true" } } }, { "Sid": "PreventManualModification", "Effect": "Deny", "Action": [ "ec2:ModifyInstanceAttribute", "ec2:ModifyVpcAttribute" ], "Resource": "*", "Condition": { "StringEquals": { "aws:ResourceTag/ManagedBy": "terraform" }, "StringNotEquals": { "aws:PrincipalTag/Role": "terraform-automation" } } } ] } Drift Metrics 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 # drift_metrics.py from prometheus_client import Counter, Gauge, Histogram # Metrics drift_detected = Counter( 'terraform_drift_detected_total', 'Number of drift detection events', ['workspace', 'severity'] ) drift_resources = Gauge( 'terraform_drift_resources', 'Number of resources with drift', ['workspace'] ) drift_check_duration = Histogram( 'terraform_drift_check_seconds', 'Time to check for drift', ['workspace'] ) def record_drift_check(workspace: str, result: dict, duration: float): """Record drift check metrics.""" drift_check_duration.labels(workspace=workspace).observe(duration) if result['has_drift']: drift_detected.labels( workspace=workspace, severity='warning' if len(result['changes']) < 5 else 'critical' ).inc() drift_resources.labels(workspace=workspace).set(len(result['changes'])) else: drift_resources.labels(workspace=workspace).set(0) Best Practices Check drift frequently — At least daily, ideally every few hours Alert immediately — Drift compounds; catch it early Lock down production — Use SCPs to prevent manual changes Tag everything — ManagedBy: terraform enables enforcement Use workspaces — Isolate environments, check each independently Automate remediation — PRs for drift fixes, not manual applies Track metrics — Know your drift frequency and sources Quick Start Checklist Set up scheduled drift detection (GitHub Actions, Jenkins, etc.) Configure alerts for detected drift Add ManagedBy tags to all Terraform resources Implement SCPs to prevent manual changes in production Create runbook for drift remediation Track drift metrics over time Drift isn’t just annoying—it’s a reliability risk. Your next terraform apply might have unintended consequences because the state doesn’t match reality. Detect it early, fix it fast, and prevent it where possible. ...

February 11, 2026 Â· 8 min Â· 1606 words Â· Rob Washington

Cloud Cost Optimization: Cutting AWS Bills Without Cutting Corners

Cloud bills grow faster than you’d expect. A few forgotten instances here, oversized databases there, and suddenly you’re spending more on infrastructure than engineering salaries. Let’s fix that. Quick Wins: Find the Waste Identify Unused Resources 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 # find_waste.py import boto3 from datetime import datetime, timedelta ec2 = boto3.client('ec2') cloudwatch = boto3.client('cloudwatch') def find_idle_instances(): """Find EC2 instances with <5% CPU over 7 days.""" instances = ec2.describe_instances( Filters=[{'Name': 'instance-state-name', 'Values': ['running']}] ) idle = [] for reservation in instances['Reservations']: for instance in reservation['Instances']: instance_id = instance['InstanceId'] # Get average CPU over 7 days response = cloudwatch.get_metric_statistics( Namespace='AWS/EC2', MetricName='CPUUtilization', Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}], StartTime=datetime.utcnow() - timedelta(days=7), EndTime=datetime.utcnow(), Period=86400, # 1 day Statistics=['Average'] ) if response['Datapoints']: avg_cpu = sum(d['Average'] for d in response['Datapoints']) / len(response['Datapoints']) if avg_cpu < 5: idle.append({ 'instance_id': instance_id, 'type': instance['InstanceType'], 'avg_cpu': round(avg_cpu, 2), 'name': next((t['Value'] for t in instance.get('Tags', []) if t['Key'] == 'Name'), 'unnamed') }) return idle def find_unattached_volumes(): """Find EBS volumes not attached to any instance.""" volumes = ec2.describe_volumes( Filters=[{'Name': 'status', 'Values': ['available']}] ) return [{ 'volume_id': v['VolumeId'], 'size_gb': v['Size'], 'monthly_cost': v['Size'] * 0.10 # gp2 pricing estimate } for v in volumes['Volumes']] def find_old_snapshots(): """Find snapshots older than 90 days.""" snapshots = ec2.describe_snapshots(OwnerIds=['self']) cutoff = datetime.utcnow() - timedelta(days=90) old = [] for snap in snapshots['Snapshots']: if snap['StartTime'].replace(tzinfo=None) < cutoff: old.append({ 'snapshot_id': snap['SnapshotId'], 'size_gb': snap['VolumeSize'], 'age_days': (datetime.utcnow() - snap['StartTime'].replace(tzinfo=None)).days }) return old # Run analysis print("=== Idle Instances ===") for i in find_idle_instances(): print(f" {i['instance_id']} ({i['type']}): {i['avg_cpu']}% avg CPU - {i['name']}") print("\n=== Unattached Volumes ===") for v in find_unattached_volumes(): print(f" {v['volume_id']}: {v['size_gb']}GB = ${v['monthly_cost']:.2f}/mo") print("\n=== Old Snapshots ===") for s in find_old_snapshots(): print(f" {s['snapshot_id']}: {s['size_gb']}GB, {s['age_days']} days old") Automated Cleanup 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 # cleanup_automation.py import boto3 from datetime import datetime, timedelta def cleanup_old_amis(days_old: int = 180, dry_run: bool = True): """Deregister AMIs older than threshold and delete their snapshots.""" ec2 = boto3.client('ec2') images = ec2.describe_images(Owners=['self']) cutoff = datetime.utcnow() - timedelta(days=days_old) for image in images['Images']: creation_date = datetime.strptime(image['CreationDate'][:10], '%Y-%m-%d') if creation_date < cutoff: ami_id = image['ImageId'] snapshot_ids = [ bdm['Ebs']['SnapshotId'] for bdm in image.get('BlockDeviceMappings', []) if 'Ebs' in bdm ] if dry_run: print(f"Would delete AMI {ami_id} and snapshots {snapshot_ids}") else: ec2.deregister_image(ImageId=ami_id) for snap_id in snapshot_ids: ec2.delete_snapshot(SnapshotId=snap_id) print(f"Deleted AMI {ami_id} and {len(snapshot_ids)} snapshots") def stop_dev_instances_at_night(): """Stop non-production instances outside business hours.""" ec2 = boto3.resource('ec2') # Find instances tagged Environment=dev|staging instances = ec2.instances.filter( Filters=[ {'Name': 'tag:Environment', 'Values': ['dev', 'staging']}, {'Name': 'instance-state-name', 'Values': ['running']} ] ) instance_ids = [i.id for i in instances] if instance_ids: ec2.instances.filter(InstanceIds=instance_ids).stop() print(f"Stopped {len(instance_ids)} dev/staging instances") 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 # Lambda + EventBridge for scheduled cleanup AWSTemplateFormatVersion: '2010-09-09' Resources: StopDevInstancesFunction: Type: AWS::Lambda::Function Properties: FunctionName: stop-dev-instances Runtime: python3.11 Handler: index.handler Timeout: 60 Role: !GetAtt LambdaRole.Arn Code: ZipFile: | import boto3 def handler(event, context): ec2 = boto3.resource('ec2') instances = ec2.instances.filter( Filters=[ {'Name': 'tag:AutoStop', 'Values': ['true']}, {'Name': 'instance-state-name', 'Values': ['running']} ] ) ids = [i.id for i in instances] if ids: ec2.instances.filter(InstanceIds=ids).stop() return {'stopped': ids} StopSchedule: Type: AWS::Events::Rule Properties: ScheduleExpression: 'cron(0 22 ? * MON-FRI *)' # 10 PM weekdays Targets: - Id: StopDevInstances Arn: !GetAtt StopDevInstancesFunction.Arn Right-Sizing Analyze and Recommend 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 # rightsizing.py import boto3 def get_rightsizing_recommendations(): """Get AWS Compute Optimizer recommendations.""" optimizer = boto3.client('compute-optimizer') response = optimizer.get_ec2_instance_recommendations() savings = [] for rec in response.get('instanceRecommendations', []): current = rec['currentInstanceType'] for option in rec.get('recommendationOptions', []): if option.get('rank') == 1: # Top recommendation recommended = option['instanceType'] monthly_savings = ( rec.get('utilizationMetrics', [{}])[0].get('value', 0) * option.get('projectedUtilizationMetrics', [{}])[0].get('value', 1) ) savings.append({ 'instance_id': rec['instanceArn'].split('/')[-1], 'current': current, 'recommended': recommended, 'finding': rec['finding'], 'estimated_savings': option.get('savingsOpportunity', {}) }) return savings # Generate report for rec in get_rightsizing_recommendations(): print(f"{rec['instance_id']}: {rec['current']} → {rec['recommended']}") print(f" Finding: {rec['finding']}") if rec['estimated_savings']: print(f" Potential savings: ${rec['estimated_savings'].get('estimatedMonthlySavings', {}).get('value', 0):.2f}/mo") Reserved Instances & Savings Plans Analyze Coverage 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 # reservation_analysis.py import boto3 from collections import defaultdict def analyze_reservation_coverage(): """Analyze current RI/SP coverage and recommend purchases.""" ce = boto3.client('ce') # Get coverage report response = ce.get_reservation_coverage( TimePeriod={ 'Start': '2026-01-01', 'End': '2026-02-01' }, Granularity='MONTHLY', GroupBy=[ {'Type': 'DIMENSION', 'Key': 'INSTANCE_TYPE'} ] ) recommendations = [] for group in response.get('CoveragesByTime', [{}])[0].get('Groups', []): instance_type = group['Attributes']['instanceType'] coverage = float(group['Coverage']['CoverageHours']['CoverageHoursPercentage']) on_demand_hours = float(group['Coverage']['CoverageHours']['OnDemandHours']) if coverage < 70 and on_demand_hours > 500: recommendations.append({ 'instance_type': instance_type, 'current_coverage': coverage, 'on_demand_hours': on_demand_hours, 'recommendation': 'Consider Reserved Instances' }) return recommendations def get_savings_plan_recommendations(): """Get Savings Plan purchase recommendations.""" ce = boto3.client('ce') response = ce.get_savings_plans_purchase_recommendation( SavingsPlansType='COMPUTE_SP', TermInYears='ONE_YEAR', PaymentOption='NO_UPFRONT', LookbackPeriodInDays='THIRTY_DAYS' ) rec = response.get('SavingsPlansPurchaseRecommendation', {}) return { 'recommended_hourly_commitment': rec.get('SavingsPlansPurchaseRecommendationSummary', {}).get('RecommendedHourlyCommitment'), 'estimated_monthly_savings': rec.get('SavingsPlansPurchaseRecommendationSummary', {}).get('EstimatedMonthlySavingsAmount'), 'estimated_savings_percentage': rec.get('SavingsPlansPurchaseRecommendationSummary', {}).get('EstimatedSavingsPercentage') } Spot Instances Spot for Stateless Workloads 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 # Kubernetes with Spot instances via Karpenter apiVersion: karpenter.sh/v1alpha5 kind: Provisioner metadata: name: spot-provisioner spec: requirements: - key: karpenter.sh/capacity-type operator: In values: ["spot"] - key: kubernetes.io/arch operator: In values: ["amd64"] - key: node.kubernetes.io/instance-type operator: In values: ["m5.large", "m5.xlarge", "m5a.large", "m5a.xlarge", "m6i.large"] # Spread across instance types for availability limits: resources: cpu: 1000 # Handle interruptions gracefully ttlSecondsAfterEmpty: 30 ttlSecondsUntilExpired: 2592000 # 30 days --- # Deployment using spot nodes apiVersion: apps/v1 kind: Deployment metadata: name: worker spec: replicas: 10 template: spec: nodeSelector: karpenter.sh/capacity-type: spot # Handle spot interruptions terminationGracePeriodSeconds: 120 containers: - name: worker lifecycle: preStop: exec: command: ["/bin/sh", "-c", "sleep 90"] # Drain gracefully Spot Interruption Handling 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 # spot_interruption_handler.py import requests import time import signal import sys METADATA_URL = "http://169.254.169.254/latest/meta-data" def check_spot_interruption(): """Check if this spot instance is being interrupted.""" try: response = requests.get( f"{METADATA_URL}/spot/instance-action", timeout=1 ) if response.status_code == 200: return response.json() except: pass return None def graceful_shutdown(): """Handle graceful shutdown on interruption.""" print("Spot interruption detected, starting graceful shutdown...") # Stop accepting new work # Finish current tasks # Checkpoint state # Clean up sys.exit(0) def monitor_interruption(): """Monitor for spot interruption (2 minute warning).""" while True: interruption = check_spot_interruption() if interruption: print(f"Interruption notice: {interruption}") graceful_shutdown() time.sleep(5) # Run in background thread import threading threading.Thread(target=monitor_interruption, daemon=True).start() Storage Optimization S3 Lifecycle Policies 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 # s3_lifecycle.py import boto3 def configure_lifecycle_rules(bucket_name: str): """Configure intelligent tiering and expiration.""" s3 = boto3.client('s3') lifecycle_config = { 'Rules': [ { 'ID': 'intelligent-tiering', 'Status': 'Enabled', 'Filter': {'Prefix': ''}, 'Transitions': [ { 'Days': 0, 'StorageClass': 'INTELLIGENT_TIERING' } ] }, { 'ID': 'archive-old-logs', 'Status': 'Enabled', 'Filter': {'Prefix': 'logs/'}, 'Transitions': [ {'Days': 30, 'StorageClass': 'STANDARD_IA'}, {'Days': 90, 'StorageClass': 'GLACIER'}, ], 'Expiration': {'Days': 365} }, { 'ID': 'cleanup-incomplete-uploads', 'Status': 'Enabled', 'Filter': {'Prefix': ''}, 'AbortIncompleteMultipartUpload': { 'DaysAfterInitiation': 7 } } ] } s3.put_bucket_lifecycle_configuration( Bucket=bucket_name, LifecycleConfiguration=lifecycle_config ) Cost Monitoring Budget Alerts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 # CloudFormation budget AWSTemplateFormatVersion: '2010-09-09' Resources: MonthlyBudget: Type: AWS::Budgets::Budget Properties: Budget: BudgetName: monthly-infrastructure BudgetLimit: Amount: 10000 Unit: USD TimeUnit: MONTHLY BudgetType: COST NotificationsWithSubscribers: - Notification: NotificationType: ACTUAL ComparisonOperator: GREATER_THAN Threshold: 80 Subscribers: - SubscriptionType: EMAIL Address: ops@example.com - Notification: NotificationType: FORECASTED ComparisonOperator: GREATER_THAN Threshold: 100 Subscribers: - SubscriptionType: SNS Address: !Ref AlertTopic Daily Cost Report 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 # daily_cost_report.py import boto3 from datetime import datetime, timedelta def get_daily_costs(): """Get costs broken down by service.""" ce = boto3.client('ce') end = datetime.utcnow().date() start = end - timedelta(days=7) response = ce.get_cost_and_usage( TimePeriod={ 'Start': start.isoformat(), 'End': end.isoformat() }, Granularity='DAILY', Metrics=['UnblendedCost'], GroupBy=[ {'Type': 'DIMENSION', 'Key': 'SERVICE'} ] ) report = [] for day in response['ResultsByTime']: date = day['TimePeriod']['Start'] for group in day['Groups']: service = group['Keys'][0] cost = float(group['Metrics']['UnblendedCost']['Amount']) if cost > 1: # Only show significant costs report.append({ 'date': date, 'service': service, 'cost': cost }) return report # Generate and send report for item in sorted(get_daily_costs(), key=lambda x: -x['cost'])[:10]: print(f"{item['date']} | {item['service']}: ${item['cost']:.2f}") Checklist Enable Cost Explorer and set up budgets Tag all resources for cost allocation Review and act on right-sizing recommendations monthly Implement lifecycle policies for S3 and EBS snapshots Use Spot for fault-tolerant workloads Purchase Savings Plans for baseline compute Stop dev/staging resources outside business hours Clean up unused resources weekly Review Reserved Instance coverage quarterly Cloud cost optimization isn’t a one-time project—it’s an ongoing practice. Automate the easy wins, review monthly, and treat your cloud bill like any other metric that matters. ...

February 11, 2026 Â· 9 min Â· 1709 words Â· Rob Washington

Chaos Engineering: Breaking Your Systems to Make Them Stronger

You don’t know if your system handles failures gracefully until failures happen. Chaos engineering lets you find out on your terms—in controlled conditions, during business hours, with engineers ready to respond. The Principles Define steady state — What does “healthy” look like? Hypothesize — “The system will continue serving traffic if one pod dies” Inject failure — Kill the pod Observe — Did steady state hold? Learn — Fix what broke, update runbooks Start Simple: Kill Things Random Pod Termination 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 # chaos_pod_killer.py import random from kubernetes import client, config from datetime import datetime import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class ChaosPodKiller: def __init__(self, namespace: str, label_selector: str): config.load_incluster_config() # or load_kube_config() for local self.v1 = client.CoreV1Api() self.namespace = namespace self.label_selector = label_selector def get_targets(self) -> list: """Get pods matching selector.""" pods = self.v1.list_namespaced_pod( namespace=self.namespace, label_selector=self.label_selector ) return [pod.metadata.name for pod in pods.items if pod.status.phase == "Running"] def kill_random_pod(self, dry_run: bool = True) -> str: """Kill a random pod from targets.""" targets = self.get_targets() if len(targets) <= 1: logger.warning("Only one pod running, skipping kill") return None victim = random.choice(targets) if dry_run: logger.info(f"DRY RUN: Would delete pod {victim}") else: logger.info(f"Deleting pod {victim}") self.v1.delete_namespaced_pod( name=victim, namespace=self.namespace, grace_period_seconds=0 ) return victim def run_experiment(self, duration_minutes: int = 30, interval_seconds: int = 300): """Run chaos experiment for duration.""" import time end_time = datetime.now().timestamp() + (duration_minutes * 60) logger.info(f"Starting chaos experiment for {duration_minutes} minutes") while datetime.now().timestamp() < end_time: victim = self.kill_random_pod(dry_run=False) if victim: logger.info(f"Killed {victim}, waiting {interval_seconds}s") time.sleep(interval_seconds) logger.info("Chaos experiment completed") # Usage if __name__ == "__main__": killer = ChaosPodKiller( namespace="production", label_selector="app=api-service" ) killer.run_experiment(duration_minutes=30, interval_seconds=300) Kubernetes CronJob for Continuous Chaos 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 apiVersion: batch/v1 kind: CronJob metadata: name: chaos-pod-killer namespace: chaos-system spec: schedule: "*/10 9-17 * * 1-5" # Every 10 min, 9-5, weekdays only jobTemplate: spec: template: spec: serviceAccountName: chaos-runner containers: - name: chaos image: chaos-toolkit:latest command: - python - /scripts/chaos_pod_killer.py env: - name: TARGET_NAMESPACE value: "production" - name: LABEL_SELECTOR value: "chaos-enabled=true" restartPolicy: Never Network Chaos Introduce Latency 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # Using Chaos Mesh apiVersion: chaos-mesh.org/v1alpha1 kind: NetworkChaos metadata: name: network-delay namespace: chaos-testing spec: action: delay mode: all selector: namespaces: - production labelSelectors: app: api-service delay: latency: "100ms" jitter: "20ms" correlation: "50" duration: "5m" scheduler: cron: "@every 1h" Packet Loss 1 2 3 4 5 6 7 8 9 10 11 12 13 14 apiVersion: chaos-mesh.org/v1alpha1 kind: NetworkChaos metadata: name: network-loss spec: action: loss mode: one selector: labelSelectors: app: payment-service loss: loss: "10" # 10% packet loss correlation: "50" duration: "2m" Network Partition 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 apiVersion: chaos-mesh.org/v1alpha1 kind: NetworkChaos metadata: name: network-partition spec: action: partition mode: all selector: namespaces: - production labelSelectors: app: api-service direction: both target: selector: namespaces: - production labelSelectors: app: database duration: "30s" Resource Stress CPU Stress 1 2 3 4 5 6 7 8 9 10 11 12 13 14 apiVersion: chaos-mesh.org/v1alpha1 kind: StressChaos metadata: name: cpu-stress spec: mode: one selector: labelSelectors: app: api-service stressors: cpu: workers: 2 load: 80 # 80% CPU usage duration: "5m" Memory Stress 1 2 3 4 5 6 7 8 9 10 11 12 13 14 apiVersion: chaos-mesh.org/v1alpha1 kind: StressChaos metadata: name: memory-stress spec: mode: one selector: labelSelectors: app: api-service stressors: memory: workers: 1 size: "512MB" # Allocate 512MB duration: "3m" Application-Level Chaos HTTP Fault Injection with Istio 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 apiVersion: networking.istio.io/v1beta1 kind: VirtualService metadata: name: api-chaos spec: hosts: - api-service http: - fault: abort: percentage: value: 5 httpStatus: 503 delay: percentage: value: 10 fixedDelay: 2s route: - destination: host: api-service Custom Failure Injection 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 # chaos_middleware.py import random import time import os from functools import wraps CHAOS_ENABLED = os.getenv("CHAOS_ENABLED", "false").lower() == "true" CHAOS_FAILURE_RATE = float(os.getenv("CHAOS_FAILURE_RATE", "0.0")) CHAOS_LATENCY_MS = int(os.getenv("CHAOS_LATENCY_MS", "0")) def chaos_middleware(f): """Inject chaos into function calls.""" @wraps(f) def wrapper(*args, **kwargs): if not CHAOS_ENABLED: return f(*args, **kwargs) # Random failure if random.random() < CHAOS_FAILURE_RATE: raise Exception("Chaos injection: random failure") # Random latency if CHAOS_LATENCY_MS > 0: delay = random.randint(0, CHAOS_LATENCY_MS) / 1000 time.sleep(delay) return f(*args, **kwargs) return wrapper # Usage @chaos_middleware def call_payment_service(order): return requests.post(PAYMENT_URL, json=order.to_dict()) Experiment Framework 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 # chaos_experiment.py from dataclasses import dataclass from typing import Callable, List import time @dataclass class SteadyStateCheck: name: str check: Callable[[], bool] @dataclass class ChaosExperiment: name: str hypothesis: str steady_state_checks: List[SteadyStateCheck] inject_failure: Callable[[], None] rollback: Callable[[], None] duration_seconds: int = 60 def run_experiment(experiment: ChaosExperiment) -> dict: """Run a chaos experiment with proper controls.""" result = { "name": experiment.name, "hypothesis": experiment.hypothesis, "success": False, "steady_state_before": {}, "steady_state_after": {}, "errors": [] } # 1. Verify steady state before print(f"Checking steady state before experiment...") for check in experiment.steady_state_checks: try: passed = check.check() result["steady_state_before"][check.name] = passed if not passed: result["errors"].append(f"Pre-check failed: {check.name}") return result except Exception as e: result["errors"].append(f"Pre-check error: {check.name}: {e}") return result # 2. Inject failure print(f"Injecting failure: {experiment.name}") try: experiment.inject_failure() except Exception as e: result["errors"].append(f"Injection failed: {e}") experiment.rollback() return result # 3. Wait for duration print(f"Waiting {experiment.duration_seconds}s...") time.sleep(experiment.duration_seconds) # 4. Check steady state during/after print(f"Checking steady state after experiment...") for check in experiment.steady_state_checks: try: passed = check.check() result["steady_state_after"][check.name] = passed except Exception as e: result["steady_state_after"][check.name] = False result["errors"].append(f"Post-check error: {check.name}: {e}") # 5. Rollback print(f"Rolling back...") experiment.rollback() # 6. Evaluate hypothesis result["success"] = all(result["steady_state_after"].values()) return result # Example experiment def check_api_responding(): response = requests.get("http://api-service/health", timeout=5) return response.status_code == 200 def check_error_rate_low(): # Query Prometheus result = prometheus.query('sum(rate(http_requests_total{status=~"5.."}[1m])) / sum(rate(http_requests_total[1m]))') return float(result) < 0.01 def kill_one_api_pod(): killer = ChaosPodKiller("production", "app=api-service") killer.kill_random_pod(dry_run=False) def noop_rollback(): pass # Kubernetes will restart the pod experiment = ChaosExperiment( name="api-pod-failure", hypothesis="API continues serving traffic when one pod dies", steady_state_checks=[ SteadyStateCheck("api_responding", check_api_responding), SteadyStateCheck("error_rate_low", check_error_rate_low), ], inject_failure=kill_one_api_pod, rollback=noop_rollback, duration_seconds=60 ) result = run_experiment(experiment) print(f"Experiment {'PASSED' if result['success'] else 'FAILED'}") Game Days Schedule regular chaos exercises: ...

February 11, 2026 Â· 8 min Â· 1552 words Â· Rob Washington

Incident Response: On-Call Practices That Don't Burn Out Your Team

On-call doesn’t have to mean sleepless nights and burnout. With the right practices, you can maintain reliable systems while keeping your team healthy. Here’s how. Alert Design Only Alert on Actionable Items 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 # Bad: Noisy, leads to alert fatigue - alert: HighCPU expr: cpu_usage > 80 for: 1m # Good: Actionable, symptom-based - alert: HighErrorRate expr: | sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) > 0.01 for: 5m labels: severity: warning runbook: "https://runbooks.example.com/high-error-rate" annotations: summary: "Error rate {{ $value | humanizePercentage }} exceeds 1%" impact: "Users experiencing failures" action: "Check application logs and recent deployments" Severity Levels 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 # severity-definitions.yaml severities: critical: description: "Service completely down, immediate action required" response_time: "5 minutes" notification: "page + phone call" examples: - "Total service outage" - "Data loss occurring" - "Security breach detected" warning: description: "Degraded service, action needed soon" response_time: "30 minutes" notification: "page" examples: - "Error rate elevated but service functional" - "Approaching resource limits" - "Single replica down" info: description: "Notable event, no immediate action" response_time: "Next business day" notification: "Slack" examples: - "Deployment completed" - "Scheduled maintenance starting" Runbooks Every alert needs a runbook: ...

February 11, 2026 Â· 16 min Â· 3302 words Â· Rob Washington

Observability: Beyond Monitoring with Metrics, Logs, and Traces

Monitoring tells you when something is wrong. Observability helps you understand why. In distributed systems, you can’t predict every failure mode—you need systems that let you ask arbitrary questions about their behavior. The Three Pillars Metrics: What’s Happening Now Numeric time-series data. Fast to query, cheap to store. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 from prometheus_client import Counter, Histogram, Gauge, start_http_server # Counter - only goes up requests_total = Counter( 'http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'] ) # Histogram - distribution of values request_duration = Histogram( 'http_request_duration_seconds', 'Request duration in seconds', ['method', 'endpoint'], buckets=[.01, .05, .1, .25, .5, 1, 2.5, 5, 10] ) # Gauge - can go up or down active_connections = Gauge( 'active_connections', 'Number of active connections' ) # Usage @app.route("/api/<endpoint>") def handle_request(endpoint): active_connections.inc() with request_duration.labels( method=request.method, endpoint=endpoint ).time(): result = process_request() requests_total.labels( method=request.method, endpoint=endpoint, status=200 ).inc() active_connections.dec() return result # Expose metrics endpoint start_http_server(9090) Logs: What Happened Discrete events with context. Rich detail, expensive at scale. ...

February 11, 2026 Â· 7 min Â· 1291 words Â· Rob Washington

The Twelve-Factor App: Building Cloud-Native Applications That Scale

The twelve-factor methodology emerged from Heroku’s experience running millions of apps. These principles create applications that deploy cleanly, scale effortlessly, and minimize divergence between development and production. Let’s walk through each factor with practical examples. 1. Codebase: One Repo, Many Deploys One codebase tracked in version control, many deploys (dev, staging, prod). 1 2 3 4 5 6 7 8 9 # Good: Single repo, branch-based environments main → production staging → staging feature/* → development # Bad: Separate repos for each environment myapp-dev/ myapp-staging/ myapp-prod/ 1 2 3 4 5 # config.py - Same code, different configs import os ENVIRONMENT = os.getenv("ENVIRONMENT", "development") DATABASE_URL = os.getenv("DATABASE_URL") 2. Dependencies: Explicitly Declare and Isolate Never rely on system-wide packages. Declare everything. ...

February 11, 2026 Â· 6 min Â· 1237 words Â· Rob Washington

Service Mesh: Traffic Management, Security, and Observability with Istio

When you have dozens of microservices talking to each other, managing traffic, security, and observability becomes complex. A service mesh handles this at the infrastructure layer, so your applications don’t have to. What Problems Does a Service Mesh Solve? Without a mesh, every service needs to implement: Retries and timeouts Circuit breakers Load balancing TLS certificates Metrics and tracing Access control With a mesh, the sidecar proxy handles all of this: ...

February 11, 2026 Â· 6 min Â· 1277 words Â· Rob Washington

Testing Infrastructure: From Terraform Plans to Production Validation

Infrastructure code deserves the same testing rigor as application code. A typo in Terraform can delete a database. An untested Ansible role can break production. Let’s build confidence with proper testing. The Testing Pyramid for Infrastructure E ( I ( 2 F n R E u t e U l e a n ( T l g l i S e r t t s s a c a t t t l T t s a i o e i c o u s c k n d t s a d T r n e e e a p s s l l t o y o s u s y r i m c s e e , n s t ) p ) l a n v a l i d a t i o n ) Unit Testing: Static Analysis Terraform Validation 1 2 3 4 5 6 7 8 # Built-in validation terraform init terraform validate terraform fmt -check # Custom validation rules terraform plan -out=tfplan terraform show -json tfplan > plan.json 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 # tests/test_terraform_plan.py import json import pytest @pytest.fixture def plan(): with open('plan.json') as f: return json.load(f) def test_no_resources_destroyed(plan): """Ensure no resources are being destroyed.""" changes = plan.get('resource_changes', []) destroyed = [c for c in changes if 'delete' in c.get('change', {}).get('actions', [])] assert len(destroyed) == 0, f"Resources being destroyed: {[d['address'] for d in destroyed]}" def test_no_public_s3_buckets(plan): """Ensure S3 buckets aren't public.""" changes = plan.get('resource_changes', []) for change in changes: if change['type'] == 'aws_s3_bucket': after = change.get('change', {}).get('after', {}) acl = after.get('acl', 'private') assert acl == 'private', f"Bucket {change['address']} has public ACL: {acl}" def test_instances_have_tags(plan): """Ensure EC2 instances have required tags.""" required_tags = {'Environment', 'Owner', 'Project'} changes = plan.get('resource_changes', []) for change in changes: if change['type'] == 'aws_instance': after = change.get('change', {}).get('after', {}) tags = set(after.get('tags', {}).keys()) missing = required_tags - tags assert not missing, f"Instance {change['address']} missing tags: {missing}" Policy as Code with OPA 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 # policy/terraform.rego package terraform deny[msg] { resource := input.resource_changes[_] resource.type == "aws_security_group_rule" resource.change.after.cidr_blocks[_] == "0.0.0.0/0" resource.change.after.from_port == 22 msg := sprintf("SSH open to world in %v", [resource.address]) } deny[msg] { resource := input.resource_changes[_] resource.type == "aws_db_instance" resource.change.after.publicly_accessible == true msg := sprintf("RDS instance %v is publicly accessible", [resource.address]) } 1 2 # Run OPA checks terraform show -json tfplan | opa eval -i - -d policy/ "data.terraform.deny" Integration Testing with Terratest Terratest deploys real infrastructure, validates it, then tears it down: ...

February 11, 2026 Â· 7 min Â· 1358 words Â· Rob Washington