[JAVA] Implémenté dans Dataflow pour copier la structure hiérarchique de Google Drive vers Google Cloud Storage

introduction

Je l'ai implémenté car je souhaitais souvent synchroniser Google Drive (ci-après Drive) et Google Cloud Storage (ci-après GCS). Puisque GCS n'a pas le concept de répertoire, la copie peut être parallélisée si le chemin du fichier est connu.

[Drive]                            [GCS]

root/                              gs:root/
 ├ hoge.txt                         ├ hoge.txt
 ├ folderA/                         ├ folderA/fuga.txt
 │  └ fuga.txt                      ├ folderB/folderC/hogehoge.txt 
 ├ folderB/               ----->    └ piyo.txt
 │  └ folderC/                      
 │   └ hogehoge.txt/             
 └ piyo.txt

*Image où le chemin du fichier sur Drive devient le nom de fichier sur GCS

Pourquoi Dataflow?

Au début, j'ai écrit un traitement de copie parallèle avec Google App Engine (ci-après GAE). Cependant, si les tâches de copie parallèle sont distribuées, il est difficile de détecter que toutes les copies ont été terminées. De plus, GAE n'est tout simplement pas bon pour le traitement par lots, et j'ai récemment été exposé à Dataflow au travail. Avec Dataflow, vous pouvez attendre la fin du processus distribué. Après cela, j'ai pensé qu'il serait préférable d'écrire Pub / Sub ou CustomIO et de le connecter au traitement suivant.

Exigences

Copiez la structure hiérarchique directement sous le dossier avec Drive (ci-après dénommé dossier racine) dans GCS en parallèle. Les fichiers qui ne peuvent pas être copiés, tels que la feuille de calcul, sont exclus. Qu'en est-il des fichiers avec le même nom de fichier dans le même dossier?

Aperçu de la mise en œuvre

Recherchez les éléments suivants à partir de l'ID de dossier racine et créez une liste d'objets avec les ID de fichier et les chemins de fichier. Distribuez les objets créés à chaque tâche et parallélisez la partie "Télécharger les fichiers depuis Drive et les télécharger vers GCS".

Conception de pipeline

――DriveIO existe-t-il en standard?

Code de pipeline simple

// *Point 1:C'est une erreur de faire d'abord une entrée avec une valeur appropriée
p.apply("First Input",Create.of("A")).setCoder(StringUtf8Coder.of())
 
 .apply("Read Drive", ParDo.of(new ReadDriveDoFn(rootFolderId)))
 .apply("Write GCS", ParDo.of(new WriteStorageDoFn()));
 
 // *Point 2:Je veux attendre que tout le traitement de copie soit terminé, donc je prends la valeur totale de Output
 .apply("Combine!", Sum.integersGlobally()))
 
 .apply("La copie est terminée, alors faites ce que vous voulez avec le traitement suivant!")

p.run();

--ReadDriveDoFn: créer une liste de fichiers directement sous le dossier racine

	public class ReadDriveDoFn extends DoFn<String, File> {

		private List<File> file;

	    @ProcessElement
	    public void processElement(ProcessContext c) {
	        recursiveSearch(rootFolderID, filePath); //Créer une liste
	        for (File file : fileList) {
	            c.output(file); //Distribuez la liste!
	        }
	    }
	}

--WriteStorageDoFn: téléchargez le fichier depuis Drive et téléchargez-le sur GCS

	public class WriteStorageDoFn extends DoFn<File, Integer> {
	    @ProcessElement
	    public void processElement(ProcessContext c) {
	    	downloadFromDrive(fileId);
	    	uploadToGCS(filePath);
	    	c.output(1);
	    }
	}

--Sum.integersGlobally: Ajoutez le nombre d'éléments de sortie> Ici montre le nombre de fichiers copiés

à la fin

C'est plus de deux fois plus rapide que le processus que j'ai écrit à l'origine dans GAE / Go. Cependant, les API de type G Suite (Apps) sont extrêmement fragiles, n'est-ce pas? Il est désormais possible de distribuer des copies, mais en essayant de gérer un grand nombre de fichiers, une erreur considérable se produit. Écrivons correctement le processus de nouvelle tentative. Dataflow n'est toujours pas bon pour les détails, mais je pense qu'il a des possibilités infinies, alors j'aimerais continuer à l'utiliser à diverses fins à l'avenir.

Recommended Posts

Implémenté dans Dataflow pour copier la structure hiérarchique de Google Drive vers Google Cloud Storage
Copier des données d'Amazon S3 vers Google Cloud Storage avec Python (boto)
J'ai vérifié le package Python pré-installé dans Google Cloud Dataflow
Comment réparer la merde lors de la lecture d'images Google Cloud Storage de Django déployées sur GAE
Obtenir la liste des objets Google Cloud Storage en Java
Comment utiliser l'API Google Cloud Translation
Mémorandum ((1) Copier et coller à partir d'un autre livre (2) Reportez-vous au tableau de comparaison avec openpyxl)
Faire une copie d'un fichier Google Drive à partir de Python
Envoyer les données du journal du serveur vers Splunk Cloud
Comment charger des fichiers dans Google Drive avec Google Colaboratory
Envoyer un message du serveur à l'extension Chrome à l'aide de Google Cloud Messaging pour Chrome
[Python] Modifier le contrôle du cache des objets téléchargés sur Cloud Storage
Connectez-vous à l'écran de gestion fortigate (6.0) à partir de sélénium-essayez de vous déconnecter
Comment se connecter automatiquement comme 1Password depuis CLI
Dupliquer le modèle de document préparé dans Google Drive avec PyDrive2
Qu'est-ce que Google Cloud Dataflow?
[GCP] Comment publier une URL signée Cloud Storage (URL temporaire) en Python
Publier un message d'IBM Cloud Functions sur Slack en Python
Comment se connecter à Cloud Firestore à partir de Google Cloud Functions avec du code Python
Script pour sauvegarder les dossiers sur le serveur sur Google Drive
Changer la version active dans Pyenv d'Anaconda en Python ordinaire
À partir de la page du produit AWS cloud, placez le nom du service AWS sur csv
Téléchargez les images et vidéos contenues dans les tweets que vous avez aimés sur Twitter et téléchargez-les sur Google Drive
Comment copier et coller le contenu d'une feuille au format JSON avec une feuille de calcul Google (en utilisant Google Colab)