Il s'agit d'un package de WACUL Co., Ltd. et CTO. Tout le monde dans l'entreprise passe un bon moment à faire du pain, à faire du curry et à regarder des films.
Ces derniers temps, j'ai joué avec Google Cloud Dataflow, je l'ai donc organisé comme une note d'introduction. J'espère que cela aidera ceux qui sont sur le point de le toucher en premier à avoir un aperçu approximatif. Je n'ai pas d'expertise approfondie en streaming ou en traitement par lots, alors j'apprécierais que vous me disiez ce que je faisais mal.
Il s'agit d'un matériel de référence lié à Google Cloud Dataflow que j'ai lu jusqu'à présent.
Lisons quand même
Un article de commentaire rédigé par Google pour la migration du traitement par lots existant vers le traitement en continu. 101 est une histoire sur les concepts de base, et la partie 102 est une histoire qui va un peu plus loin dans Dataflow.
[Article traduit] de kimutansk (http://qiita.com/kimutansk/items/447df5795768a483caa8) est dans Qiita, c'est trop beau.
Pourquoi Apache Beam: raisons d'encourager Dataflow à entrer dans le rival
Un article expliquant le contexte derrière l'ouverture par Google du modèle de programmation Dataflow. Le dernier «futur du streaming et des lots est dans Apache Beam» est un long chemin.
Cloud Dataflow est un moteur de traitement de données à grande échelle et son service géré. En général, ce serait bien de le considérer comme un compagnon de Hadoop, Spark, etc. Les principales caractéristiques sont la fourniture d'un nouveau modèle de programmation et d'un environnement d'exécution entièrement géré.
La différence entre le traitement par lots et le traitement par flux est de savoir si les données traitées sont finies (bornées) ou infinies (illimitées). La plate-forme de traitement par lots ayant une histoire plus longue et étant plus stable que la plate-forme de traitement en continu, le traitement de données à grande échelle a été conçu sur la base du traitement par lots. D'un autre côté, il existe une demande croissante pour des décisions commerciales plus rapides et une livraison plus rapide des données aux utilisateurs, et une demande croissante de moteurs de traitement en continu qui traitent en permanence des données infinies.
Par conséquent, Lambda Architecture, qui combine le traitement par lots et le traitement en continu pour fournir le résultat final, est apparu. Fait. Cependant, maintenir un système construit avec une architecture lambda peut être une tâche ardue. Il est difficile d'imaginer que vous deviez utiliser différents modèles de programmation et être cohérent au niveau logique. .. ..
Le modèle de programmation fourni par Cloud Dataflow semble avoir pour objectif d'intégrer un traitement de données fini et un traitement de données infini basé sur un traitement en continu. En gros, si vous avez un modèle qui traite avec précision les nouvelles données entrantes et un moteur qui peut être fait dans un temps réaliste, si vous lisez toutes les données passées et les saisissez, vous obtiendrez le même résultat. C'est très bien! à propos de ça.
Le problème avec les moteurs de streaming existants était qu'il était difficile de contrôler l'intégrité des données, principalement en raison de la difficulté de gérer la notion de temps. La difficulté de traiter la notion de temps est due au fait qu'en réalité, il existe de nombreux cas où le moment d'exécution du traitement réel est différent du moment où l'événement s'est réellement produit. Retards de réseau et de traitement, et dans les cas extrêmes, les journaux des applications mobiles sont envoyés au serveur lorsqu'ils sont en ligne. Pour faire face à cela, il est nécessaire de gérer le traitement de mise en mémoire tampon et les données qui sont différentes de l'ordre d'arrivée réel. Cloud Dataflow semble être en mesure de bien gérer cela en introduisant ici de nouveaux concepts. (Je ne comprends pas complètement, veuillez donc consulter le matériel de référence https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101, https://www.oreilly.com/ideas / le-monde-au-delà-par lots-streaming-102)
Ce modèle de programmation était à l'origine dédié à l'exécution sur GCP, mais est devenu open source par Google vers mai 2016 en tant que projet Apache Beam (au fait, Beam est une combinaison de Batch et Stream). ..
Google Cloud Dataflow, Apache Spark, Apache Flink, Apache Apex, etc. peuvent être sélectionnés comme moteur d'exécution d'Apache Beam qui optimise chaque étape du pipeline et distribue efficacement le traitement, qui sera décrit plus tard (non vérifié sauf pour Cloud Dataflow). Sur place, Flink semble être le meilleur.
Donc, pour être précis, Cloud Dataflow se positionne comme un service entièrement géré, le moteur d'exécution d'Apache Beam fonctionnant sur GCP. Lorsque vous le déplacez, l'instance GCE sera lancée dans les coulisses. Compte tenu du coût de maintenance de l'infrastructure, je pense que cela peut être une option intéressante en particulier pour les startups.
Java or Python
Vous pouvez choisir Java ou Python comme langage d'implémentation pour Cloud Dataflow. Cependant, à partir de novembre 2016, la version Python peut ne pas être disponible en version bêta.
Tel. Il sera désormais rempli.
Jetons un coup d'œil au code Java. L'exemple de code se trouve dans le référentiel github lié à partir de la documentation Google. C'est organisé.
Version Java8, comptage des mots en entrée [WordCount](https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples/blob/master/src/main/java8/com/google/cloud/dataflow/examples/MinimalWordCountJava8 Prenons .java) comme exemple.
Extrait uniquement pour la partie principale (Pour CHANGE, vous devez saisir l'ID de projet sur votre GCP ou le nom du bucket GCS)
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
// CHANGE 1 of 3: Your project ID is required in order to run your pipeline on the Google Cloud.
options.setProject("SET_YOUR_PROJECT_ID_HERE");
// CHANGE 2 of 3: Your Google Cloud Storage path is required for staging local files.
options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
.withOutputType(new TypeDescriptor<String>() {}))
.apply(Filter.byPredicate((String word) -> !word.isEmpty()))
.apply(Count.<String>perElement())
.apply(MapElements
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
.withOutputType(new TypeDescriptor<String>() {}))
// CHANGE 3 of 3: The Google Cloud Storage path is required for outputting the results to.
.apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
p.run();
}
Pipeline
Dans l'ambiance du code, vous pouvez voir que Dataflow est un style fonctionnel qui définit le cadre de traitement plutôt que de traitement des valeurs. C'est similaire à Rx (Reactive Extension) ou TensorFlow.
Un pipeline est un objet qui contrôle le flux de traitement et est créé en passant des options à Pipeline.create
.
Spécifiez éventuellement les paramètres requis pour exécuter le pipeline.
Basculer entre le mode batch et le mode streaming
options.setRunner(BlockingDataflowPipelineRunner.class);
Partie de
options.setRunner(DataflowPipelineRunner.class);
À En d'autres termes, les deux seront traités par le flux construit sur l'objet Pipeline
.
PCollection, PTransform
Ce que nous transmettons à Apply de Pipeline est un objet PTransform
. Généré avec FlatMapElements.via
et Filter.byPredicate
avec les expressions lambda java8.
«PTransform» est une interface qui définit le processus de réception de «PInput» et de retour de «POutput».
Pour cette PTransform
, le pipeline fera le travail en transmettant les valeurs suivantes.
Le traitement du point de départ est l'interface «PInput», et le traitement du point final est l'interface «POutput», mais le traitement intermédiaire gère une instance de la classe «PCollection» qui implémente les deux.
La «PCollection» est l'objet qui représente les données qui traversent le pipeline, et d'autre part, la conversion, le branchement et le traitement de jointure des données sont construits sur le pipeline.
Exemple:
//Divisez la collection précédente de chaînes en mots(Rendre le tableau plat à la fin)
input.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
.withOutputType(new TypeDescriptor<String>() {})) ...
//Filtrer uniquement pour les collections non vides de chaînes de la ligne précédente
input.apply(Filter.byPredicate((String word) -> !word.isEmpty())) ...
//Agréger la collection de chaînes de la ligne précédente et la convertir en une collection de cartes de décomptes par valeur
input.apply(Count.<String>perElement()) ...
Entrées et sorties actuellement prises en charge
(La version python est un fichier texte et BigQuery uniquement)
L'entrée et la sortie sont également un type de PTransform
, alors passez-le à ʻapply` pour le traitement.
Exemple:
//Lisez un fichier texte depuis Google Cloud Storage et convertissez-le en une collection ligne par ligne de chaînes
pipeline.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) ...
//Convertir l'entrée de flux de Cloud Pubsub en une collection horodatée
pipeline.apply(PubsubIO.Read
.timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
.subscription(options.getPubsubSubscription())) ...
//Reçoit une collection et la transmet à BigQuery
//tableRef est des informations telles que l'emplacement de la table, les informations de schéma de la table de schéma
... .apply(BigQueryIO.Write.to(tableRef)
.withSchema(schema)
Si vous placez vos informations de projet et de stockage dans l'exemple de code et que vous l'exécutez, vous pouvez surveiller l'état d'exécution à partir de l'écran de gestion de Cloud Dataflow.
Le processus défini dans Pipeline a été traité dans le cloud. Dans les coulisses, une instance de Compute Engine est lancée et traitée. La sortie de cet exemple est générée en divisant le fichier en plusieurs parties sur le bucket Cloud Storage que vous avez créé.
À propos, le fichier texte de l'entrée de cet exemple est
GLOUCESTER A poor unfortunate beggar.
EDGAR As I stood here below, methought his eyes
Were two full moons; he had a thousand noses,
Horns whelk'd and waved like the enridged sea:
It was some fiend; therefore, thou happy father,
Think that the clearest gods, who make them honours
Of men's impossibilities, have preserved thee.
GLOUCESTER I do remember now: henceforth I'll bear
Affliction till it do cry out itself
'Enough, enough,' and die. That thing you speak of,
I took it for a man; often 'twould say
'The fiend, the fiend:' he led me to that place.
C'est comme un scénario de Shakespeare. Le nombre de sorties par mot est
decreased: 1
'shall': 2
War: 4
empress': 14
Provost: 99
stoops: 6
C'est un fichier au format comme.
J'ai couru un exemple simple pour avoir une idée de la sensation approximative de Google Cloud Dataflow. Ce que j'ai ressenti était
Recommended Posts