Cloud bills grow faster than you’d expect. A few forgotten instances here, oversized databases there, and suddenly you’re spending more on infrastructure than engineering salaries. Let’s fix that.

Quick Wins: Find the Waste

Identify Unused Resources

# find_waste.py
import boto3
from datetime import datetime, timedelta

ec2 = boto3.client('ec2')
cloudwatch = boto3.client('cloudwatch')

def find_idle_instances():
    """Find running EC2 instances averaging <5% CPU over the last 7 days.

    Returns:
        list[dict]: one entry per idle instance with keys
        'instance_id', 'type', 'avg_cpu' (rounded percent), and 'name'
        (the Name tag, or 'unnamed' if untagged).
    """
    # Paginate: describe_instances returns a single page per call, so a
    # large fleet would be silently truncated without a paginator.
    paginator = ec2.get_paginator('describe_instances')
    pages = paginator.paginate(
        Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
    )

    # Hoist the time window so every instance is measured over the same
    # 7-day interval instead of a slightly shifting one.
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=7)

    idle = []
    for page in pages:
        for reservation in page['Reservations']:
            for instance in reservation['Instances']:
                instance_id = instance['InstanceId']

                # Daily-average CPU datapoints over the window.
                response = cloudwatch.get_metric_statistics(
                    Namespace='AWS/EC2',
                    MetricName='CPUUtilization',
                    Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
                    StartTime=start_time,
                    EndTime=end_time,
                    Period=86400,  # 1 day
                    Statistics=['Average']
                )

                # No datapoints means no metrics reported; skip rather
                # than misclassify the instance as idle.
                if not response['Datapoints']:
                    continue

                avg_cpu = sum(d['Average'] for d in response['Datapoints']) / len(response['Datapoints'])
                if avg_cpu < 5:
                    idle.append({
                        'instance_id': instance_id,
                        'type': instance['InstanceType'],
                        'avg_cpu': round(avg_cpu, 2),
                        'name': next((t['Value'] for t in instance.get('Tags', [])
                                      if t['Key'] == 'Name'), 'unnamed')
                    })

    return idle

def find_unattached_volumes():
    """Find EBS volumes in 'available' state (attached to no instance).

    Returns:
        list[dict]: 'volume_id', 'size_gb', and a rough 'monthly_cost'
        estimate in USD.
    """
    # Paginate so accounts with many volumes aren't silently truncated
    # (describe_volumes returns one page per call).
    paginator = ec2.get_paginator('describe_volumes')
    pages = paginator.paginate(
        Filters=[{'Name': 'status', 'Values': ['available']}]
    )

    return [{
        'volume_id': v['VolumeId'],
        'size_gb': v['Size'],
        'monthly_cost': v['Size'] * 0.10  # gp2 pricing estimate ($/GB-month)
    } for page in pages for v in page['Volumes']]

def find_old_snapshots():
    """Find self-owned EBS snapshots older than 90 days.

    Returns:
        list[dict]: 'snapshot_id', 'size_gb', and 'age_days'.
    """
    # describe_snapshots caps results per call; paginate so accounts
    # with many snapshots are fully scanned.
    paginator = ec2.get_paginator('describe_snapshots')

    # Hoist "now" so the cutoff and the age computation agree.
    now = datetime.utcnow()
    cutoff = now - timedelta(days=90)

    old = []
    for page in paginator.paginate(OwnerIds=['self']):
        for snap in page['Snapshots']:
            # StartTime is timezone-aware; strip tzinfo to compare with
            # the naive UTC cutoff.
            started = snap['StartTime'].replace(tzinfo=None)
            if started < cutoff:
                old.append({
                    'snapshot_id': snap['SnapshotId'],
                    'size_gb': snap['VolumeSize'],
                    'age_days': (now - started).days
                })

    return old


# Run analysis: print each category of waste with its key details.
print("=== Idle Instances ===")
for inst in find_idle_instances():
    print(f"  {inst['instance_id']} ({inst['type']}): {inst['avg_cpu']}% avg CPU - {inst['name']}")

print("\n=== Unattached Volumes ===")
for vol in find_unattached_volumes():
    print(f"  {vol['volume_id']}: {vol['size_gb']}GB = ${vol['monthly_cost']:.2f}/mo")

print("\n=== Old Snapshots ===")
for snap in find_old_snapshots():
    print(f"  {snap['snapshot_id']}: {snap['size_gb']}GB, {snap['age_days']} days old")

Automated Cleanup

# cleanup_automation.py
import boto3
from datetime import datetime, timedelta

def cleanup_old_amis(days_old: int = 180, dry_run: bool = True):
    """Deregister AMIs older than threshold and delete their snapshots."""
    ec2 = boto3.client('ec2')

    cutoff = datetime.utcnow() - timedelta(days=days_old)

    for image in ec2.describe_images(Owners=['self'])['Images']:
        # CreationDate is an ISO-8601 string; the first 10 chars are the date.
        created = datetime.strptime(image['CreationDate'][:10], '%Y-%m-%d')
        if created >= cutoff:
            continue  # newer than the threshold — keep it

        ami_id = image['ImageId']
        snapshot_ids = [
            mapping['Ebs']['SnapshotId']
            for mapping in image.get('BlockDeviceMappings', [])
            if 'Ebs' in mapping
        ]

        if dry_run:
            print(f"Would delete AMI {ami_id} and snapshots {snapshot_ids}")
            continue

        # Deregister first, then remove the now-orphaned backing snapshots.
        ec2.deregister_image(ImageId=ami_id)
        for snap_id in snapshot_ids:
            ec2.delete_snapshot(SnapshotId=snap_id)
        print(f"Deleted AMI {ami_id} and {len(snapshot_ids)} snapshots")

def stop_dev_instances_at_night():
    """Stop non-production instances outside business hours."""
    ec2 = boto3.resource('ec2')

    # Running instances tagged Environment=dev or Environment=staging.
    filters = [
        {'Name': 'tag:Environment', 'Values': ['dev', 'staging']},
        {'Name': 'instance-state-name', 'Values': ['running']},
    ]
    targets = [inst.id for inst in ec2.instances.filter(Filters=filters)]

    if not targets:
        return

    ec2.instances.filter(InstanceIds=targets).stop()
    print(f"Stopped {len(targets)} dev/staging instances")
# Lambda + EventBridge for scheduled cleanup
AWSTemplateFormatVersion: '2010-09-09'
Resources:
  StopDevInstancesFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: stop-dev-instances
      Runtime: python3.11
      Handler: index.handler
      Timeout: 60
      Role: !GetAtt LambdaRole.Arn
      Code:
        ZipFile: |
          import boto3
          def handler(event, context):
              ec2 = boto3.resource('ec2')
              instances = ec2.instances.filter(
                  Filters=[
                      {'Name': 'tag:AutoStop', 'Values': ['true']},
                      {'Name': 'instance-state-name', 'Values': ['running']}
                  ]
              )
              ids = [i.id for i in instances]
              if ids:
                  ec2.instances.filter(InstanceIds=ids).stop()
              return {'stopped': ids}

  StopSchedule:
    Type: AWS::Events::Rule
    Properties:
      ScheduleExpression: 'cron(0 22 ? * MON-FRI *)'  # 10 PM weekdays
      Targets:
        - Id: StopDevInstances
          Arn: !GetAtt StopDevInstancesFunction.Arn

  # Without this resource-based permission EventBridge cannot invoke the
  # function, and the schedule silently never fires.
  StopScheduleInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref StopDevInstancesFunction
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt StopSchedule.Arn

Right-Sizing

Analyze and Recommend

# rightsizing.py
import boto3

def get_rightsizing_recommendations():
    """Get AWS Compute Optimizer EC2 right-sizing recommendations.

    Returns:
        list[dict]: one entry per instance with 'instance_id', 'current'
        and 'recommended' instance types, the Compute Optimizer 'finding',
        and the raw 'estimated_savings' (savingsOpportunity dict, possibly
        empty).
    """
    optimizer = boto3.client('compute-optimizer')

    response = optimizer.get_ec2_instance_recommendations()

    savings = []
    for rec in response.get('instanceRecommendations', []):
        current = rec['currentInstanceType']

        for option in rec.get('recommendationOptions', []):
            # rank 1 is Compute Optimizer's top recommendation.
            if option.get('rank') == 1:
                # NOTE(review): the original also computed a
                # `monthly_savings` product of utilization metrics that
                # was meaningless and never used — removed as dead code.
                savings.append({
                    # instanceArn ends with '/i-xxxxxxxx'.
                    'instance_id': rec['instanceArn'].split('/')[-1],
                    'current': current,
                    'recommended': option['instanceType'],
                    'finding': rec['finding'],
                    'estimated_savings': option.get('savingsOpportunity', {})
                })

    return savings

# Generate report. The original f-string printed the current and
# recommended types with no separator ("m5.larget3.medium"); add an
# explicit arrow between them.
for rec in get_rightsizing_recommendations():
    print(f"{rec['instance_id']}: {rec['current']} -> {rec['recommended']}")
    print(f"  Finding: {rec['finding']}")
    if rec['estimated_savings']:
        print(f"  Potential savings: ${rec['estimated_savings'].get('estimatedMonthlySavings', {}).get('value', 0):.2f}/mo")

Reserved Instances & Savings Plans

Analyze Coverage

# reservation_analysis.py
import boto3
from collections import defaultdict

def analyze_reservation_coverage(start: str = '2026-01-01', end: str = '2026-02-01'):
    """Analyze current RI/SP coverage and recommend purchases.

    Args:
        start: inclusive period start, 'YYYY-MM-DD'. Defaults preserve the
            previously hard-coded report window.
        end: exclusive period end, 'YYYY-MM-DD'.

    Returns:
        list[dict]: instance types with <70% coverage and >500 on-demand
        hours in the period, each with a purchase recommendation note.
    """
    ce = boto3.client('ce')

    # Get coverage report for the requested window, grouped by type.
    response = ce.get_reservation_coverage(
        TimePeriod={'Start': start, 'End': end},
        Granularity='MONTHLY',
        GroupBy=[
            {'Type': 'DIMENSION', 'Key': 'INSTANCE_TYPE'}
        ]
    )

    # Guard: an empty 'CoveragesByTime' list would make [0] raise
    # IndexError — .get's default only applies when the key is absent.
    coverages = response.get('CoveragesByTime') or [{}]

    recommendations = []
    for group in coverages[0].get('Groups', []):
        instance_type = group['Attributes']['instanceType']
        coverage = float(group['Coverage']['CoverageHours']['CoverageHoursPercentage'])
        on_demand_hours = float(group['Coverage']['CoverageHours']['OnDemandHours'])

        # Low coverage plus substantial on-demand usage: a reservation
        # would likely pay for itself.
        if coverage < 70 and on_demand_hours > 500:
            recommendations.append({
                'instance_type': instance_type,
                'current_coverage': coverage,
                'on_demand_hours': on_demand_hours,
                'recommendation': 'Consider Reserved Instances'
            })

    return recommendations

def get_savings_plan_recommendations():
    """Get Savings Plan purchase recommendations (1-year, no-upfront Compute SP)."""
    ce = boto3.client('ce')

    response = ce.get_savings_plans_purchase_recommendation(
        SavingsPlansType='COMPUTE_SP',
        TermInYears='ONE_YEAR',
        PaymentOption='NO_UPFRONT',
        LookbackPeriodInDays='THIRTY_DAYS'
    )

    # Hoist the nested summary lookup instead of repeating it per field.
    recommendation = response.get('SavingsPlansPurchaseRecommendation', {})
    summary = recommendation.get('SavingsPlansPurchaseRecommendationSummary', {})

    return {
        'recommended_hourly_commitment': summary.get('RecommendedHourlyCommitment'),
        'estimated_monthly_savings': summary.get('EstimatedMonthlySavingsAmount'),
        'estimated_savings_percentage': summary.get('EstimatedSavingsPercentage')
    }

Spot Instances

Spot for Stateless Workloads

# Kubernetes with Spot instances via Karpenter
apiVersion: karpenter.sh/v1alpha5
kind: Provisioner
metadata:
  name: spot-provisioner
spec:
  requirements:
    - key: karpenter.sh/capacity-type
      operator: In
      values: ["spot"]
    - key: kubernetes.io/arch
      operator: In
      values: ["amd64"]
    - key: node.kubernetes.io/instance-type
      operator: In
      values: ["m5.large", "m5.xlarge", "m5a.large", "m5a.xlarge", "m6i.large"]
  
  # Spread across instance types for availability
  limits:
    resources:
      cpu: 1000
  
  # Handle interruptions gracefully
  ttlSecondsAfterEmpty: 30
  ttlSecondsUntilExpired: 2592000  # 30 days

---
# Deployment using spot nodes
apiVersion: apps/v1
kind: Deployment
metadata:
  name: worker
spec:
  replicas: 10
  # apps/v1 Deployments require a selector, and it must match the pod
  # template labels — without these the manifest is rejected.
  selector:
    matchLabels:
      app: worker
  template:
    metadata:
      labels:
        app: worker
    spec:
      nodeSelector:
        karpenter.sh/capacity-type: spot
      
      # Handle spot interruptions
      terminationGracePeriodSeconds: 120
      
      containers:
      - name: worker
        lifecycle:
          preStop:
            exec:
              command: ["/bin/sh", "-c", "sleep 90"]  # Drain gracefully

Spot Interruption Handling

# spot_interruption_handler.py
import requests
import time
import signal
import sys

METADATA_URL = "http://169.254.169.254/latest/meta-data"

def check_spot_interruption():
    """Check if this spot instance is being interrupted.

    Returns:
        dict | None: the instance-action payload when an interruption is
        scheduled, otherwise None (including on metadata errors).
    """
    try:
        response = requests.get(
            f"{METADATA_URL}/spot/instance-action",
            timeout=1
        )
        # 200 means an interruption notice exists; 404 is the normal
        # "not being interrupted" response and falls through to None.
        if response.status_code == 200:
            return response.json()
    except (requests.RequestException, ValueError):
        # Metadata endpoint unreachable (e.g. not on EC2) or non-JSON
        # body. NOTE(review): the original bare `except:` also swallowed
        # KeyboardInterrupt/SystemExit — narrowed deliberately.
        pass
    return None

def graceful_shutdown():
    """Handle graceful shutdown on interruption, then exit cleanly."""
    print("Spot interruption detected, starting graceful shutdown...")
    # Placeholder for the real drain sequence: stop accepting new work,
    # finish in-flight tasks, checkpoint state, clean up.
    sys.exit(0)

def monitor_interruption():
    """Monitor for spot interruption (2 minute warning)."""
    # Poll the metadata endpoint every 5 seconds; AWS issues the notice
    # roughly two minutes before reclaiming the instance.
    while True:
        notice = check_spot_interruption()
        if notice:
            print(f"Interruption notice: {notice}")
            graceful_shutdown()
        time.sleep(5)

# Run the monitor in a background daemon thread so it never blocks the
# main workload and dies automatically with the process.
import threading

monitor_thread = threading.Thread(target=monitor_interruption, daemon=True)
monitor_thread.start()

Storage Optimization

S3 Lifecycle Policies

# s3_lifecycle.py
import boto3

def configure_lifecycle_rules(bucket_name: str):
    """Configure intelligent tiering and expiration."""
    s3 = boto3.client('s3')

    # Move every object into Intelligent-Tiering immediately.
    tiering_rule = {
        'ID': 'intelligent-tiering',
        'Status': 'Enabled',
        'Filter': {'Prefix': ''},
        'Transitions': [
            {'Days': 0, 'StorageClass': 'INTELLIGENT_TIERING'}
        ]
    }

    # Age logs through Standard-IA and Glacier, then expire after a year.
    log_archive_rule = {
        'ID': 'archive-old-logs',
        'Status': 'Enabled',
        'Filter': {'Prefix': 'logs/'},
        'Transitions': [
            {'Days': 30, 'StorageClass': 'STANDARD_IA'},
            {'Days': 90, 'StorageClass': 'GLACIER'},
        ],
        'Expiration': {'Days': 365}
    }

    # Abort (and stop paying for) multipart uploads stalled for a week.
    abort_uploads_rule = {
        'ID': 'cleanup-incomplete-uploads',
        'Status': 'Enabled',
        'Filter': {'Prefix': ''},
        'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 7}
    }

    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket_name,
        LifecycleConfiguration={
            'Rules': [tiering_rule, log_archive_rule, abort_uploads_rule]
        }
    )

Cost Monitoring

Budget Alerts

# CloudFormation budget
AWSTemplateFormatVersion: '2010-09-09'
Resources:
  # SNS topic for forecast alerts. The budget below references it with
  # !Ref; without this resource the template fails to create.
  AlertTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: budget-alerts

  MonthlyBudget:
    Type: AWS::Budgets::Budget
    Properties:
      Budget:
        BudgetName: monthly-infrastructure
        BudgetLimit:
          Amount: 10000
          Unit: USD
        TimeUnit: MONTHLY
        BudgetType: COST
      NotificationsWithSubscribers:
        - Notification:
            NotificationType: ACTUAL
            ComparisonOperator: GREATER_THAN
            Threshold: 80
          Subscribers:
            - SubscriptionType: EMAIL
              Address: ops@example.com
        - Notification:
            NotificationType: FORECASTED
            ComparisonOperator: GREATER_THAN
            Threshold: 100
          Subscribers:
            - SubscriptionType: SNS
              Address: !Ref AlertTopic

Daily Cost Report

# daily_cost_report.py
import boto3
from datetime import datetime, timedelta

def get_daily_costs(days: int = 7, min_cost: float = 1.0):
    """Get costs broken down by service over a trailing window.

    Args:
        days: number of trailing days to report (default preserves the
            previously hard-coded 7-day window).
        min_cost: drop line items at or below this daily cost in USD
            (default preserves the previous $1 threshold).

    Returns:
        list[dict]: rows with 'date', 'service', and 'cost'.
    """
    ce = boto3.client('ce')

    end = datetime.utcnow().date()
    start = end - timedelta(days=days)

    response = ce.get_cost_and_usage(
        TimePeriod={
            'Start': start.isoformat(),
            'End': end.isoformat()
        },
        Granularity='DAILY',
        Metrics=['UnblendedCost'],
        GroupBy=[
            {'Type': 'DIMENSION', 'Key': 'SERVICE'}
        ]
    )

    report = []
    for day in response['ResultsByTime']:
        date = day['TimePeriod']['Start']
        for group in day['Groups']:
            service = group['Keys'][0]
            cost = float(group['Metrics']['UnblendedCost']['Amount'])
            if cost > min_cost:  # only show significant costs
                report.append({
                    'date': date,
                    'service': service,
                    'cost': cost
                })

    return report

# Generate and send report: top 10 line items, most expensive first.
top_items = sorted(get_daily_costs(), key=lambda row: row['cost'], reverse=True)[:10]
for item in top_items:
    print(f"{item['date']} | {item['service']}: ${item['cost']:.2f}")

Checklist

  • Enable Cost Explorer and set up budgets
  • Tag all resources for cost allocation
  • Review and act on right-sizing recommendations monthly
  • Implement lifecycle policies for S3 and EBS snapshots
  • Use Spot for fault-tolerant workloads
  • Purchase Savings Plans for baseline compute
  • Stop dev/staging resources outside business hours
  • Clean up unused resources weekly
  • Review Reserved Instance coverage quarterly

Cloud cost optimization isn’t a one-time project—it’s an ongoing practice. Automate the easy wins, review monthly, and treat your cloud bill like any other metric that matters.