How to automate credential rotation for databases, API keys, and certificates — patterns that keep secrets fresh without breaking production.
February 19, 2026 · 8 min · 1636 words · Rob Washington
Table of Contents
That database password hasn’t changed in three years. The API key in your config was committed by someone who left two jobs ago. The SSL certificate expires next Tuesday and nobody knows.
importboto3importjsonimportstringimportsecretsdeflambda_handler(event,context):secret_id=event['SecretId']step=event['Step']token=event['ClientRequestToken']sm=boto3.client('secretsmanager')ifstep=="createSecret":create_secret(sm,secret_id,token)elifstep=="setSecret":set_secret(sm,secret_id,token)elifstep=="testSecret":test_secret(sm,secret_id,token)elifstep=="finishSecret":finish_secret(sm,secret_id,token)defcreate_secret(sm,secret_id,token):"""Generate new secret value"""# Get current secretcurrent=sm.get_secret_value(SecretId=secret_id,VersionStage="AWSCURRENT")current_dict=json.loads(current['SecretString'])# Generate new passwordalphabet=string.ascii_letters+string.digits+"!@#$%^&*"new_password=''.join(secrets.choice(alphabet)for_inrange(32))# Store as pendingcurrent_dict['password']=new_passwordsm.put_secret_value(SecretId=secret_id,ClientRequestToken=token,SecretString=json.dumps(current_dict),VersionStages=['AWSPENDING'])defset_secret(sm,secret_id,token):"""Apply new secret to the service"""pending=sm.get_secret_value(SecretId=secret_id,VersionId=token,VersionStage="AWSPENDING")creds=json.loads(pending['SecretString'])# Update the actual service (e.g., database user password)update_database_password(creds['username'],creds['password'])deftest_secret(sm,secret_id,token):"""Verify new secret works"""pending=sm.get_secret_value(SecretId=secret_id,VersionId=token,VersionStage="AWSPENDING")creds=json.loads(pending['SecretString'])# Test connection with new credentialsifnotcan_connect(creds):raiseException("New credentials failed validation")deffinish_secret(sm,secret_id,token):"""Promote pending to current"""metadata=sm.describe_secret(SecretId=secret_id)forversion_id,stagesinmetadata['VersionIdsToStages'].items():if'AWSCURRENT'instagesandversion_id!=token:# Demote old currentsm.update_secret_version_stage(SecretId=secret_id,VersionStage='AWSCURRENT',RemoveFromVersionId=version_id,MoveToVersionId=token)break
# Enable database secrets enginevault secrets enable database
# Configure PostgreSQL connectionvault write database/config/mydb \
plugin_name=postgresql-database-plugin \
connection_url="postgresql://{{username}}:{{password}}@postgres:5432/mydb"\
allowed_roles="readonly,readwrite"\
username="vault_admin"\
password="admin_password"# Create a role that generates credentialsvault write database/roles/readonly \
db_name=mydb \
creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}'; GRANT SELECT ON ALL TABLES IN SCHEMA public TO \"{{name}}\";"\
default_ttl="1h"\
max_ttl="24h"
Application requests credentials:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
importhvacimportpsycopg2client=hvac.Client(url='https://vault:8200',token=os.environ['VAULT_TOKEN'])# Get dynamic credentials (created on demand)creds=client.secrets.database.generate_credentials(name='readonly')# Use themconn=psycopg2.connect(host='postgres',database='mydb',user=creds['data']['username'],password=creds['data']['password'])# Credentials auto-expire after TTL# Vault revokes them in the database
importboto3fromdatetimeimportdatetime,timedeltaclassAPIKeyRotator:def__init__(self,service_name):self.sm=boto3.client('secretsmanager')self.service_name=service_namedefrotate(self):"""Dual-key rotation pattern"""secret_id=f"api/{self.service_name}"# Get current keyscurrent=self.get_secret(secret_id)# Generate new key via APInew_key=self.create_api_key()# Update secret with both keys (overlap period)self.sm.put_secret_value(SecretId=secret_id,SecretString=json.dumps({"primary_key":new_key,"secondary_key":current.get("primary_key"),"rotated_at":datetime.utcnow().isoformat()}))# Wait for propagationtime.sleep(60)# Revoke old keyifcurrent.get("primary_key"):self.revoke_api_key(current["primary_key"])defget_current_key(self,secret_id):"""Application calls this - tries primary, falls back to secondary"""secret=self.get_secret(secret_id)try:# Try primaryself.validate_key(secret["primary_key"])returnsecret["primary_key"]exceptInvalidKeyError:# Fall back to secondary during rotationreturnsecret["secondary_key"]
-- Create new user
CREATEUSERapp_v2WITHPASSWORD'new_password';GRANTapp_roleTOapp_v2;-- Both users work during transition
-- Update application to use app_v2
-- After verification, drop old user
DROPUSERapp_v1;
fromprometheus_clientimportCounter,Gaugerotation_success=Counter('secret_rotation_success_total','Successful secret rotations',['secret_name'])rotation_failure=Counter('secret_rotation_failure_total','Failed secret rotations',['secret_name','step'])secret_age_seconds=Gauge('secret_age_seconds','Age of current secret version',['secret_name'])defrotate_with_metrics(secret_name,rotate_fn):try:rotate_fn()rotation_success.labels(secret_name).inc()secret_age_seconds.labels(secret_name).set(0)exceptExceptionase:rotation_failure.labels(secret_name,str(e)).inc()raise