*** Below are some of the boto3 (AWS SDK for Python) APIs that I use often. *** *** https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html
*** Please replace the resource names and setting values with the ones appropriate for your environment. ***
Athena
# -*- coding: utf-8 -*-
import boto3
import time
athena = boto3.client("athena")
class QueryFailed(Exception):
    """Raised when an Athena query execution does not succeed."""
# Submit the query; start_query_execution returns immediately with an
# execution id, so the outcome has to be polled for below.
start_query_response = athena.start_query_execution(
    QueryString="Query",
    QueryExecutionContext={"Database": "Your GlueDB name"},
    ResultConfiguration={"OutputLocation": "s3://xxx/yyy"},
)
query_execution_id = start_query_response["QueryExecutionId"]

# Poll the execution state every 10 seconds until it is terminal.
while True:
    query_status = athena.get_query_execution(QueryExecutionId=query_execution_id)
    query_execution_status = query_status["QueryExecution"]["Status"]["State"]
    if query_execution_status == "SUCCEEDED":
        break
    if query_execution_status in ("FAILED", "CANCELLED"):
        raise QueryFailed(f"query_execution_status = {query_execution_status}")
    time.sleep(10)

# Fetch the query result. This line is only reached when the query succeeded.
query_results = athena.get_query_results(QueryExecutionId=query_execution_id)
CloudWatchLogs
# -*- coding: utf-8 -*-
from datetime import datetime,timezone,timedelta
import os
import boto3
import time
from logging import getLogger, StreamHandler, DEBUG, INFO, WARNING, ERROR, CRITICAL
import traceback
# Logger: the level comes from the LogLevel environment variable (DEBUG when
# unset) and records go to a dedicated stream handler only (no propagation).
logger = getLogger(__name__)
logger.setLevel(os.getenv("LogLevel", DEBUG))
logger.propagate = False
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.addHandler(handler)

# CloudWatch Logs client used for the export tasks.
logs = boto3.client("logs")

# Required environment variables: destination bucket and polling interval (seconds).
BUCKET_NAME = os.environ["BUCKET_NAME"]
WAITING_TIME = int(os.environ["WAITING_TIME"])

# Japan Standard Time (UTC+9).
JST = timezone(timedelta(hours=9), "JST")

# Date format used in the S3 destination prefix.
DATE_FORMAT = "%Y-%m-%d"
def lambda_handler(event, context):
    """
    Export the previous day's CloudWatch Logs to S3.

    The exported window is yesterday 00:00:00.000000 through yesterday
    23:59:59.999999 in JST. Every log group listed in the comma-separated
    LOG_GROUPS environment variable is exported in turn; the handler polls
    each export task every WAITING_TIME seconds until it leaves the
    PENDING / PENDING_CANCEL / RUNNING states before moving on.

    A failure while exporting one log group is logged as a warning and the
    remaining groups are still processed. Any other failure is logged as an
    error and re-raised.
    """
    try:
        # Yesterday 23:59:59.999999: one microsecond before today's midnight.
        window_end = datetime.now(JST).replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(microseconds=1)
        # Yesterday 00:00:00.000000.
        window_start = (window_end - timedelta(days=1)) + timedelta(microseconds=1)
        # Used in the S3 destination prefix.
        target_date = window_start.strftime(DATE_FORMAT)
        # Epoch timestamps in milliseconds for the export request.
        to_ms = int(window_end.timestamp() * 1000)
        from_ms = int(window_start.timestamp() * 1000)
        # Tolerate spaces around the commas and skip empty entries.
        log_groups = [
            name.strip()
            for name in os.environ["LOG_GROUPS"].split(",")
            if name.strip()
        ]
        for log_group_name in log_groups:
            try:
                # Start the export; the task itself runs asynchronously.
                response = logs.create_export_task(
                    logGroupName=log_group_name,
                    fromTime=from_ms,
                    to=to_ms,
                    destination=BUCKET_NAME,
                    destinationPrefix="Logs" + log_group_name + "/" + target_date,
                )
                task_id = response["taskId"]
                while True:
                    response = logs.describe_export_tasks(taskId=task_id)
                    status = response["exportTasks"][0]["status"]["code"]
                    if status in ("PENDING", "PENDING_CANCEL", "RUNNING"):
                        logger.info(f"taskId {task_id} is now exporting")
                        time.sleep(WAITING_TIME)
                        continue
                    # Terminal state: only COMPLETED is reported as success so
                    # that FAILED / CANCELLED tasks are visible in the logs.
                    if status == "COMPLETED":
                        logger.info(f"taskId {task_id} has finished exporting")
                    else:
                        logger.warning(f"taskId {task_id} ended with status {status}")
                    break
            except Exception as e:
                # Best effort per log group: report and continue with the next one.
                traceback.print_exc()
                logger.warning(f"type = {type(e)} , message = {e}", exc_info=True)
    except Exception as e:
        traceback.print_exc()
        logger.error(f"type = {type(e)} , message = {e}", exc_info=True)
        raise
DynamoDB
# -*- coding: utf-8 -*-
import boto3
from boto3.dynamodb.conditions import Key,Attr
# Resource-level handle for item operations, plus low-level clients for the
# table (DDL) and auto scaling operations.
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("table")
dynamodb_client = boto3.client("dynamodb")
autoscaling_client = boto3.client("application-autoscaling")

"""
DML select update insert delete
"""
# Read one item by its key.
response = table.get_item(Key={"id": "1"})

# Write one item.
response = table.put_item(Item={"id": "1", "key": "value"})

# Update one attribute of an item; "#age" / ":val" are expression placeholders
# for the attribute name and the new value.
response = table.update_item(
    Key={"id": "1"},
    UpdateExpression="set #age = :val",
    ExpressionAttributeNames={"#age": "age"},
    ExpressionAttributeValues={":val": 95},
    ReturnValues="UPDATED_NEW",
)

# Delete one item by its key.
response = table.delete_item(Key={"id": "1"})
# Write one million items through the batch writer.
with table.batch_writer() as batch:
    for n in range(1, 10 ** 6 + 1):
        batch.put_item(Item={"id": str(n), "key": f"key{n}"})
# Truncate: delete every item in the table.
# Deleting only needs each item's key, so the scan is projected onto the key
# attributes instead of pulling every full item into memory.
key_names = [x["AttributeName"] for x in table.key_schema]
# Name placeholders keep the projection valid even when a key attribute
# name collides with a DynamoDB reserved word.
name_placeholders = {f"#k{i}": name for i, name in enumerate(key_names)}
parameters = {
    "ProjectionExpression": ", ".join(name_placeholders),
    "ExpressionAttributeNames": name_placeholders,
}

# Page through the whole table, following LastEvaluatedKey until it is absent.
delete_items = []
while True:
    response = table.scan(**parameters)
    delete_items.extend(response["Items"])
    if "LastEvaluatedKey" not in response:
        break
    parameters["ExclusiveStartKey"] = response["LastEvaluatedKey"]

# The scanned items already contain only the hash key and range key.
delete_keys = delete_items

# Delete the items as a batch process.
with table.batch_writer() as batch:
    for key in delete_keys:
        batch.delete_item(Key=key)
"""
DDL Table Process
"""
# Delete a table (only if it exists) together with its auto scaling policies.
# A single list_tables call returns one page of names only, so page through
# all of them; otherwise a table beyond the first page is never found.
paginator = dynamodb_client.get_paginator("list_tables")
for page in paginator.paginate():
    for table_name in page.get("TableNames", []):
        if table_name != "Table name to be deleted":
            continue
        dynamodb_client.delete_table(TableName=table_name)
        # Block until the table is really gone.
        waiter = dynamodb_client.get_waiter("table_not_exists")
        waiter.wait(TableName=table_name)
        # If you have Target Tracking type Auto Scaling, you can delete the
        # CloudWatch Alarm at the same time by deleting the ScalingPolicy.
        for kind in ("Read", "Write"):
            try:
                autoscaling_client.delete_scaling_policy(
                    PolicyName=f"{table_name}{kind}Capacity",
                    ServiceNamespace="dynamodb",
                    ResourceId=f"table/{table_name}",
                    ScalableDimension=f"dynamodb:table:{kind}CapacityUnits",
                )
            except autoscaling_client.exceptions.ObjectNotFoundException as e:
                # No such policy for this table: nothing to delete.
                print(f"type = {type(e)}, message = {e}")
# Create a table with a single string hash key and wait until it exists.
table_name = "table"
dynamodb.create_table(
    TableName=table_name,
    KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
    AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
    ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
)
waiter = dynamodb_client.get_waiter("table_exists")
waiter.wait(TableName=table_name)

# Target Tracking auto scaling: register read and write capacity as
# scalable targets (1 to 10 units each).
for kind in ("Read", "Write"):
    autoscaling_client.register_scalable_target(
        ServiceNamespace="dynamodb",
        ResourceId=f"table/{table_name}",
        ScalableDimension=f"dynamodb:table:{kind}CapacityUnits",
        MinCapacity=1,
        MaxCapacity=10,
        RoleARN="IAM Role ARN for Auto Scaling",
    )

# Attach a target tracking policy to each of the two dimensions.
for kind in ("Read", "Write"):
    autoscaling_client.put_scaling_policy(
        ServiceNamespace="dynamodb",
        ResourceId=f"table/{table_name}",
        PolicyType="TargetTrackingScaling",
        PolicyName=f"{table_name}{kind}Capacity",
        ScalableDimension=f"dynamodb:table:{kind}CapacityUnits",
        TargetTrackingScalingPolicyConfiguration={
            "TargetValue": 70,
            "PredefinedMetricSpecification": {
                "PredefinedMetricType": f"DynamoDB{kind}CapacityUtilization"
            },
            "ScaleOutCooldown": 70,
            "ScaleInCooldown": 70,
        },
    )
# Update the table schema / capacity settings.
# Each option takes exactly one value; the alternatives are listed in the
# comments (an expression such as 'S'|'N'|'B' is not valid Python).
response = dynamodb_client.update_table(
    AttributeDefinitions=[
        {
            "AttributeName": "string",
            "AttributeType": "S",  # one of "S", "N", "B"
        },
    ],
    TableName="string",
    BillingMode="PROVISIONED",  # or "PAY_PER_REQUEST"
    ProvisionedThroughput={
        "ReadCapacityUnits": 123,
        "WriteCapacityUnits": 123,
    },
)
EMR
# -*- coding: utf-8 -*-
import boto3
# EMR client
emr = boto3.client("emr")

# List the clusters that are starting up, running or waiting for work.
cluster_list = emr.list_clusters(
    ClusterStates=["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"],
)

# Launch a cluster. Fill params according to:
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.run_job_flow
params = {}
response = emr.run_job_flow(**params)

# Describe a cluster.
response = emr.describe_cluster(
    ClusterId="The identifier of the cluster to describe.",
)

# Add steps to a cluster. Fill step according to:
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.add_job_flow_steps
step = {}
response = emr.add_job_flow_steps(
    JobFlowId="A string that uniquely identifies the job flow. This identifier is returned by RunJobFlow and can also be obtained from ListClusters .",
    Steps=[step],
)
Kinesis Data Stream
# -*- coding: utf-8 -*-
import boto3
kinesis = boto3.client("kinesis")

# Send records to the stream: each record carries its payload bytes and the
# partition key.
kinesis.put_records(
    StreamName="Kinesis Data Stream Name",
    Records=[{"Data": b"String", "PartitionKey": "String"}],
)
Lambda
# -*- coding: utf-8 -*-
import boto3
# The service name must be exactly "lambda" (no trailing space).
lambda_client = boto3.client("lambda")

# Invoke a Lambda function.
# Each option takes exactly one value; the alternatives are listed in the
# comments (an expression such as 'Event'|'RequestResponse' is not valid Python).
response = lambda_client.invoke(
    FunctionName="Lambda Function Name",
    InvocationType="RequestResponse",  # or "Event" / "DryRun"
    LogType="None",  # or "Tail"
    Payload=b'{"key": "value"}',  # JSON as bytes, or a file-like object
)
# Enable (or disable) the event source mappings of a function and wait until
# each one reaches the expected state.
import time

expected_status = "Enabled"  # when disabling: "Disabled"
progressing_status = "Enabling"  # when disabling: "Disabling"
Enabled = True  # when disabling: False

response = lambda_client.list_event_source_mappings(
    EventSourceArn="Kinesis or DynamoDB ARN",
    FunctionName="Lambda Function name",
)
for e in response.get("EventSourceMappings", []):
    # Skip mappings that are already in, or moving to, the expected state.
    # (The previous `!= a or != b` test was true for every state.)
    if e["State"] in (expected_status, progressing_status):
        continue
    response = lambda_client.update_event_source_mapping(
        UUID=e["UUID"],
        FunctionName=e["FunctionArn"],
        Enabled=Enabled,
        BatchSize=100,
    )
    if response["State"] != expected_status:
        # Poll every 10 seconds until the mapping reaches the expected state.
        while True:
            response = lambda_client.get_event_source_mapping(UUID=e["UUID"])
            if response["State"] == expected_status:
                break
            time.sleep(10)
SageMaker
# -*- coding: utf-8 -*-
import boto3
sagemaker = boto3.client("sagemaker-runtime")

# Invoke the endpoint and get a prediction.
# Body takes one value: bytes or a file-like object (an expression such as
# b'bytes'|file is not valid Python).
response = sagemaker.invoke_endpoint(
    EndpointName="SageMaker Endpoint Name",
    Body=b"1.0,2.0,3.0",  # request payload; here one CSV row to match ContentType
    ContentType="text/csv",  # The MIME type of the input data in the request body.
    Accept="application/json",  # The desired MIME type of the inference in the response.
)
SQS
# -*- coding: utf-8 -*-
import boto3
sqs = boto3.client("sqs")
QUEUE_URL = "SQS Queue URL"

# Receive (and delete) messages until the queue returns none.
while True:
    sqs_message = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,
        # Long polling: with short polling (the default) a response can be
        # empty although messages remain, which would end this loop early.
        WaitTimeSeconds=5,
    )
    if "Messages" not in sqs_message:
        break
    for message in sqs_message["Messages"]:
        try:
            print(message)
            # Delete the message once it has been handled; otherwise it is
            # delivered again later as a duplicate.
            sqs.delete_message(
                QueueUrl=QUEUE_URL,
                ReceiptHandle=message["ReceiptHandle"],
            )
        except Exception as e:
            # Best effort per message: report and continue with the next one.
            print(f"type = {type(e)} , message = {e}")
SSM
# -*- coding: utf-8 -*-
import boto3
ssm = boto3.client("ssm")
parameters = "your SSM parameter"

# WithDecryption returns decrypted values for secure string parameters; the
# flag is ignored for String and StringList parameter types.
response = ssm.get_parameters(Names=[parameters], WithDecryption=True)

# Print the value of the first returned parameter.
print(response["Parameters"][0]["Value"])
S3
# -*- coding: utf-8 -*-
import boto3
# KMS client (used to inspect the key manager of KMS-encrypted objects)
kms = boto3.client("kms")
# S3 client
s3 = boto3.client("s3")
BUCKET_NAME = "Bucket name"

"""
One object Process
"""
# Write an object.
s3.put_object(
    Bucket=BUCKET_NAME,
    Key="S3 key s3://Bucket Name/Subsequent directories and file names",
    Body="test body".encode("UTF-8"),
)

# Read an object and decode its body as UTF-8 text.
s3.get_object(
    Bucket=BUCKET_NAME,
    Key="S3 key s3://Bucket Name/Subsequent directories and file names",
)["Body"].read().decode("UTF-8")

# Delete an object.
s3.delete_object(
    Bucket=BUCKET_NAME,
    Key="S3 key s3://Bucket Name/Subsequent directories and file names",
)

# Copy an object from a source bucket/key to a destination key in BUCKET_NAME.
s3.copy_object(
    Bucket=BUCKET_NAME,
    Key="Destination S3 key. s3:://Bucket name/Subsequent directories and file names",
    CopySource={"Bucket": "Source bucket name", "Key": "Migration source S3 key"},
)
"""
Multiple Objects Process
"""
# Collect every object under the given prefix, following the continuation
# token until the listing no longer returns one.
contents = []
kwargs = {"Bucket": BUCKET_NAME, "Prefix": "Search target prefix"}
while True:
    response = s3.list_objects_v2(**kwargs)
    # "Contents" is absent when a page holds no objects.
    contents.extend(response.get("Contents", []))
    if "NextContinuationToken" not in response:
        break
    kwargs["ContinuationToken"] = response["NextContinuationToken"]
"""
Server Side Encryption
1. Default Encryption
1-1. SSE with AES-256
1-2. SSE with KMS AWS Managed Keys
1-3. SSE with KMS CMK(Customer Managed Keys)
2. Non Default Encryption
2-1. SSE with AES-256
2-2. SSE with KMS AWS Managed Keys
2-3. SSE with KMS CMK(Customer Managed Keys)
2-4. SSE with Client operations key. This is not the key which S3 or KMS operates
"""
# 1-1 / 1-2 / 1-3: default encryption. No encryption arguments are passed;
# the response reports which server-side encryption was applied.
response = s3.put_object(
    Bucket=BUCKET_NAME,
    Key="test",
    Body="Encrypted".encode("UTF-8"),
)
print(f'{"ServerSideEncryption":<20} = {response["ServerSideEncryption"]}')

# KMS only: look up who manages the key that was used.
if response["ServerSideEncryption"] == "aws:kms":
    key_description = kms.describe_key(KeyId=response["SSEKMSKeyId"])
    KeyManager = key_description["KeyMetadata"]["KeyManager"]
    print(f'{"KeyManager":<20} = {KeyManager}')
# Arguments shared by every explicit-encryption upload below.
encrypted_put_args = {
    "Bucket": BUCKET_NAME,
    "Key": "test",
    "Body": "Encrypted".encode("UTF-8"),
}

# 2-1. SSE with AES-256
response = s3.put_object(**encrypted_put_args, ServerSideEncryption="AES256")

# 2-2. SSE with KMS AWS Managed Keys
response = s3.put_object(**encrypted_put_args, ServerSideEncryption="aws:kms")

# 2-3. SSE with KMS CMK (Customer Managed Keys)
response = s3.put_object(
    **encrypted_put_args,
    ServerSideEncryption="aws:kms",
    SSEKMSKeyId="Your Customer Manged Key ID",
)

# 2-4. SSE with a customer-provided key. This is not a key which S3 or KMS operates.
response = s3.put_object(
    **encrypted_put_args,
    SSECustomerAlgorithm="AES256",
    SSECustomerKey="The Key You generated. ex) SSE_CUSTOMER_KEY=$(cat /dev/urandom | base64 -i | fold -w 32 | head -n 1)",
)
Recommended Posts