That database password hasn’t changed in three years. The API key in your config was committed by someone who left two jobs ago. The SSL certificate expires next Tuesday and nobody knows.

Secrets rot. Rotation automation fixes this.

Why Rotate?

Static credentials are a liability:

  • Leaked credentials stay valid until someone notices
  • Compliance frameworks such as PCI DSS, SOC 2, and HIPAA require or expect it
  • Blast radius grows the longer a secret lives
  • Offboarded employees may still have access

Automated rotation means:

  • A leaked credential stops working at the next rotation, limiting how long an attacker can use it
  • No human remembers (or forgets) to rotate
  • Rotation becomes a non-event, not a fire drill

AWS Secrets Manager Rotation

For supported services such as Amazon RDS, AWS provides ready-made rotation functions that handle the hard parts.

RDS Database Rotation

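# Create the secret
resource "aws_secretsmanager_secret" "db_credentials" {
  name = "prod/database/admin"
}

# Initial password; rotation takes over after the first apply
resource "random_password" "db" {
  length           = 32
  special          = true
  override_special = "!#$%^&*()-_=+" # RDS passwords can't contain '/', '@', '"' or spaces
}

# aws_db_instance.main is assumed to be defined elsewhere
resource "aws_secretsmanager_secret_version" "db_credentials" {
  secret_id = aws_secretsmanager_secret.db_credentials.id
  secret_string = jsonencode({
    username = "admin"
    password = random_password.db.result
    engine   = "postgres"
    host     = aws_db_instance.main.address
    port     = 5432
    dbname   = "myapp"
  })

  # Rotation rewrites the secret outside Terraform; don't revert it on the next apply
  lifecycle {
    ignore_changes = [secret_string]
  }
}

# Enable rotation. aws_lambda_function.rotate_db is assumed to be defined
# elsewhere (e.g. deployed from AWS's PostgreSQL rotation template), together
# with an aws_lambda_permission that lets secretsmanager.amazonaws.com invoke it.
resource "aws_secretsmanager_secret_rotation" "db_credentials" {
  secret_id           = aws_secretsmanager_secret.db_credentials.id
  rotation_lambda_arn = aws_lambda_function.rotate_db.arn

  rotation_rules {
    automatically_after_days = 30
  }
}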
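To confirm that the schedule actually runs, you can read it back from the secret's metadata. The sketch below assumes a boto3 Secrets Manager client (or any object with the same describe_secret method) and relies on the RotationEnabled, RotationRules, and LastRotatedDate fields of the describe_secret response. The function name rotation_overdue and the one-day grace period are hypothetical choices, and the check only handles days-based schedules like the one above.

```python
from datetime import datetime, timedelta, timezone


def rotation_overdue(sm, secret_id, now=None, grace=timedelta(days=1)):
    """Return True if rotation is off, never ran, or is late by more than `grace`.

    Hypothetical helper (not part of boto3). `sm` is a boto3 Secrets Manager
    client, or anything with the same describe_secret() method.
    """
    now = now or datetime.now(timezone.utc)
    meta = sm.describe_secret(SecretId=secret_id)

    if not meta.get("RotationEnabled"):
        return True

    # Assumes a days-based schedule (automatically_after_days), not a cron expression
    days = meta.get("RotationRules", {}).get("AutomaticallyAfterDays")
    if days is None:
        raise ValueError("no AutomaticallyAfterDays rule on this secret")

    last = meta.get("LastRotatedDate")
    if last is None:
        return True  # rotation enabled but never completed

    return now - last > timedelta(days=days) + grace
```

In practice you would call it from a scheduled job with `boto3.client("secretsmanager")` and the secret name, for example `"prod/database/admin"`.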

Custom Rotation Lambda

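For secrets AWS has no template for, write your own rotation function. Secrets Manager calls it with each of four steps in turn: createSecret, setSecret, testSecret, and finishSecret.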

import json
import secrets
import string

import boto3


def lambda_handler(event, context):
    secret_id = event['SecretId']
    step = event['Step']
    token = event['ClientRequestToken']

    sm = boto3.client('secretsmanager')

    # Secrets Manager calls the function once per step, in this order
    if step == "createSecret":
        create_secret(sm, secret_id, token)
    elif step == "setSecret":
        set_secret(sm, secret_id, token)
    elif step == "testSecret":
        test_secret(sm, secret_id, token)
    elif step == "finishSecret":
        finish_secret(sm, secret_id, token)
    else:
        raise ValueError(f"Unknown rotation step: {step}")


def create_secret(sm, secret_id, token):
    """Generate a new secret value and stage it as AWSPENDING."""
    # Steps may be retried: if this token already has a pending version, stop
    try:
        sm.get_secret_value(SecretId=secret_id, VersionId=token, VersionStage="AWSPENDING")
        return
    except sm.exceptions.ResourceNotFoundException:
        pass

    # Start from the current secret so username, host, etc. carry over
    current = sm.get_secret_value(SecretId=secret_id, VersionStage="AWSCURRENT")
    current_dict = json.loads(current['SecretString'])

    # Generate a new password with a cryptographically secure RNG
    alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
    current_dict['password'] = ''.join(secrets.choice(alphabet) for _ in range(32))

    # Store it under the rotation token, labeled AWSPENDING
    sm.put_secret_value(
        SecretId=secret_id,
        ClientRequestToken=token,
        SecretString=json.dumps(current_dict),
        VersionStages=['AWSPENDING'],
    )


def set_secret(sm, secret_id, token):
    """Apply the pending secret to the service."""
    pending = sm.get_secret_value(
        SecretId=secret_id,
        VersionId=token,
        VersionStage="AWSPENDING",
    )
    creds = json.loads(pending['SecretString'])

    # Update the actual service (e.g., the database user's password)
    update_database_password(creds['username'], creds['password'])


def test_secret(sm, secret_id, token):
    """Verify the pending secret works before it is promoted."""
    pending = sm.get_secret_value(
        SecretId=secret_id,
        VersionId=token,
        VersionStage="AWSPENDING",
    )
    creds = json.loads(pending['SecretString'])

    # Test a connection with the new credentials
    if not can_connect(creds):
        raise RuntimeError("New credentials failed validation")


def finish_secret(sm, secret_id, token):
    """Move AWSCURRENT to the new version; the old one becomes AWSPREVIOUS."""
    metadata = sm.describe_secret(SecretId=secret_id)

    for version_id, stages in metadata['VersionIdsToStages'].items():
        if 'AWSCURRENT' in stages:
            if version_id == token:
                return  # already finished (e.g. on a retry)
            sm.update_secret_version_stage(
                SecretId=secret_id,
                VersionStage='AWSCURRENT',
                MoveToVersionId=token,
                RemoveFromVersionId=version_id,
            )
            return


def update_database_password(username, password):
    """Placeholder: set the new password in your service."""
    raise NotImplementedError


def can_connect(creds):
    """Placeholder: try to authenticate with creds; return True on success."""
    raise NotImplementedError
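To see how the staging labels move without touching AWS, here is a hypothetical in-memory model of one secret's versions, with a plain dict standing in for the database. It simplifies Secrets Manager considerably: only three labels, no API errors. The AWSPREVIOUS move mirrors what Secrets Manager does when AWSCURRENT changes hands, while clearing AWSPENDING on promotion is just this model's simplification. FakeSecretStore and rotate are illustrative names.

```python
import json
import secrets
import string


class FakeSecretStore:
    """Hypothetical in-memory model of one secret's versions and staging labels."""

    def __init__(self, initial):
        self.versions = {"v0": json.dumps(initial)}
        self.labels = {"AWSCURRENT": "v0"}

    def get(self, label):
        # Returns a fresh dict each time, like parsing SecretString
        return json.loads(self.versions[self.labels[label]])

    def put_pending(self, version_id, value):
        self.versions[version_id] = json.dumps(value)
        self.labels["AWSPENDING"] = version_id

    def promote(self, version_id):
        """Move AWSCURRENT to version_id; the old current becomes AWSPREVIOUS."""
        old = self.labels["AWSCURRENT"]
        if old != version_id:
            self.labels["AWSPREVIOUS"] = old
            self.labels["AWSCURRENT"] = version_id
        # Simplification of this model: clear AWSPENDING once promoted
        self.labels.pop("AWSPENDING", None)


def rotate(store, database, token):
    """Run the four steps in order against the fake store and a dict 'database'."""
    # createSecret: copy the current value, swap in a new password, stage it
    new_value = store.get("AWSCURRENT")
    alphabet = string.ascii_letters + string.digits
    new_value["password"] = "".join(secrets.choice(alphabet) for _ in range(32))
    store.put_pending(token, new_value)

    # setSecret: apply the pending password to the service
    pending = store.get("AWSPENDING")
    database[pending["username"]] = pending["password"]

    # testSecret: the pending credentials must work before promotion
    if database.get(pending["username"]) != pending["password"]:
        raise RuntimeError("New credentials failed validation")

    # finishSecret: promote pending to current
    store.promote(token)


store = FakeSecretStore({"username": "admin", "password": "old-password"})
database = {"admin": "old-password"}
rotate(store, database, token="v1")
print(store.labels)  # {'AWSCURRENT': 'v1', 'AWSPREVIOUS': 'v0'}
```

Notice that the "database" holds only the new password from setSecret onward: any client still using the old one fails from that moment, even though AWSPREVIOUS keeps a copy. That gap is what the overlap approach in the zero-downtime section closes.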

HashiCorp Vault Dynamic Secrets

Vault generates credentials on demand, and each one is born with an expiration date.

Database Secrets Engine

# Enable database secrets engine
vault secrets enable database

# Configure PostgreSQL connection
vault write database/config/mydb \
    plugin_name=postgresql-database-plugin \
    connection_url="postgresql://{{username}}:{{password}}@postgres:5432/mydb" \
    allowed_roles="readonly,readwrite" \
    username="vault_admin" \
    password="admin_password"

# The admin password above is visible in shell history; rotate it
# so that only Vault knows it
vault write -force database/rotate-root/mydb

# Create a role that generates credentials
vault write database/roles/readonly \
    db_name=mydb \
    creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}'; GRANT SELECT ON ALL TABLES IN SCHEMA public TO \"{{name}}\";" \
    default_ttl="1h" \
    max_ttl="24h"
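default_ttl and max_ttl work together: a credential starts with a 1-hour lease, renewals can extend it, but never past 24 hours after creation. The sketch below illustrates that cap in plain Python, assuming each renewal asks for the default 1-hour increment. It is not Vault's code, and lease_expiry is a hypothetical name.

```python
from datetime import datetime, timedelta

DEFAULT_TTL = timedelta(hours=1)   # default_ttl="1h"
MAX_TTL = timedelta(hours=24)      # max_ttl="24h"


def lease_expiry(created, renewed_at, increment=DEFAULT_TTL, max_ttl=MAX_TTL):
    """Expiry of a lease renewed at `renewed_at`: never later than created + max_ttl."""
    return min(renewed_at + increment, created + max_ttl)


created = datetime(2024, 1, 1, 0, 0)
print(lease_expiry(created, created))                                    # 2024-01-01 01:00:00
print(lease_expiry(created, created + timedelta(hours=23, minutes=30)))  # 2024-01-02 00:00:00
```

Once a lease hits max_ttl, renewing no longer helps. The application has to request a fresh credential.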

Application requests credentials:

import os

import hvac
import psycopg2

client = hvac.Client(url='https://vault:8200', token=os.environ['VAULT_TOKEN'])

# Get dynamic credentials (Vault creates a new database role on each call)
creds = client.secrets.database.generate_credentials(name='readonly')

# Use them
conn = psycopg2.connect(
    host='postgres',
    database='mydb',
    user=creds['data']['username'],
    password=creds['data']['password'],
)

# The credentials expire after the lease TTL (creds['lease_duration'] seconds).
# Vault then revokes them in the database, so long-running apps must renew
# the lease or request new credentials before it runs out.
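Because every credential has a lease, a long-running process should refresh before expiry rather than wait for a login failure. Here is a minimal caching sketch. It assumes `fetch` returns a Vault-style response dict with `lease_duration` (seconds) and `data`, as `generate_credentials` does. `LeasedCredentials` and the 75% refresh point are illustrative choices, not part of hvac.

```python
import time


class LeasedCredentials:
    """Caches dynamic credentials and fetches new ones before the lease ends.

    `fetch` returns a Vault-style response: {'lease_duration': seconds, 'data': {...}}.
    `clock` defaults to time.monotonic and can be swapped out in tests.
    """

    def __init__(self, fetch, refresh_fraction=0.75, clock=time.monotonic):
        self.fetch = fetch
        self.refresh_fraction = refresh_fraction
        self.clock = clock
        self._data = None
        self._refresh_at = 0.0

    def get(self):
        now = self.clock()
        if self._data is None or now >= self._refresh_at:
            response = self.fetch()
            self._data = response["data"]
            # Refresh at e.g. 75% of the lease, leaving slack for retries
            self._refresh_at = now + response["lease_duration"] * self.refresh_fraction
        return self._data
```

With the client above, that would be `LeasedCredentials(lambda: client.secrets.database.generate_credentials(name='readonly'))`. Each refresh yields a different database user, so connections opened with the previous user should be replaced before its lease runs out.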

AWS Secrets Engine

# Enable AWS secrets engine
vault secrets enable aws

# Configure root credentials
vault write aws/config/root \
    access_key=AKIA... \
    secret_key=... \
    region=us-east-1

# Create role for dynamic IAM users
vault write aws/roles/deploy \
    credential_type=iam_user \
    policy_document=-<<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:*"],
      "Resource": ["arn:aws:s3:::my-bucket/*"]
    }
  ]
}
EOF

# Get temporary credentials
vault read aws/creds/deploy
# Returns: access_key, secret_key, lease_id, lease_duration
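With `credential_type=iam_user`, Vault creates a real IAM user on every read. Because IAM is eventually consistent, a brand-new key can be rejected for a few seconds after Vault returns it. A retry with exponential backoff around the first AWS call covers that window. The helper below is a library-agnostic sketch. `call` would wrap your first request, and which exception to retry is your choice (with boto3 it would typically be `botocore.exceptions.ClientError`).

```python
import time


def retry(call, retry_on=(Exception,), attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call `call()` until it succeeds, sleeping 1s, 2s, 4s, ... between tries."""
    for attempt in range(attempts):
        try:
            return call()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)
```

Keep `retry_on` narrow. Retrying every exception would also hide real permission mistakes in the role's policy.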

Certificate Rotation with cert-manager

cert-manager handles the TLS certificate lifecycle in Kubernetes.

apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: api-tls
  namespace: default
spec:
  secretName: api-tls-secret
  duration: 2160h    # 90 days
  renewBefore: 360h  # Renew 15 days before expiry
  
  issuerRef:
    name: letsencrypt-prod
    kind: ClusterIssuer
  
  commonName: api.example.com
  dnsNames:
    - api.example.com
    - www.api.example.com
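The two durations combine simply. Renewal is due once the remaining validity drops below renewBefore, so a 90-day certificate with 15 days of renewBefore is renewed about 75 days after issuance. The sketch below checks that arithmetic. It is not cert-manager's code: cert-manager works from the certificate's actual notAfter, and an ACME issuer such as Let's Encrypt may not honor the requested `duration`. `parse_hours` only handles the whole-hour form used above.

```python
from datetime import datetime, timedelta


def parse_hours(value):
    """Parse hour-only durations like '2160h' (the form used in the manifest above)."""
    if not value.endswith("h"):
        raise ValueError("this sketch only handles whole hours, e.g. '360h'")
    return timedelta(hours=int(value[:-1]))


def renewal_time(not_before, duration, renew_before):
    """When renewal is due: notAfter minus renewBefore."""
    not_after = not_before + parse_hours(duration)
    return not_after - parse_hours(renew_before)


issued = datetime(2024, 1, 1)
due = renewal_time(issued, "2160h", "360h")
print(due, (due - issued).days)  # 2024-03-16 00:00:00 75
```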

cert-manager automatically:

  1. Requests certificate from Let’s Encrypt
  2. Stores in Kubernetes Secret
  3. Renews before expiration
  4. Updates the Secret (pods need to reload)
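Step 4 leaves the last mile to the application. The kubelet eventually refreshes the files of a mounted Secret, but a process that loaded its certificate at startup keeps serving the old one. (Secret volumes mounted with `subPath` never receive updates at all.) One in-process option is to poll the mounted files and rebuild TLS state when their contents change. A minimal sketch follows. `CertFileWatcher` and its `on_change` callback are hypothetical names.

```python
import hashlib
from pathlib import Path


class CertFileWatcher:
    """Hypothetical helper: calls on_change(paths) when watched file contents change."""

    def __init__(self, paths, on_change):
        self.paths = [Path(p) for p in paths]
        self.on_change = on_change
        self._digest = self._current_digest()

    def _current_digest(self):
        combined = hashlib.sha256()
        for path in self.paths:
            # Hash each file separately; read_bytes() follows the symlinks
            # Kubernetes uses inside Secret volumes
            combined.update(hashlib.sha256(path.read_bytes()).digest())
        return combined.hexdigest()

    def poll(self):
        """Check once (call from a timer or thread); return True if on_change ran."""
        digest = self._current_digest()
        if digest == self._digest:
            return False
        self._digest = digest
        self.on_change(self.paths)
        return True
```

For example, `on_change` could create a fresh `ssl.SSLContext` and call `load_cert_chain("/etc/tls/tls.crt", "/etc/tls/tls.key")`. cert-manager stores the pair under the `tls.crt` and `tls.key` keys.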

Force Reload on Certificate Renewal

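apiVersion: apps/v1
kind: Deployment
metadata:
  name: api
  annotations:
    # Reloader rolls the pods when a Secret or ConfigMap they reference changes
    reloader.stakater.com/auto: "true"
spec:
  selector:
    matchLabels:
      app: api
  template:
    metadata:
      labels:
        app: api
    spec:
      containers:
      - name: api
        image: example/api:latest  # placeholder image
        volumeMounts:
        - name: tls
          mountPath: /etc/tls
          readOnly: true
      volumes:
      - name: tls
        secret:
          secretName: api-tls-secret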

API Key Rotation Pattern

For third-party API keys that don’t support dynamic generation:

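import json
import time
from datetime import datetime, timezone

import boto3


class InvalidKeyError(Exception):
    """Raised by validate_key when the provider rejects a key."""


class APIKeyRotator:
    def __init__(self, service_name, propagation_seconds=60):
        self.sm = boto3.client('secretsmanager')
        self.service_name = service_name
        self.propagation_seconds = propagation_seconds

    def get_secret(self, secret_id):
        """Return the current (AWSCURRENT) secret as a dict."""
        response = self.sm.get_secret_value(SecretId=secret_id)
        return json.loads(response['SecretString'])

    def rotate(self):
        """Dual-key rotation pattern."""
        secret_id = f"api/{self.service_name}"

        # Get current keys
        current = self.get_secret(secret_id)

        # Generate a new key via the provider's API
        new_key = self.create_api_key()

        # Store both keys: new as primary, old as secondary (overlap period)
        self.sm.put_secret_value(
            SecretId=secret_id,
            SecretString=json.dumps({
                "primary_key": new_key,
                "secondary_key": current.get("primary_key"),
                "rotated_at": datetime.now(timezone.utc).isoformat(),
            }),
        )

        # Give clients time to pick up the new secret before the old key dies
        time.sleep(self.propagation_seconds)

        # Revoke the old key
        if current.get("primary_key"):
            self.revoke_api_key(current["primary_key"])

    def get_current_key(self, secret_id):
        """Application calls this: tries primary, falls back to secondary."""
        secret = self.get_secret(secret_id)

        try:
            # Try primary
            self.validate_key(secret["primary_key"])
            return secret["primary_key"]
        except InvalidKeyError:
            # New key not active at the provider yet; fall back during rotation
            return secret["secondary_key"]

    # Provider-specific placeholders: implement against the third-party API
    def create_api_key(self):
        raise NotImplementedError

    def revoke_api_key(self, key):
        raise NotImplementedError

    def validate_key(self, key):
        raise NotImplementedError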

Rotation Without Downtime

The key insight is the overlap period: old and new credentials must both work while the rotation is in progress.

Timeline:
T+0  Create new credential
T+0  Both old AND new valid
T+1  Update application config
T+2  Application using new credential
T+3  Verify new credential works
T+4  Revoke old credential
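The ordering rule in that timeline can be checked mechanically. At every step, the credential the application presents must be one the service accepts. The toy model below replays a sequence of steps and reports the first one that would lock the application out. The step names are hypothetical labels for the timeline entries, plus `replace_in_place` for a rotation that overwrites the only credential.

```python
def first_failing_step(steps):
    """Replay rotation steps; return the first step that locks the app out, else None."""
    accepted = {"old"}  # credentials the service accepts
    in_use = "old"      # credential the application presents
    for step in steps:
        if step == "create_new":          # T+0: both old AND new valid
            accepted.add("new")
        elif step == "replace_in_place":  # the single credential is overwritten
            accepted = {"new"}
        elif step == "switch_app":        # T+1/T+2: app config now uses new
            in_use = "new"
        elif step == "revoke_old":        # T+4: old credential revoked
            accepted.discard("old")
        else:
            raise ValueError(f"unknown step: {step}")
        if in_use not in accepted:
            return step
    return None


print(first_failing_step(["create_new", "switch_app", "revoke_old"]))  # None
print(first_failing_step(["create_new", "revoke_old", "switch_app"]))  # revoke_old
print(first_failing_step(["replace_in_place", "switch_app"]))          # replace_in_place
```

The last case is the single-credential rotation: the application fails between the overwrite and the config switch, however short that gap is.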

Database User Rotation

-- Create new user
CREATE USER app_v2 WITH PASSWORD 'new_password';
GRANT app_role TO app_v2;

-- Both users work during transition
-- Update application to use app_v2

-- After verification, drop old user
DROP USER app_v1;
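Creating app_v2, app_v3, and so on works, but every rotation then has to set up a brand-new user. A common variation, which AWS calls the alternating-users strategy, keeps exactly two users. On each rotation it resets the password of whichever user is idle and switches the application to it, while the previously active user stays valid until the next rotation. The sketch below covers only the bookkeeping. `next_rotation` and the user names are hypothetical.

```python
def next_rotation(users, active):
    """Return (user_to_reset, user_left_valid) for a two-user alternating scheme.

    users: the two database users, e.g. ("app_a", "app_b")
    active: the user the application currently logs in as
    """
    if active not in users:
        raise ValueError(f"{active!r} is not one of {users}")
    first, second = users
    idle = second if active == first else first
    # Reset the idle user's password and point the app at it; the active user
    # keeps working until the next rotation, which provides the overlap
    return idle, active


active = "app_a"
for _ in range(3):
    reset, still_valid = next_rotation(("app_a", "app_b"), active)
    print(f"reset {reset}, switch app to it; {still_valid} stays valid for now")
    active = reset
```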

Connection Pool Considerations

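import json


class RotationAwarePool:
    """Rebuilds the connection pool whenever the fetched credentials change."""

    def __init__(self, get_credentials, create_pool):
        # get_credentials: returns the current credentials dict (cache it;
        # it runs on every get_connection call)
        # create_pool: builds a driver-specific pool from those credentials
        self.get_credentials = get_credentials
        self.create_pool = create_pool
        self.pool = None
        self.creds_hash = None

    def get_connection(self):
        current_creds = self.get_credentials()
        current_hash = hash(json.dumps(current_creds, sort_keys=True))

        if current_hash != self.creds_hash:
            # Credentials changed: recreate the pool. Depending on the pool
            # library, close() may interrupt connections still checked out.
            if self.pool:
                self.pool.close()

            self.pool = self.create_pool(current_creds)
            self.creds_hash = current_hash

        return self.pool.get_connection()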

Monitoring Rotation

from prometheus_client import Counter, Gauge

rotation_success = Counter(
    'secret_rotation_success_total',
    'Successful secret rotations',
    ['secret_name']
)

rotation_failure = Counter(
    'secret_rotation_failure_total',
    'Failed secret rotations',
    ['secret_name', 'error_type']
)

# Record when the secret last rotated and derive its age at query time,
# e.g. time() - secret_last_rotation_timestamp_seconds in PromQL
secret_last_rotation = Gauge(
    'secret_last_rotation_timestamp_seconds',
    'Unix time of the last successful rotation',
    ['secret_name']
)

def rotate_with_metrics(secret_name, rotate_fn):
    try:
        rotate_fn()
    except Exception as e:
        # Label by exception type, not message, to keep label values bounded
        rotation_failure.labels(secret_name, type(e).__name__).inc()
        raise
    rotation_success.labels(secret_name).inc()
    secret_last_rotation.labels(secret_name).set_to_current_time()

Alert on:

  • Rotation failures
  • Secrets older than their rotation period
  • Certificates expiring within 7 days
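In production these would be Prometheus alert rules, but the three conditions are simple threshold checks. The sketch below states them in plain Python so the thresholds are explicit, using the 7-day certificate window from the list. `SecretStatus` and `alerts` are hypothetical names. You would fill the fields from your secret store, metrics, or the certificate itself.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class SecretStatus:
    """Hypothetical record; populate it from your secret store and metrics."""
    name: str
    last_rotated: datetime
    rotation_period: timedelta
    last_rotation_failed: bool = False
    cert_not_after: Optional[datetime] = None  # set only for certificates


def alerts(statuses, now=None, cert_warning=timedelta(days=7)):
    """Return one message per alert condition that fires."""
    now = now or datetime.now(timezone.utc)
    messages = []
    for s in statuses:
        if s.last_rotation_failed:
            messages.append(f"{s.name}: last rotation failed")
        if now - s.last_rotated > s.rotation_period:
            messages.append(f"{s.name}: older than its {s.rotation_period.days}-day rotation period")
        if s.cert_not_after is not None and s.cert_not_after - now < cert_warning:
            messages.append(f"{s.name}: certificate expires {s.cert_not_after:%Y-%m-%d}")
    return messages
```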

The Checklist

  1. Inventory all secrets: you can’t rotate what you don’t know about
  2. Classify by rotation complexity: managed (AWS RDS) vs. custom
  3. Implement overlap: keep old and new credentials valid together so rotation never becomes a single point of failure
  4. Test rotation in staging: catch failures before they break production
  5. Monitor rotation health: failures, age, upcoming expirations
  6. Document a manual fallback: for when automation fails
  7. Start with long intervals: 90 days, then tighten to 30

Rotation automation is insurance. It costs effort upfront but pays dividends when credentials leak, employees leave, or auditors ask questions.

Fresh secrets are happy secrets. Automate the freshness.