If you want to test code that uses AWS S3, this post shows how to use MinIO to run a mock of S3 locally.
MinIO can be started just by running the following command with Docker; no separate installation is required.
docker run -d -p 9000:9000 --name minio -v $PWD/data:/data \
-e "MINIO_ACCESS_KEY=AKIA0123456789ABCDEF" \
-e "MINIO_SECRET_KEY=0123456789/abcdefghi/ABCDEFGHI0123456789" \
minio/minio server /data
- Change MINIO_ACCESS_KEY and MINIO_SECRET_KEY to values of your own, 20 and 40 characters long respectively.
- You can manage the server from your browser at http://127.0.0.1:9000.
- The contents of S3 are stored in the data directory under the current directory.
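If you would rather not reuse the sample keys, the following is a minimal sketch for generating random values of the lengths mentioned above. The helper name `random_key` and the choice of letters and digits as the alphabet are assumptions made for this illustration, not something MinIO prescribes.

```python
import secrets
import string

# Assumption: letters and digits only, to avoid shell-quoting problems.
ALPHABET = string.ascii_letters + string.digits


def random_key(length):
    """Return a random string of the given length drawn from ALPHABET."""
    return "".join(secrets.choice(ALPHABET) for _ in range(length))


access_key = random_key(20)  # value for MINIO_ACCESS_KEY
secret_key = random_key(40)  # value for MINIO_SECRET_KEY

# Print the two -e options so they can be pasted into the docker command.
print(f'-e "MINIO_ACCESS_KEY={access_key}"')
print(f'-e "MINIO_SECRET_KEY={secret_key}"')
```

Use the same two values for AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY on the Python side.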
import os
import boto3
bucket_name = 'sample'  # Bucket name
use_minio = True  # Whether to use MinIO
os.environ['AWS_ACCESS_KEY_ID'] = 'AKIA0123456789ABCDEF'
os.environ['AWS_SECRET_ACCESS_KEY'] = '0123456789/abcdefghi/ABCDEFGHI0123456789'
This assumes that the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` hold the same values that were passed to docker.
kwargs = dict(
    region_name="ap-northeast-1",
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)
if use_minio:
    kwargs["endpoint_url"] = "http://127.0.0.1:9000"
bucket = boto3.resource("s3", **kwargs).Bucket(bucket_name)
When using MinIO, you can switch simply by setting `endpoint_url` to "http://127.0.0.1:9000".
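As a variation on the switch described here, the endpoint could be read from the environment so that the same code runs against MinIO or real S3 without editing a flag. This is only a sketch: the function name `s3_kwargs` and the variable name `S3_ENDPOINT_URL` are hypothetical and chosen for this example.

```python
import os


def s3_kwargs(env=os.environ):
    """Build keyword arguments for boto3.resource("s3", ...) from env."""
    kwargs = dict(
        region_name="ap-northeast-1",
        aws_access_key_id=env.get("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=env.get("AWS_SECRET_ACCESS_KEY"),
    )
    # S3_ENDPOINT_URL is a hypothetical variable name used in this sketch.
    # When it is unset or empty, endpoint_url is omitted and regular S3 is used.
    endpoint = env.get("S3_ENDPOINT_URL")
    if endpoint:
        kwargs["endpoint_url"] = endpoint
    return kwargs


# Usage (requires boto3):
#   bucket = boto3.resource("s3", **s3_kwargs()).Bucket("sample")
```

With `S3_ENDPOINT_URL=http://127.0.0.1:9000` set, the resulting arguments point at the local MinIO server; without it they point at AWS.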
Suppose the file test_file exists in the current directory.
bucket.create()  # Create the bucket
bucket.upload_file("test_file", "upload_file")  # Upload test_file as upload_file
print(list(bucket.objects.all()))  # List the files
bucket.download_file("upload_file", "download_file")  # Download upload_file as download_file
bucket.Object("upload_file").delete()  # Delete the file
bucket.delete()  # Delete the bucket
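One way to confirm that an upload and download pair like the one above returns the same bytes is a small round-trip check. The sketch below is not part of the original sample; the helper name `round_trip_ok` is hypothetical, and it only assumes an object with boto3's `Bucket.upload_file(Filename, Key)` and `Bucket.download_file(Key, Filename)` methods.

```python
import filecmp
import os
import tempfile


def round_trip_ok(bucket, local_path, key):
    """Upload local_path under key, download it again, and compare contents."""
    with tempfile.TemporaryDirectory() as tmp:
        downloaded = os.path.join(tmp, "downloaded")
        bucket.upload_file(local_path, key)
        bucket.download_file(key, downloaded)
        # shallow=False compares the file contents, not just size and mtime.
        return filecmp.cmp(local_path, downloaded, shallow=False)


# Usage, with the bucket created as above and before it is deleted:
#   assert round_trip_ok(bucket, "test_file", "upload_file")
```

The uploaded object is left in the bucket, so delete it afterwards as in the sample.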
It works just like regular S3.
Check http://127.0.0.1:9000 in your browser as appropriate.
We recommend using profiles rather than environment variables.
- Install the AWS CLI.
- Create a profile with `aws configure --profile test`. Change `test` as appropriate.
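For reference, the access key and secret key of a profile are normally stored in an INI-style credentials file (typically ~/.aws/credentials) under a section named after the profile. The sketch below writes such a section with Python's `configparser`. The helper name `write_profile` and the example path are hypothetical, and running `aws configure` remains the simpler route.

```python
import configparser


def write_profile(path, profile, access_key, secret_key):
    """Add or replace one profile section in an INI-style credentials file."""
    config = configparser.ConfigParser(interpolation=None)
    config.read(path)  # keeps existing profiles; a missing file is ignored
    config[profile] = {
        "aws_access_key_id": access_key,
        "aws_secret_access_key": secret_key,
    }
    with open(path, "w") as f:
        config.write(f)


# Usage (hypothetical path; point it at a scratch file first, not at your
# real credentials file):
#   write_profile("credentials_example", "test",
#                 "AKIA0123456789ABCDEF",
#                 "0123456789/abcdefghi/ABCDEFGHI0123456789")
```

Rewriting the file this way does not preserve comments, which is one reason to try it on a scratch file only.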
Create a bucket using the profile you created.
When using MinIO, specify `profile_name` and `endpoint_url`.
profile_name = "test"
cr = boto3.Session(profile_name=profile_name).get_credentials()
kwargs = dict(aws_access_key_id=cr.access_key, aws_secret_access_key=cr.secret_key)
if use_minio:
    kwargs["endpoint_url"] = "http://localhost:9000"
bucket = boto3.resource("s3", **kwargs).Bucket(bucket_name)
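As an alternative to copying the keys out of the session, the resource can also be created from the session itself, since `boto3.Session` has its own `resource` method that accepts `endpoint_url`. The following is a sketch of that variation; the function name `bucket_from_profile` is hypothetical.

```python
def bucket_from_profile(profile_name, bucket_name, endpoint_url=None):
    """Return a Bucket resource that uses the credentials of a named profile."""
    # Imported here so this module can be loaded even where boto3 is absent.
    import boto3

    session = boto3.Session(profile_name=profile_name)
    kwargs = {}
    if endpoint_url is not None:
        # Only set when targeting MinIO; omit it for regular S3.
        kwargs["endpoint_url"] = endpoint_url
    return session.resource("s3", **kwargs).Bucket(bucket_name)


# Usage with the profile and MinIO endpoint from this post:
#   bucket = bucket_from_profile("test", "sample", "http://localhost:9000")
```

A resource created this way also picks up any region configured for the profile.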
The created bucket can be used in the same way as the previous sample.
That's all.