import boto3
import os
from typing import List, Sequence, Union
from datetime import datetime
from dateutil import parser
import csv
from io import StringIO
import re

# Constants
# Placeholder written to the report's secret_arn column when a database has
# no matching secret attached.
NOT_FOUND = 'not_found'

def ensure_datetime(date) -> Union[datetime, str, None]:
    """Coerce *date* to a ``datetime`` on a best-effort basis.

    Args:
        date: A ``datetime``, a date string, or ``None``.

    Returns:
        The parsed ``datetime`` when *date* is a parseable string; otherwise
        *date* itself, unchanged (this includes ``datetime`` instances,
        ``None`` and strings the parser rejects).
    """
    # Nothing to parse: hand these back directly instead of relying on the
    # parser raising for non-string input.
    if date is None or isinstance(date, datetime):
        return date

    try:
        return parser.parse(date)
    except (ValueError, OverflowError, TypeError):
        # Best effort by design: an unparseable value is passed through
        # rather than failing the caller.
        return date

def list_rds_instances(session: boto3.Session):
    """Yield every DB instance description returned by RDS for *session*.

    Walks all pages of ``describe_db_instances`` and yields the entries of
    each page's ``DBInstances`` list one by one.
    """
    client = session.client('rds')
    pages = client.get_paginator('describe_db_instances').paginate()
    for page in pages:
        yield from page['DBInstances']

def list_secrets(session: boto3.Session):
    """Yield every secret entry returned by Secrets Manager for *session*.

    Walks all pages of ``list_secrets`` and yields the entries of each
    page's ``SecretList`` one by one.
    """
    client = session.client('secretsmanager')
    pages = client.get_paginator('list_secrets').paginate()
    for page in pages:
        yield from page['SecretList']

class Secret:
    """Name, ARN and lifecycle dates of a single secret."""

    name: str
    arn: str
    last_changed_date: datetime
    # May be None: the collection passes None when the source entry has no
    # 'LastAccessedDate' key.
    last_accessed_date: Union[datetime, None]
    created_date: datetime

    def __init__(
        self,
        name: str,
        arn: str,
        last_changed_date: Union[str, datetime],
        last_accessed_date: Union[str, datetime, None],
        created_date: Union[str, datetime],
    ) -> None:
        """Store the identifiers and normalise the dates via ensure_datetime."""
        self.name = name
        self.arn = arn
        self.last_changed_date = ensure_datetime(last_changed_date)
        self.last_accessed_date = ensure_datetime(last_accessed_date)
        self.created_date = ensure_datetime(created_date)

    def to_dict(self) -> dict:
        """Return every instance attribute as a ``str``, keyed by attribute name."""
        return {attr: str(value) for attr, value in vars(self).items()}

class Database:
    """One database instance and, once matched, the secret attached to it."""

    arn: str
    identifier: str
    engine: str
    status: str
    username: str
    # None until a secret has been attached to this database.
    secret: Union["Secret", None]
    address: str
    port: int

    def __init__(
        self,
        arn: str,
        identifier: str,
        engine: str,
        status: str,
        username: str,
        address: str,
        port: int
    ) -> None:
        """Store the instance attributes; the secret starts out unset."""
        self.arn = arn
        self.identifier = identifier
        self.engine = engine
        self.status = status
        self.username = username
        self.address = address
        self.port = port
        self.secret = None

    def to_dict(self) -> dict:
        """Return the report row for this database.

        The ``secret_arn`` column holds the attached secret's ARN, or the
        ``NOT_FOUND`` placeholder when no secret is attached.
        """
        row = {
            'db_identifier': self.identifier,
            'db_arn': self.arn,
            'db_master_username': self.username,
            'db_engine': self.engine,
            'db_status': self.status,
            'db_address': self.address,
            'db_port': self.port,
        }
        # Explicit None check instead of catching whatever `.arn` raises.
        row['secret_arn'] = NOT_FOUND if self.secret is None else self.secret.arn
        return row

class DBCollection:
    """Accumulates ``Database`` objects built from raw instance dicts."""

    databases: List[Database]

    def __init__(self):
        self.__dbs: List[Database] = []

    @property
    def databases(self):
        """The databases loaded so far, in load order."""
        return self.__dbs

    @classmethod
    def __make_db(cls, instance: dict) -> Database:
        """Build a ``Database`` from one instance description dict."""
        # 'Endpoint' can be absent (e.g. an instance that is still being
        # created); fall back to None for address/port instead of raising
        # KeyError and aborting the whole load.
        endpoint = instance.get('Endpoint') or {}
        return Database(
            arn=instance['DBInstanceArn'],
            identifier=instance['DBInstanceIdentifier'],
            engine=instance['Engine'],
            status=instance['DBInstanceStatus'],
            username=instance['MasterUsername'],
            address=endpoint.get('Address'),
            port=endpoint.get('Port'),
        )

    def load_instances(self, instances: Sequence[dict]):
        """Convert each instance dict to a ``Database`` and append it.

        *instances* is only iterated once, so a generator works as well as
        a sequence.
        """
        for instance in instances:
            # Raw description is echoed to stdout before conversion.
            print(instance)
            self.__dbs.append(self.__make_db(instance))

class SecretCollection:
    """Accumulates ``Secret`` objects and matches them to databases by name."""

    secrets: List["Secret"]

    def __init__(self):
        self.__secrets: List["Secret"] = []

    @property
    def secrets(self):
        """The secrets loaded so far, in load order."""
        return self.__secrets

    @classmethod
    def __make_secret(cls, instance: dict) -> "Secret":
        """Build a ``Secret`` from one secret-list entry dict."""
        return Secret(
            name=instance['Name'],
            arn=instance['ARN'],
            last_changed_date=instance['LastChangedDate'],
            # Optional key: None is passed when it is missing.
            last_accessed_date=instance.get('LastAccessedDate', None),
            created_date=instance['CreatedDate'],
        )

    def load_secrets(self, secrets: Sequence[dict]):
        """Convert each entry dict to a ``Secret`` and append it.

        *secrets* is only iterated once, so a generator works as well as a
        sequence.
        """
        for entry in secrets:
            self.__secrets.append(self.__make_secret(entry))

    def get_secret_for_db(self, db: "Database") -> Union["Secret", None]:
        """Find the first loaded secret whose name matches *db* and attach it.

        A secret matches when its name contains ``'Password'`` and also
        contains the database identifier with any trailing ``-<digits>``
        suffix removed. On a match the secret is stored on ``db.secret``.

        Returns:
            The matching secret, or ``None`` when nothing matches (in which
            case ``db.secret`` is left untouched).
        """
        # Reference
        # https://gitcorp.prod.aws.cloud.ihf/cloud-publica/infrastructure/inner-source/aws/terraform/dados/core-terraform
        # Secret naming pattern:
        #   Mysql             -> "${var.instance_name}-awssae1dbmy-${var.environment}-RDSInstancePasswordSecret"
        #   Mssql             -> "RDSInstancePasswordSecret-${var.instance_name}-aws-${data.aws_region.current.name}-dbsql-${var.environment}-${random_string.random_id.result}"
        #   Aurora Postgresql -> "${local.cluster_identifier}-RDSInstancePasswordSecret-${random_string.random_id.result}"
        #   Aurora Mysql      -> "${local.cluster_identifier}-RDSInstancePasswordSecret-${random_string.random_id.result}"

        # Loop-invariant: strip the numeric suffix once, not once per secret.
        base_identifier = re.sub(r"-(\d+)$", "", db.identifier)

        for candidate in self.__secrets:
            if 'Password' in candidate.name and base_identifier in candidate.name:
                db.secret = candidate
                return candidate
        return None

def generate_csv_report(dbs: "DBCollection") -> Union[str, None]:
    """Render the collection's databases as CSV text.

    The header is taken from the keys of the first database's ``to_dict()``
    row; every database contributes one line.

    Returns:
        The CSV document as a string, or ``None`` when the collection holds
        no databases.
    """
    # Build each row once; the first one also supplies the column names.
    rows = [db.to_dict() for db in dbs.databases]
    if not rows:
        return None

    buffer = StringIO()
    writer = csv.DictWriter(buffer, fieldnames=list(rows[0]))
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()

def upload_report(bucket: str, key: str, body: str):
    """Upload *body* to ``bucket``/``key`` in S3, or report that there is nothing to upload.

    Args:
        bucket: Destination bucket name.
        key: Destination object key.
        body: Report contents; ``None`` means no report was generated, in
            which case nothing is uploaded.
    """
    if body is None:
        print("\n Nenhum database RDS foi identificado. Nenhum arquivo será gerado.")
        return

    print("\n Databases identificados… iniciando o upload do CSV para o bucket")
    # The client is only created once there is something to send.
    boto3.client('s3').put_object(Bucket=bucket, Key=key, Body=body)

def process(account_id: str, output_bucket: str):
    """Build the database/secret report and upload it.

    Loads all secrets and all DB instances, attaches a matching secret to
    each database, renders the CSV and uploads it under
    ``YYYY/MM/DD/report-<account_id>.csv`` in *output_bucket*.
    """
    session = boto3.Session()

    secrets = SecretCollection()
    secrets.load_secrets(list_secrets(session))

    dbs = DBCollection()
    dbs.load_instances(list_rds_instances(session))

    # Attaches the matching secret (if any) to each database as a side effect.
    for database in dbs.databases:
        secrets.get_secret_for_db(database)

    report = generate_csv_report(dbs)
    prefix = datetime.now().strftime('%Y/%m/%d')
    key = f'{prefix}/report-{account_id}.csv'
    upload_report(output_bucket, key, report)

def handler(event, context):
    """Entry point: derive the account id from *context* and run the report.

    The destination bucket comes from the ``BUCKET_NAME`` environment
    variable, falling back to ``'tdi-rds-reports'`` when it is unset or
    empty. *event* is not used.
    """
    bucket = os.environ.get('BUCKET_NAME') or 'tdi-rds-reports'
    # The account id is the fifth colon-separated field of the function ARN.
    account_id = context.invoked_function_arn.split(':')[4]

    process(account_id, bucket)

Back to Top
Produto adicionado ao carrinho