Cloud Cost Optimization: Cutting AWS Bills Without Cutting Corners

Cloud bills grow faster than you’d expect. A few forgotten instances here, oversized databases there, and suddenly you’re spending more on infrastructure than engineering salaries. Let’s fix that. Quick Wins: Find the Waste Identify Unused Resources 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 # find_waste.py import boto3 from datetime import datetime, timedelta ec2 = boto3.client('ec2') cloudwatch = boto3.client('cloudwatch') def find_idle_instances(): """Find EC2 instances with <5% CPU over 7 days.""" instances = ec2.describe_instances( Filters=[{'Name': 'instance-state-name', 'Values': ['running']}] ) idle = [] for reservation in instances['Reservations']: for instance in reservation['Instances']: instance_id = instance['InstanceId'] # Get average CPU over 7 days response = cloudwatch.get_metric_statistics( Namespace='AWS/EC2', MetricName='CPUUtilization', Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}], StartTime=datetime.utcnow() - timedelta(days=7), EndTime=datetime.utcnow(), Period=86400, # 1 day Statistics=['Average'] ) if response['Datapoints']: avg_cpu = sum(d['Average'] for d in response['Datapoints']) / len(response['Datapoints']) if avg_cpu < 5: idle.append({ 'instance_id': instance_id, 'type': instance['InstanceType'], 'avg_cpu': round(avg_cpu, 2), 'name': next((t['Value'] for t in instance.get('Tags', []) if t['Key'] == 'Name'), 'unnamed') }) return idle def find_unattached_volumes(): """Find EBS volumes not attached to any instance.""" volumes = ec2.describe_volumes( Filters=[{'Name': 'status', 'Values': ['available']}] ) return [{ 'volume_id': v['VolumeId'], 'size_gb': v['Size'], 'monthly_cost': v['Size'] * 0.10 # gp2 pricing estimate } for v in volumes['Volumes']] def find_old_snapshots(): """Find snapshots older than 90 days.""" snapshots = ec2.describe_snapshots(OwnerIds=['self']) cutoff = datetime.utcnow() - timedelta(days=90) old = [] for snap in snapshots['Snapshots']: if snap['StartTime'].replace(tzinfo=None) < cutoff: old.append({ 'snapshot_id': snap['SnapshotId'], 'size_gb': snap['VolumeSize'], 'age_days': (datetime.utcnow() - snap['StartTime'].replace(tzinfo=None)).days }) return old # Run analysis print("=== Idle Instances ===") for i in find_idle_instances(): print(f" {i['instance_id']} ({i['type']}): {i['avg_cpu']}% avg CPU - {i['name']}") print("\n=== Unattached Volumes ===") for v in find_unattached_volumes(): print(f" {v['volume_id']}: {v['size_gb']}GB = ${v['monthly_cost']:.2f}/mo") print("\n=== Old Snapshots ===") for s in find_old_snapshots(): print(f" {s['snapshot_id']}: {s['size_gb']}GB, {s['age_days']} days old") Automated Cleanup 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 # cleanup_automation.py import boto3 from datetime import datetime, timedelta def cleanup_old_amis(days_old: int = 180, dry_run: bool = True): """Deregister AMIs older than threshold and delete their snapshots.""" ec2 = boto3.client('ec2') images = ec2.describe_images(Owners=['self']) cutoff = datetime.utcnow() - timedelta(days=days_old) for image in images['Images']: creation_date = datetime.strptime(image['CreationDate'][:10], '%Y-%m-%d') if creation_date < cutoff: ami_id = image['ImageId'] snapshot_ids = [ bdm['Ebs']['SnapshotId'] for bdm in image.get('BlockDeviceMappings', []) if 'Ebs' in bdm ] if dry_run: print(f"Would delete AMI {ami_id} and snapshots {snapshot_ids}") else: ec2.deregister_image(ImageId=ami_id) for snap_id in snapshot_ids: ec2.delete_snapshot(SnapshotId=snap_id) print(f"Deleted AMI {ami_id} and {len(snapshot_ids)} snapshots") def stop_dev_instances_at_night(): """Stop non-production instances outside business hours.""" ec2 = boto3.resource('ec2') # Find instances tagged Environment=dev|staging instances = ec2.instances.filter( Filters=[ {'Name': 'tag:Environment', 'Values': ['dev', 'staging']}, {'Name': 'instance-state-name', 'Values': ['running']} ] ) instance_ids = [i.id for i in instances] if instance_ids: ec2.instances.filter(InstanceIds=instance_ids).stop() print(f"Stopped {len(instance_ids)} dev/staging instances") 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 # Lambda + EventBridge for scheduled cleanup AWSTemplateFormatVersion: '2010-09-09' Resources: StopDevInstancesFunction: Type: AWS::Lambda::Function Properties: FunctionName: stop-dev-instances Runtime: python3.11 Handler: index.handler Timeout: 60 Role: !GetAtt LambdaRole.Arn Code: ZipFile: | import boto3 def handler(event, context): ec2 = boto3.resource('ec2') instances = ec2.instances.filter( Filters=[ {'Name': 'tag:AutoStop', 'Values': ['true']}, {'Name': 'instance-state-name', 'Values': ['running']} ] ) ids = [i.id for i in instances] if ids: ec2.instances.filter(InstanceIds=ids).stop() return {'stopped': ids} StopSchedule: Type: AWS::Events::Rule Properties: ScheduleExpression: 'cron(0 22 ? * MON-FRI *)' # 10 PM weekdays Targets: - Id: StopDevInstances Arn: !GetAtt StopDevInstancesFunction.Arn Right-Sizing Analyze and Recommend 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 # rightsizing.py import boto3 def get_rightsizing_recommendations(): """Get AWS Compute Optimizer recommendations.""" optimizer = boto3.client('compute-optimizer') response = optimizer.get_ec2_instance_recommendations() savings = [] for rec in response.get('instanceRecommendations', []): current = rec['currentInstanceType'] for option in rec.get('recommendationOptions', []): if option.get('rank') == 1: # Top recommendation recommended = option['instanceType'] monthly_savings = ( rec.get('utilizationMetrics', [{}])[0].get('value', 0) * option.get('projectedUtilizationMetrics', [{}])[0].get('value', 1) ) savings.append({ 'instance_id': rec['instanceArn'].split('/')[-1], 'current': current, 'recommended': recommended, 'finding': rec['finding'], 'estimated_savings': option.get('savingsOpportunity', {}) }) return savings # Generate report for rec in get_rightsizing_recommendations(): print(f"{rec['instance_id']}: {rec['current']} → {rec['recommended']}") print(f" Finding: {rec['finding']}") if rec['estimated_savings']: print(f" Potential savings: ${rec['estimated_savings'].get('estimatedMonthlySavings', {}).get('value', 0):.2f}/mo") Reserved Instances & Savings Plans Analyze Coverage 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 # reservation_analysis.py import boto3 from collections import defaultdict def analyze_reservation_coverage(): """Analyze current RI/SP coverage and recommend purchases.""" ce = boto3.client('ce') # Get coverage report response = ce.get_reservation_coverage( TimePeriod={ 'Start': '2026-01-01', 'End': '2026-02-01' }, Granularity='MONTHLY', GroupBy=[ {'Type': 'DIMENSION', 'Key': 'INSTANCE_TYPE'} ] ) recommendations = [] for group in response.get('CoveragesByTime', [{}])[0].get('Groups', []): instance_type = group['Attributes']['instanceType'] coverage = float(group['Coverage']['CoverageHours']['CoverageHoursPercentage']) on_demand_hours = float(group['Coverage']['CoverageHours']['OnDemandHours']) if coverage < 70 and on_demand_hours > 500: recommendations.append({ 'instance_type': instance_type, 'current_coverage': coverage, 'on_demand_hours': on_demand_hours, 'recommendation': 'Consider Reserved Instances' }) return recommendations def get_savings_plan_recommendations(): """Get Savings Plan purchase recommendations.""" ce = boto3.client('ce') response = ce.get_savings_plans_purchase_recommendation( SavingsPlansType='COMPUTE_SP', TermInYears='ONE_YEAR', PaymentOption='NO_UPFRONT', LookbackPeriodInDays='THIRTY_DAYS' ) rec = response.get('SavingsPlansPurchaseRecommendation', {}) return { 'recommended_hourly_commitment': rec.get('SavingsPlansPurchaseRecommendationSummary', {}).get('RecommendedHourlyCommitment'), 'estimated_monthly_savings': rec.get('SavingsPlansPurchaseRecommendationSummary', {}).get('EstimatedMonthlySavingsAmount'), 'estimated_savings_percentage': rec.get('SavingsPlansPurchaseRecommendationSummary', {}).get('EstimatedSavingsPercentage') } Spot Instances Spot for Stateless Workloads 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 # Kubernetes with Spot instances via Karpenter apiVersion: karpenter.sh/v1alpha5 kind: Provisioner metadata: name: spot-provisioner spec: requirements: - key: karpenter.sh/capacity-type operator: In values: ["spot"] - key: kubernetes.io/arch operator: In values: ["amd64"] - key: node.kubernetes.io/instance-type operator: In values: ["m5.large", "m5.xlarge", "m5a.large", "m5a.xlarge", "m6i.large"] # Spread across instance types for availability limits: resources: cpu: 1000 # Handle interruptions gracefully ttlSecondsAfterEmpty: 30 ttlSecondsUntilExpired: 2592000 # 30 days --- # Deployment using spot nodes apiVersion: apps/v1 kind: Deployment metadata: name: worker spec: replicas: 10 template: spec: nodeSelector: karpenter.sh/capacity-type: spot # Handle spot interruptions terminationGracePeriodSeconds: 120 containers: - name: worker lifecycle: preStop: exec: command: ["/bin/sh", "-c", "sleep 90"] # Drain gracefully Spot Interruption Handling 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 # spot_interruption_handler.py import requests import time import signal import sys METADATA_URL = "http://169.254.169.254/latest/meta-data" def check_spot_interruption(): """Check if this spot instance is being interrupted.""" try: response = requests.get( f"{METADATA_URL}/spot/instance-action", timeout=1 ) if response.status_code == 200: return response.json() except: pass return None def graceful_shutdown(): """Handle graceful shutdown on interruption.""" print("Spot interruption detected, starting graceful shutdown...") # Stop accepting new work # Finish current tasks # Checkpoint state # Clean up sys.exit(0) def monitor_interruption(): """Monitor for spot interruption (2 minute warning).""" while True: interruption = check_spot_interruption() if interruption: print(f"Interruption notice: {interruption}") graceful_shutdown() time.sleep(5) # Run in background thread import threading threading.Thread(target=monitor_interruption, daemon=True).start() Storage Optimization S3 Lifecycle Policies 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 # s3_lifecycle.py import boto3 def configure_lifecycle_rules(bucket_name: str): """Configure intelligent tiering and expiration.""" s3 = boto3.client('s3') lifecycle_config = { 'Rules': [ { 'ID': 'intelligent-tiering', 'Status': 'Enabled', 'Filter': {'Prefix': ''}, 'Transitions': [ { 'Days': 0, 'StorageClass': 'INTELLIGENT_TIERING' } ] }, { 'ID': 'archive-old-logs', 'Status': 'Enabled', 'Filter': {'Prefix': 'logs/'}, 'Transitions': [ {'Days': 30, 'StorageClass': 'STANDARD_IA'}, {'Days': 90, 'StorageClass': 'GLACIER'}, ], 'Expiration': {'Days': 365} }, { 'ID': 'cleanup-incomplete-uploads', 'Status': 'Enabled', 'Filter': {'Prefix': ''}, 'AbortIncompleteMultipartUpload': { 'DaysAfterInitiation': 7 } } ] } s3.put_bucket_lifecycle_configuration( Bucket=bucket_name, LifecycleConfiguration=lifecycle_config ) Cost Monitoring Budget Alerts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 # CloudFormation budget AWSTemplateFormatVersion: '2010-09-09' Resources: MonthlyBudget: Type: AWS::Budgets::Budget Properties: Budget: BudgetName: monthly-infrastructure BudgetLimit: Amount: 10000 Unit: USD TimeUnit: MONTHLY BudgetType: COST NotificationsWithSubscribers: - Notification: NotificationType: ACTUAL ComparisonOperator: GREATER_THAN Threshold: 80 Subscribers: - SubscriptionType: EMAIL Address: ops@example.com - Notification: NotificationType: FORECASTED ComparisonOperator: GREATER_THAN Threshold: 100 Subscribers: - SubscriptionType: SNS Address: !Ref AlertTopic Daily Cost Report 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 # daily_cost_report.py import boto3 from datetime import datetime, timedelta def get_daily_costs(): """Get costs broken down by service.""" ce = boto3.client('ce') end = datetime.utcnow().date() start = end - timedelta(days=7) response = ce.get_cost_and_usage( TimePeriod={ 'Start': start.isoformat(), 'End': end.isoformat() }, Granularity='DAILY', Metrics=['UnblendedCost'], GroupBy=[ {'Type': 'DIMENSION', 'Key': 'SERVICE'} ] ) report = [] for day in response['ResultsByTime']: date = day['TimePeriod']['Start'] for group in day['Groups']: service = group['Keys'][0] cost = float(group['Metrics']['UnblendedCost']['Amount']) if cost > 1: # Only show significant costs report.append({ 'date': date, 'service': service, 'cost': cost }) return report # Generate and send report for item in sorted(get_daily_costs(), key=lambda x: -x['cost'])[:10]: print(f"{item['date']} | {item['service']}: ${item['cost']:.2f}") Checklist Enable Cost Explorer and set up budgets Tag all resources for cost allocation Review and act on right-sizing recommendations monthly Implement lifecycle policies for S3 and EBS snapshots Use Spot for fault-tolerant workloads Purchase Savings Plans for baseline compute Stop dev/staging resources outside business hours Clean up unused resources weekly Review Reserved Instance coverage quarterly Cloud cost optimization isn’t a one-time project—it’s an ongoing practice. Automate the easy wins, review monthly, and treat your cloud bill like any other metric that matters. ...

February 11, 2026 · 9 min · 1709 words · Rob Washington