Boto3 est la bibliothèque python d'aws, mais elle possède une API de bas niveau qui est une API naïve et une API de haut niveau orientée objet qui l'encapsule.
Manipulation d'objets S3 avec Boto3 (API de haut niveau et API de bas niveau) --Qiita https://qiita.com/sokutou-metsu/items/5ba7531117224ee5e8af
Jusqu'à présent, client.list_objects_v2, qui est une API de bas niveau, était utilisé pour afficher la liste de S3, mais resource.Bucket (). Objects.filter existe en tant qu'API de haut niveau correspondante. (Je ne pouvais pas me retrouver car le matériel de s3 était trop énorme)
C'est un article que la quantité de description est réduite et la vitesse est augmentée en utilisant l'API de haut niveau, alors utilisons l'API de haut niveau.
Utilisez la nouvelle version de l'API S3 ListObjects, ListObjects V2 | Developers.IO https://dev.classmethod.jp/cloud/aws/s3-new-api-list-object-v2/
Dans list_objects_v2, 1000 éléments sont récupérés à la fois. Ceci est un exemple car il nécessite un traitement de pagination. (Appel de cette description récursivement)
s3client = self._session().client('s3')
if next_token:
response = s3client.list_objects_v2(
Bucket=self.source_bucket,
Prefix=self.source_prefix,
ContinuationToken=next_token,
)
else:
response = s3client.list_objects_v2(
Bucket=self.source_bucket,
Prefix=self.source_prefix,
)
if 'Contents' in response:
keys = [i['Key'] for i in response['Contents']]
else:
keys = []
if 'NextContinuationToken' in response:
next_token = response['NextContinuationToken']
else:
next_token = None
78733 Objet → 46 secondes
Executed <function test at 0x10c0743b0> in 46.35232996940613 seconds
Bucket (). Objects est de type ObjectSummary et les attributs sont spécifiés ici par le filtre de chaînage, all, limit, page_size, etc. La valeur de retour est également ObjectSummary https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.objects https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.ObjectSummary
L'ObjectSummary lui-même est un itérateur et les données sont en fait acquises au moment de l'appel de l'itérateur.
Si vous spécifiez KeyMarker
dans l'argument de filter, vous pouvez rechercher à partir du milieu, vous pouvez spécifier RequestPayer, etc. Il semble que vous puissiez faire presque tout ce que vous pouvez faire avec list_objects_v2.
s3_resource = self._session().resource('s3')
a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
# a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix).limit(count=2000)
b = [k.key for k in a]
78733 Objet → 33 secondes
Executed <function test at 0x10191f200> in 33.14992713928223 seconds
Puisqu'il s'agit d'un code d'écriture, il existe des parties appropriées
import os
from pathlib import Path
from typing import Optional
import boto3
from dataclasses import dataclass
from lauda import stopwatch
@dataclass
class S3Manager:
source_bucket: str
source_prefix: str
profile: Optional[str] = None
def _session(self):
s = boto3.session.Session(
profile_name=self.profile
)
return s
def _list_source(self, *, accumulated=None, next_token=None, func=None):
s3client = self._session().client('s3')
if next_token:
response = s3client.list_objects_v2(
Bucket=self.source_bucket,
Prefix=self.source_prefix,
ContinuationToken=next_token,
)
else:
response = s3client.list_objects_v2(
Bucket=self.source_bucket,
Prefix=self.source_prefix,
)
if 'Contents' in response:
keys = [i['Key'] for i in response['Contents']]
else:
keys = []
if 'NextContinuationToken' in response:
next_token = response['NextContinuationToken']
else:
next_token = None
if func:
return func(response=response, keys=keys, func=func, next_token=next_token, accumulated=accumulated)
def _accumulate(self, *, response, keys, func, next_token, accumulated):
got_keys = (accumulated or []) + keys
if next_token:
print(f'searching... current fetch keys are :{len(got_keys)}')
return self._list_source(accumulated=got_keys, next_token=next_token, func=func)
else:
return got_keys
def list_all(self) -> list:
return self._list_source(func=self._accumulate)
def _delete(self, *, response, keys, func, next_token, accumulated):
if keys:
print(f'deleting: {self.source_bucket}/{self.source_prefix}')
s3client = boto3.Session().client('s3')
s3client.delete_objects(
Bucket=self.source_bucket,
Delete={
'Objects': [{'Key': key} for key in keys],
'Quiet': False
},
)
if next_token:
return self._list_source(next_token=next_token, func=func)
def delete_all(self) -> None:
self._list_source(func=self._delete)
def list_all_test(self):
s3_resource = self._session().resource('s3')
a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
b = [k.key for k in a]
print(len(b))
if __name__ == '__main__':
os.chdir(Path(__file__).parents[1])
@stopwatch
def test():
s3 = S3Manager(
source_bucket='seau',
source_prefix='Chemin de recherche',
)
# s3.list_all()
s3.list_all_test()
test()
L'API de niveau inférieur transmet la fonction d'extensibilité, bien qu'il puisse y avoir une surcharge. L'API de haut niveau n'est pas lente et la description est facile, alors utilisons l'API de haut niveau.
Recommended Posts