*** Below are some of the APIs listed in the "AWS SDK for Python" that are related to boto3 (Python library) that I often use. *** *** https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html
*** Please read the resource name and various setting values as appropriate ~ ***
Athena
# -*- coding: utf-8 -*-
import boto3
import time
athena = boto3.client("athena")
class QueryFailed(Exception):
"""
Exception class called when Athena query execution fails
"""
pass
#Query execution (asynchronous processing)
start_query_response = athena.start_query_execution(
QueryString = 'Query',
QueryExecutionContext={
"Database": "GlueDB name used to query"
},
ResultConfiguration={
"OutputLocation" : "s3://Output bucket name/Key"
}
)
query_execution_id = start_query_response["QueryExecutionId"]
#Check the execution status of the query
while True:
query_status = athena.get_query_execution(
QueryExecutionId = query_execution_id
)
query_execution_status = query_status['QueryExecution']['Status']['State']
if query_execution_status == 'SUCCEEDED':
break
elif query_execution_status == "FAILED" or query_execution_status == "CANCELLED":
raise QueryFailed(f"query_execution_status = {query_execution_status}")
time.sleep(10)
#Get query execution result (only if successful)
query_results = athena.get_query_results(QueryExecutionId = query_execution_id)
CloudWatchLogs
# -*- coding: utf-8 -*-
from datetime import datetime,timezone,timedelta
import os
import boto3
import time
import logging
import traceback
#Log settings
logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', logging.DEBUG))
logs = boto3.client("logs")
BUCKET_NAME = os.environ["BUCKET_NAME"]
WAITING_TIME = int(os.environ["WAITING_TIME"])
#Set timezone to Japan time (JST)
JST = timezone(timedelta(hours=9),"JST")
#Date type when outputting logs to S3
DATE_FORMAT = "%Y-%m-%d"
def lambda_handler(event, context):
"""
Output one day's worth of CloudWatch Logs to S3.
The target time is as follows.
AM 00:00:00.000000 ~ PM 23:59:59.999999
"""
try:
#Yesterday PM23:59:59.999999
tmp_today = datetime.now(JST).replace(hour=0,minute=0,second=0,microsecond=0) - timedelta(microseconds=1)
#Yesterday AM00:00:00.000000
tmp_yesterday = (tmp_today - timedelta(days=1)) + timedelta(microseconds=1)
#Used as a prefix when outputting S3 logs
target_date = tmp_yesterday.strftime(DATE_FORMAT)
#Convert to time stamp type for log output (take up to microseconds)
today = int(tmp_today.timestamp() * 1000)
yesterday = int(tmp_yesterday.timestamp() * 1000)
#Get CloudWatchLogGroup from environment variable
logGroups = os.environ["LOG_GROUPS"].split(",")
for logGroupName in logGroups:
try:
keys = ["logGroupName","yesterday","today","target_date"]
values = [logGroupName,yesterday,today,target_date]
payload = dict(zip(keys,values))
#Execute log output
response = logs.create_export_task(
logGroupName = payload["logGroupName"],
fromTime = payload["yesterday"],
to = payload["today"],
destination = BUCKET_NAME,
destinationPrefix = "Logs" + payload["logGroupName"] + "/" + payload["target_date"]
)
#Wait for the log output to finish executing.
taskId = response["taskId"]
while True:
response = logs.describe_export_tasks(
taskId = taskId
)
status = response["exportTasks"][0]["status"]["code"]
#Break if task execution is finished
if status != "PENDING" and status != "PENDING_CANCEL" and status != "RUNNING":
logger.info(f"taskId {taskId} has finished exporting")
break
else:
logger.info(f"taskId {taskId} is now exporting")
time.sleep(WAITING_TIME)
continue
except Exception as e:
traceback.print_exc()
logger.warning(f"type = {type(e)} , message = {e}",exc_info=True)
except Exception as e:
traceback.print_exc()
logger.error(f"type = {type(e)} , message = {e}",exc_info=True)
raise
DynamoDB
# -*- coding: utf-8 -*-
import boto3
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("table")
dynamodb_client = boto3.client('dynamodb')
autoscaling_client = boto3.client('application-autoscaling')
#Get 1 item
response = table.get_item(
Key = {
"id" : "1"
}
)
#Register one item
response = table.put_item(
Item = {
"id" : "1",
"key" : "value"
}
)
#Updated 1 item
response = table.update_item(
Key = {
"id" : "1"
},
UpdateExpression = "set key2 = :val",
ExpressionAttributeValues = {
":val" : "value2"
},
ReturnValues = "UPDATED_NEW"
)
#Delete 1 item
response = table.delete_item(
Key = {
"id" : "1"
}
)
#Delete all items (all records) truncate or delete from table
#Get all data
delete_items = []
parameters = {}
while True:
response = table.scan(**parameters)
delete_items.extend(response["Items"])
if "LastEvaluatedKey" in response:
parameters["ExclusiveStartKey"] = response["LastEvaluatedKey"]
else:
break
#Key extraction
key_names = [ x["AttributeName"] for x in table.key_schema ]
delete_keys = [ { k:v for k,v in x.items() if k in key_names } for x in delete_items ]
#Data deletion
with table.batch_writer() as batch:
for key in delete_keys:
batch.delete_item(Key = key)
#Delete table
response = dynamodb_client.list_tables()
if 'TableNames' in response:
for table_name in response['TableNames']:
if table_name == "Table name to be deleted":
dynamodb_client.delete_table(TableName = table_name)
waiter = dynamodb_client.get_waiter("table_not_exists")
waiter.wait(TableName = table_name)
#If you have Target Tracking type Auto Scaling, you can delete CloudWatch Alarm at the same time by deleting ScalingPolicy.
try:
autoscaling_client.delete_scaling_policy(
PolicyName = f'{table_name}ReadCapacity',
ServiceNamespace = "dynamodb",
ResourceId = f"table/{table_name}",
ScalableDimension = "dynamodb:table:ReadCapacityUnits"
)
except autoscaling_client.exceptions.ObjectNotFoundException as e:
print(f"type = {type(e)}, message = {e}")
try:
autoscaling_client.delete_scaling_policy(
PolicyName = f'{table_name}WriteCapacity',
ServiceNamespace = "dynamodb",
ResourceId = f"table/{table_name}",
ScalableDimension = "dynamodb:table:WriteCapacityUnits"
)
except autoscaling_client.exceptions.ObjectNotFoundException as e:
print(f"type = {type(e)}, message = {e}")
#Create table
table_name = "table"
dynamodb.create_table(
TableName = table_name,
KeySchema = [{
"AttributeName" : "id",
"KeyType" : "HASH"
}],
AttributeDefinitions = [{
"AttributeName" : "id",
"AttributeType" : "S"
}],
ProvisionedThroughput = {
"ReadCapacityUnits" : 1,
"WriteCapacityUnits" : 1
}
)
waiter = dynamodb_client.get_waiter("table_exists")
waiter.wait(TableName = table_name)
#Target Tracking type Auto Scaling settings
autoscaling_client.register_scalable_target(
ServiceNamespace = "dynamodb",
ResourceId = f"table/{table_name}",
ScalableDimension = "dynamodb:table:ReadCapacityUnits",
MinCapacity = 1,
MaxCapacity = 10,
RoleARN = "IAM Role ARN for Auto Scaling"
)
autoscaling_client.register_scalable_target(
ServiceNamespace = "dynamodb",
ResourceId = f"table/{table_name}",
ScalableDimension = "dynamodb:table:WriteCapacityUnits",
MinCapacity = 1,
MaxCapacity = 10,
RoleARN = "IAM Role ARN for Auto Scaling"
)
#Set scaling policy
autoscaling_client.put_scaling_policy(
ServiceNamespace='dynamodb',
ResourceId = f"table/{table_name}",
PolicyType = "TargetTrackingScaling",
PolicyName = f"{table_name}ReadCapacity",
ScalableDimension = "dynamodb:table:ReadCapacityUnits",
TargetTrackingScalingPolicyConfiguration={
"TargetValue" : 70,
"PredefinedMetricSpecification": {
"PredefinedMetricType": "DynamoDBReadCapacityUtilization"
},
"ScaleOutCooldown" : 70,
"ScaleInCooldown" : 70
}
)
autoscaling_client.put_scaling_policy(
ServiceNamespace='dynamodb',
ResourceId = f"table/{table_name}",
PolicyType = "TargetTrackingScaling",
PolicyName = f"{table_name}WriteCapacity",
ScalableDimension='dynamodb:table:WriteCapacityUnits',
TargetTrackingScalingPolicyConfiguration={
"TargetValue" : 70,
"PredefinedMetricSpecification": {
"PredefinedMetricType": "DynamoDBWriteCapacityUtilization"
},
"ScaleOutCooldown" : 70,
"ScaleInCooldown" : 70
}
)
#Table schema update
response = dynamodb_client.update_table(
AttributeDefinitions = [
{
'AttributeName': 'string',
'AttributeType': 'S'|'N'|'B'
},
],
TableName = 'string',
BillingMode = 'PROVISIONED'|'PAY_PER_REQUEST',
ProvisionedThroughput = {
'ReadCapacityUnits': 123,
'WriteCapacityUnits': 123
}
)
#Batch processing
table = dynamodb.Table("table")
with table.batch_writer() as batch:
for i in range(10 ** 6):
batch.put_item(
Item = {
"id" : str(i + 1),
"key" : f"key{i + 1}"
}
)
Kinesis Data Stream
# -*- coding: utf-8 -*-
import boto3
kinesis = boto3.client("kinesis")
#Data transmission
kinesis.put_records(
Records = [
{
"Data" : b"String",
"PartitionKey" : "String"
}
],
StreamName = "Kinesis Data Stream Name"
)
Lambda
# -*- coding: utf-8 -*-
import boto3
lambda_client = boto3.client('lambda ')
#Lambda run
response = lambda_client.invoke(
FunctionName = 'Lambda to be processed',
InvocationType = 'Event'|'RequestResponse'|'DryRun',
LogType = 'None'|'Tail',
Payload = b'bytes'|file
)
#Event Source Mappings Enable or Disable
import time
expected_status = "Enabled" #In case of invalidation, Disabled
progressing_status = "Enabling" #In case of invalidation, Disabling
Enabled = True #False for invalidation
response = lambda_client.list_event_source_mappings(
EventSourceArn = "Kinesis or DynamoDB ARN to be processed",
FunctionName = "Lambda function name to be processed"
)
if "EventSourceMappings" in response:
for e in response["EventSourceMappings"]:
if e["State"] != expected_status or e["State"] != progressing_status:
response = lambda_client.update_event_source_mapping(
UUID = e["UUID"],
FunctionName = e["FunctionArn"],
Enabled = Enabled,
BatchSize = 100
)
if response["State"] != expected_status:
while True:
response = lambda_client.get_event_source_mapping(
UUID = e["UUID"]
)
if response["State"] == expected_status:
break
time.sleep(10)
SageMaker
# -*- coding: utf-8 -*-
import boto3
sagemaker = boto3.client("sagemaker-runtime")
#Access sagemaker endpoint and receive forecast results
response = sagemaker.invoke_endpoint(
EndpointName = "SageMaker endpoint name",
Body=b'bytes'|file,
ContentType = 'text/csv', #The MIME type of the input data in the request body.
Accept = 'application/json' #The desired MIME type of the inference in the response.
)
SQS
# -*- coding: utf-8 -*-
import boto3
sqs = boto3.client('sqs')
QUEUE_URL= "SQS queue URL. You can check it on the console."
#Get all messages from the SQS queue.
while True:
sqs_message = sqs.receive_message(
QueueUrl = QUEUE_URL,
MaxNumberOfMessages = 10
)
if "Messages" in sqs_message:
for message in sqs_message["Messages"]:
try:
print(message)
#Delete the acquired message. Otherwise you may get duplicates
sqs.delete_message(
QueueUrl = QUEUE_URL,
ReceiptHandle = message["ReceiptHandle"]
)
except Exception as e:
print(f"type = {type(e)} , message = {e}")
S3
# -*- coding: utf-8 -*-
import boto3
s3 = boto3.client('s3')
BUCKET_NAME = "Bucket name to be processed"
#1 Object (file) writing
s3.put_object(
Bucket = BUCKET_NAME,
Body = "Data content. str type or bytes type",
Key = "S3 key. s3:://Bucket name/Subsequent directories and file names"
)
#1 Read object (file)
s3.get_object(
Bucket = BUCKET_NAME,
Key = "S3 key. s3:://Bucket name/Subsequent directories and file names"
)
#1 Delete object (file)
s3.delete_object(
Bucket = BUCKET_NAME,
Key = "S3 key. s3:://Bucket name/Subsequent directories and file names"
)
#Copy one object (file) to another location
s3.copy_object(
Bucket = BUCKET_NAME,
Key = "Destination S3 key. s3:://Bucket name/Subsequent directories and file names",
CopySource = {
"Bucket" : "Source bucket name",
"Key" : "Migration source S3 key"
}
)
#Get all objects (files) below the specified prefix or in the bucket
BUCKET_NAME= "Bucket name to be processed"
contents = []
kwargs = {
"Bucket" : BUCKET_NAME,
"Prefix" : "Search target prefix"
}
while True:
response = s3.list_objects_v2(**kwargs)
if "Contents" in response:
contents.extend(response["Contents"])
if 'NextContinuationToken' in response:
kwargs["ContinuationToken"] = response['NextContinuationToken']
continue
break
Recommended Posts