Infrastructure Drift Detection: Keeping Your Terraform in Sync with Reality

You define infrastructure in Terraform. Someone clicks around in the AWS console. Now your code says one thing and reality says another. This is drift, and it will bite you when you least expect it.

What Causes Drift?

- Manual changes — “Just this once” console modifications
- Emergency fixes — Hotfixes applied directly to production
- Other tools — Scripts, CLI commands, other IaC tools
- AWS itself — Auto-scaling, managed service updates
- Incomplete imports — Resources created outside Terraform

Detecting Drift

Basic: Terraform Plan

#!/bin/bash # drift-check.sh set -e # Initialize without backend changes terraform init -input=false # Generate plan terraform plan -detailed-exitcode -out=tfplan 2>&1 | tee plan.txt EXIT_CODE=$? if [ $EXIT_CODE -eq 0 ]; then echo "✅ No drift detected" elif [ $EXIT_CODE -eq 2 ]; then echo "⚠️ Drift detected! Changes needed:" terraform show tfplan exit 1 else echo "❌ Error running plan" exit 1 fi

Note: `$?` after a pipeline is the exit status of the last command (`tee`), not of `terraform plan`, so as written `EXIT_CODE` reflects `tee` and the drift branch is never taken. Read `${PIPESTATUS[0]}` instead (or enable `set -o pipefail` and guard the pipeline against `set -e`) so that exit code 2 is actually reported as drift.

Automated Drift Detection

# drift_detector.py import subprocess import json import os from datetime import datetime from typing import List, Dict class DriftDetector: def __init__(self, workspace_dir: str): self.workspace_dir = workspace_dir def check_drift(self) -> Dict: """Run terraform plan and parse results.""" os.chdir(self.workspace_dir) # Initialize subprocess.run( ["terraform", "init", "-input=false"], capture_output=True, check=True ) # Plan with JSON output result = subprocess.run( ["terraform", "plan", "-json", "-out=tfplan"], capture_output=True, text=True ) changes = self._parse_plan_output(result.stdout) return { "workspace": self.workspace_dir, "timestamp": datetime.utcnow().isoformat(), "has_drift": 
len(changes) > 0, "changes": changes } def _parse_plan_output(self, output: str) -> List[Dict]: """Parse terraform plan JSON output.""" changes = [] for line in output.strip().split('\n'): try: entry = json.loads(line) if entry.get('@level') == 'info': msg = entry.get('@message', '') # Look for resource changes if 'will be' in msg: changes.append({ 'message': msg, 'type': entry.get('type', 'unknown'), 'change': entry.get('change', {}) }) except json.JSONDecodeError: continue return changes def check_all_workspaces(workspaces: List[str]) -> Dict: """Check drift across multiple Terraform workspaces.""" results = [] for workspace in workspaces: detector = DriftDetector(workspace) try: result = detector.check_drift() results.append(result) except Exception as e: results.append({ "workspace": workspace, "error": str(e), "has_drift": None }) drifted = [r for r in results if r.get('has_drift')] return { "total_workspaces": len(workspaces), "drifted_count": len(drifted), "results": results } # Usage workspaces = [ "/infrastructure/terraform/networking", "/infrastructure/terraform/compute", "/infrastructure/terraform/databases", ] report = check_all_workspaces(workspaces) print(json.dumps(report, indent=2)) CI Pipeline for Drift Detection 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 # .github/workflows/drift-detection.yml name: Drift Detection on: schedule: - cron: '0 */6 * * *' # Every 6 hours workflow_dispatch: jobs: detect-drift: runs-on: ubuntu-latest strategy: matrix: workspace: - networking - compute - databases - monitoring steps: - uses: actions/checkout@v4 - uses: hashicorp/setup-terraform@v3 with: terraform_version: 1.7.0 - name: Configure AWS uses: aws-actions/configure-aws-credentials@v4 with: aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} aws-secret-access-key: ${{ 
secrets.AWS_SECRET_ACCESS_KEY }} aws-region: us-east-1 - name: Check Drift id: drift working-directory: terraform/${{ matrix.workspace }} run: | terraform init -input=false set +e terraform plan -detailed-exitcode -out=tfplan 2>&1 | tee plan.txt EXIT_CODE=$? set -e if [ $EXIT_CODE -eq 2 ]; then echo "drift_detected=true" >> $GITHUB_OUTPUT echo "### ⚠️ Drift Detected in ${{ matrix.workspace }}" >> $GITHUB_STEP_SUMMARY echo '```' >> $GITHUB_STEP_SUMMARY terraform show -no-color tfplan >> $GITHUB_STEP_SUMMARY echo '```' >> $GITHUB_STEP_SUMMARY else echo "drift_detected=false" >> $GITHUB_OUTPUT echo "### ✅ No drift in ${{ matrix.workspace }}" >> $GITHUB_STEP_SUMMARY fi - name: Alert on Drift if: steps.drift.outputs.drift_detected == 'true' uses: slackapi/slack-github-action@v1 with: payload: | { "text": "⚠️ Infrastructure drift detected in ${{ matrix.workspace }}", "blocks": [ { "type": "section", "text": { "type": "mrkdwn", "text": "*Infrastructure Drift Detected*\n\nWorkspace: `${{ matrix.workspace }}`\nRun: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" } } ] } env: SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }} Continuous Reconciliation Atlantis for GitOps 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 # atlantis.yaml version: 3 projects: - name: networking dir: terraform/networking autoplan: when_modified: - "*.tf" - "*.tfvars" enabled: true apply_requirements: - approved - mergeable - name: compute dir: terraform/compute autoplan: enabled: true apply_requirements: - approved Drift Auto-Remediation 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 # drift_remediation.py import subprocess import json from enum import Enum from typing import Optional class RemediationStrategy(Enum): 
ALERT_ONLY = "alert" # Just notify AUTO_PLAN = "plan" # Create PR with plan AUTO_APPLY = "apply" # Automatically apply (dangerous!) REFRESH_ONLY = "refresh" # Update state to match reality class DriftRemediator: def __init__(self, workspace: str, strategy: RemediationStrategy): self.workspace = workspace self.strategy = strategy def remediate(self, drift_report: dict) -> dict: """Remediate detected drift based on strategy.""" if not drift_report.get('has_drift'): return {"action": "none", "reason": "no drift"} if self.strategy == RemediationStrategy.ALERT_ONLY: return self._alert(drift_report) elif self.strategy == RemediationStrategy.REFRESH_ONLY: return self._refresh_state() elif self.strategy == RemediationStrategy.AUTO_PLAN: return self._create_remediation_pr(drift_report) elif self.strategy == RemediationStrategy.AUTO_APPLY: return self._auto_apply() return {"action": "unknown", "error": "invalid strategy"} def _alert(self, drift_report: dict) -> dict: """Send alert without taking action.""" # Send to Slack, PagerDuty, etc. return { "action": "alert", "changes": len(drift_report['changes']) } def _refresh_state(self) -> dict: """Refresh state to match reality (accept drift).""" result = subprocess.run( ["terraform", "apply", "-refresh-only", "-auto-approve"], capture_output=True, text=True, cwd=self.workspace ) return { "action": "refresh", "success": result.returncode == 0, "output": result.stdout } def _create_remediation_pr(self, drift_report: dict) -> dict: """Create a PR to remediate drift.""" # Generate plan subprocess.run( ["terraform", "plan", "-out=remediation.tfplan"], cwd=self.workspace ) # Create branch and PR (pseudo-code) branch = f"fix/drift-{self.workspace}-{datetime.now().strftime('%Y%m%d')}" # In practice, you'd use GitHub API or similar return { "action": "pr_created", "branch": branch, "changes": len(drift_report['changes']) } def _auto_apply(self) -> dict: """Automatically apply to fix drift. 
USE WITH CAUTION.""" result = subprocess.run( ["terraform", "apply", "-auto-approve"], capture_output=True, text=True, cwd=self.workspace ) return { "action": "auto_apply", "success": result.returncode == 0, "output": result.stdout if result.returncode == 0 else result.stderr } # Configuration per workspace WORKSPACE_STRATEGIES = { "networking": RemediationStrategy.ALERT_ONLY, # Critical, manual only "compute": RemediationStrategy.AUTO_PLAN, # Create PRs "monitoring": RemediationStrategy.REFRESH_ONLY, # Accept drift "dev-sandbox": RemediationStrategy.AUTO_APPLY, # Auto-fix okay } State Locking and Protection Prevent Manual Changes 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # terraform.tf terraform { backend "s3" { bucket = "terraform-state-prod" key = "networking/terraform.tfstate" region = "us-east-1" dynamodb_table = "terraform-locks" encrypt = true } } # Prevent destroy of critical resources resource "aws_db_instance" "main" { # ... lifecycle { prevent_destroy = true } } AWS Service Control Policies 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 { "Version": "2012-10-17", "Statement": [ { "Sid": "RequireTerraformTag", "Effect": "Deny", "Action": [ "ec2:RunInstances", "ec2:CreateVpc", "rds:CreateDBInstance" ], "Resource": "*", "Condition": { "Null": { "aws:RequestTag/ManagedBy": "true" } } }, { "Sid": "PreventManualModification", "Effect": "Deny", "Action": [ "ec2:ModifyInstanceAttribute", "ec2:ModifyVpcAttribute" ], "Resource": "*", "Condition": { "StringEquals": { "aws:ResourceTag/ManagedBy": "terraform" }, "StringNotEquals": { "aws:PrincipalTag/Role": "terraform-automation" } } } ] } Drift Metrics 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 # drift_metrics.py from prometheus_client import Counter, Gauge, Histogram # Metrics drift_detected = Counter( 'terraform_drift_detected_total', 'Number of drift detection events', ['workspace', 'severity'] ) 
drift_resources = Gauge( 'terraform_drift_resources', 'Number of resources with drift', ['workspace'] ) drift_check_duration = Histogram( 'terraform_drift_check_seconds', 'Time to check for drift', ['workspace'] ) def record_drift_check(workspace: str, result: dict, duration: float): """Record drift check metrics.""" drift_check_duration.labels(workspace=workspace).observe(duration) if result['has_drift']: drift_detected.labels( workspace=workspace, severity='warning' if len(result['changes']) < 5 else 'critical' ).inc() drift_resources.labels(workspace=workspace).set(len(result['changes'])) else: drift_resources.labels(workspace=workspace).set(0)

Best Practices

- Check drift frequently — At least daily, ideally every few hours
- Alert immediately — Drift compounds; catch it early
- Lock down production — Use SCPs to prevent manual changes
- Tag everything — ManagedBy: terraform enables enforcement
- Use workspaces — Isolate environments, check each independently
- Automate remediation — PRs for drift fixes, not manual applies
- Track metrics — Know your drift frequency and sources

Quick Start Checklist

- Set up scheduled drift detection (GitHub Actions, Jenkins, etc.)
- Configure alerts for detected drift
- Add ManagedBy tags to all Terraform resources
- Implement SCPs to prevent manual changes in production
- Create runbook for drift remediation
- Track drift metrics over time

Drift isn’t just annoying—it’s a reliability risk. Your next terraform apply might have unintended consequences because the state doesn’t match reality. Detect it early, fix it fast, and prevent it where possible. ...

February 11, 2026 · 8 min · 1606 words · Rob Washington