Je l'ai implémenté car je souhaitais souvent synchroniser Google Drive (ci-après Drive) et Google Cloud Storage (ci-après GCS). Puisque GCS n'a pas le concept de répertoire, la copie peut être parallélisée si le chemin du fichier est connu.
[Drive] [GCS]
root/ gs:root/
├ hoge.txt ├ hoge.txt
├ folderA/ ├ folderA/fuga.txt
│ └ fuga.txt ├ folderB/folderC/hogehoge.txt
├ folderB/ -----> └ piyo.txt
│ └ folderC/
│ └ hogehoge.txt/
└ piyo.txt
*Image où le chemin du fichier sur Drive devient le nom de fichier sur GCS
Au début, j'ai écrit un traitement de copie parallèle avec Google App Engine (ci-après GAE). Cependant, si les tâches de copie parallèle sont distribuées, il est difficile de détecter que toutes les copies ont été terminées. De plus, GAE n'est tout simplement pas bon pour le traitement par lots, et j'ai récemment été exposé à Dataflow au travail. Avec Dataflow, vous pouvez attendre la fin du processus distribué. Après cela, j'ai pensé qu'il serait préférable d'écrire Pub / Sub ou CustomIO et de le connecter au traitement suivant.
Copiez la structure hiérarchique directement sous le dossier avec Drive (ci-après dénommé dossier racine) dans GCS en parallèle. Les fichiers qui ne peuvent pas être copiés, tels que la feuille de calcul, sont exclus. Qu'en est-il des fichiers avec le même nom de fichier dans le même dossier?
Recherchez les éléments suivants à partir de l'ID de dossier racine et créez une liste d'objets avec les ID de fichier et les chemins de fichier. Distribuez les objets créés à chaque tâche et parallélisez la partie "Télécharger les fichiers depuis Drive et les télécharger vers GCS".
――DriveIO existe-t-il en standard?
// *Point 1:C'est une erreur de faire d'abord une entrée avec une valeur appropriée
p.apply("First Input",Create.of("A")).setCoder(StringUtf8Coder.of())
.apply("Read Drive", ParDo.of(new ReadDriveDoFn(rootFolderId)))
.apply("Write GCS", ParDo.of(new WriteStorageDoFn()));
// *Point 2:Je veux attendre que tout le traitement de copie soit terminé, donc je prends la valeur totale de Output
.apply("Combine!", Sum.integersGlobally()))
.apply("La copie est terminée, alors faites ce que vous voulez avec le traitement suivant!")
p.run();
--ReadDriveDoFn: créer une liste de fichiers directement sous le dossier racine
public class ReadDriveDoFn extends DoFn<String, File> {
private List<File> file;
@ProcessElement
public void processElement(ProcessContext c) {
recursiveSearch(rootFolderID, filePath); //Créer une liste
for (File file : fileList) {
c.output(file); //Distribuez la liste!
}
}
}
--WriteStorageDoFn: téléchargez le fichier depuis Drive et téléchargez-le sur GCS
public class WriteStorageDoFn extends DoFn<File, Integer> {
@ProcessElement
public void processElement(ProcessContext c) {
downloadFromDrive(fileId);
uploadToGCS(filePath);
c.output(1);
}
}
--Sum.integersGlobally: Ajoutez le nombre d'éléments de sortie> Ici montre le nombre de fichiers copiés
C'est plus de deux fois plus rapide que le processus que j'ai écrit à l'origine dans GAE / Go. Cependant, les API de type G Suite (Apps) sont extrêmement fragiles, n'est-ce pas? Il est désormais possible de distribuer des copies, mais en essayant de gérer un grand nombre de fichiers, une erreur considérable se produit. Écrivons correctement le processus de nouvelle tentative. Dataflow n'est toujours pas bon pour les détails, mais je pense qu'il a des possibilités infinies, alors j'aimerais continuer à l'utiliser à diverses fins à l'avenir.