[JAVA] Einführung in Apache Beam (2) ~ ParDo ~

Gesamtzweck

Erstellen Sie ein einfaches Apache Beam-Programm, um zu verstehen, wie es funktioniert

Vorheriger Inhalt

Ich habe den gelesenen Text so geschrieben, wie er ist, um den Start zu bestätigen. Einführung in Apache Beam (1) ~ Lesen und Schreiben von Text ~

Zweck dieser Zeit

Ändern Sie den Datentyp, indem Sie für jede gelesene Textzeile eine ParDo-Verarbeitung durchführen Insbesondere wird der Prozess "Lesen von Bitcoin-Tickerinformationen aus Textdaten und Extrahieren eines beliebigen Werts daraus" durchgeführt.

Was ist ParDo überhaupt?

Core Beam Transforms Beam bietet die folgenden 6 als Basisdatenvarianten (wie die Gliederung der Verarbeitung)

Eines davon ist ParDo

Welche Art der Verarbeitung?

Kurz gesagt, der Prozess der Verarbeitung der Eingabedaten (1 Stück) und der Ausgabe (unabhängig von der Anzahl)

Hauptgeschichte

Umgebung

Genauso wie letztes Mal

IntelliJ


IntelliJ IDEA 2017.3.3 (Ultimate Edition)
Build #IU-173.4301.25, built on January 16, 2018
Licensed to kaito iwatsuki
Subscription is active until January 24, 2019
For educational use only.
JRE: 1.8.0_152-release-1024-b11 x86_64
JVM: OpenJDK 64-Bit Server VM by JetBrains s.r.o
Mac OS X 10.12.6

Maven : 3.5.2

Verfahren

Zu verwendende Textdaten

Die verwendeten Daten sind wie folgt BTC / JPY-Rateninformationen im Bitflyer werden 10 Mal alle 10 Sekunden erfasst.

Die Bestellung lautet ", , , , ".

Sample.txt


BTC/JPY,bitflyer,1519845731987,1127174.0,1126166.0
BTC/JPY,bitflyer,1519845742363,1127470.0,1126176.0
BTC/JPY,bitflyer,1519845752427,1127601.0,1126227.0
BTC/JPY,bitflyer,1519845762038,1127591.0,1126316.0
BTC/JPY,bitflyer,1519845772637,1127801.0,1126368.0
BTC/JPY,bitflyer,1519845782073,1126990.0,1126411.0
BTC/JPY,bitflyer,1519845792827,1127990.0,1126457.0
BTC/JPY,bitflyer,1519845802008,1127980.0,1126500.0
BTC/JPY,bitflyer,1519845812088,1127980.0,1126566.0
BTC/JPY,bitflyer,1519845822743,1127970.0,1126601.0

Lesen Sie diesen Text und extrahieren Sie mit ParDo beliebige Informationen (diesmal "BID") aus jeder Zeile.

Codeänderung

Der Code lautet diesmal wie folgt.

SimpleBeam.java


import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class SimpleBeam {
    //hinzufügen
    public static class ExtractBid extends DoFn<String, String> {
        @ProcessElement
        public void process(ProcessContext c){
            //Holen Sie sich Reihe
            String row = c.element();
            //Mit Komma teilen
            String[] cells = row.split(",");
            //Gibt BID zurück
            c.output(cells[4]);
        }
    }

    public static void main(String[] args){
        PipelineOptions options = PipelineOptionsFactory.create();

        Pipeline p = Pipeline.create(options);
        //Text lesen
        PCollection<String> textData = p.apply(TextIO.read().from("Sample.txt"));

        //Von hier hinzugefügt
        PCollection<String> BidData = textData.apply(ParDo.of(new ExtractBid()));
        //Addiere hier

        //Text schreiben
        BidData.apply(TextIO.write().to("output/bid"));
        //Pipeline-Lauf
        p.run().waitUntilFinish();
    }
}

Die Ausgabe des vorherigen Codes war "Wortanzahl" und ich wusste, dass ich das Beispiel kopiert und eingefügt habe, also habe ich das Ausgabeziel geändert. .. ..

Die Ausführungsmethode ist dieselbe wie beim letzten Mal. Weitere Informationen finden Sie unter hier.

Ausführungsergebnis

Das Ausführungsergebnis ist wie folgt und Sie können sehen, dass die BID-Daten (ganz rechts von Sample.txt) extrahiert wurden. Dieses Mal wird der Einfachheit halber nur ein String extrahiert. Auf diese Weise können Sie jedoch den Datentyp ändern und die Daten formatieren. Mit dieser Art der Verarbeitung können Sie einen beliebigen Schlüssel für die Eingabedaten festlegen und zur Verarbeitung "Reduzieren" wechseln.

https://gyazo.com/e67be50bd8887122e625a7d869a561c9

Ist die Ausgabe wie das Bild in drei Dateien unterteilt, weil der Kartenprozess abgeschlossen ist und dann Reduzieren verteilt und ausgeführt wird?

Über SimpleFunction

Eine Klasse namens "SimpleFinction" ist bereit, die gleiche Verarbeitung zu realisieren, und ich war neugierig, was anders ist, also werde ich es als Bonus zusammenfassen.

Laut dem offiziellen Dokument

If your ParDo performs a one-to-one mapping of input elements to output elements–that is, for each input element, it applies a function that produces exactly one output element, you can use the higher-level MapElements transform. MapElements can accept an anonymous Java 8 lambda function for additional brevity.

Es scheint eine Geschichte zu sein, die Sie mit einer anonymen Funktion abstrakter und einfacher beschreiben können als DoFn von ParDo.

Versuchen Sie, mit der einfachen Funktion neu zu schreiben

Code

beamSample.java


import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class SimpleBeam {
    public static void main(String[] args){
        PipelineOptions options = PipelineOptionsFactory.create();

        Pipeline p = Pipeline.create(options);
        //Text lesen
        PCollection<String> textData = p.apply(TextIO.read().from("Sample.txt"));

        //Implementieren Sie denselben Prozess mit anonymen Funktionen
        PCollection<String> BidData = textData.apply(
                MapElements.into(TypeDescriptors.strings())
                        .via((String row) -> row.split(",")[4])
        );

        //Text schreiben
        BidData.apply(TextIO.write().to("output/bid"));
        //Pipeline-Lauf
        p.run().waitUntilFinish();
    }
}

Es ist viel einfacher! Was den Eindruck betrifft, dass ich Beam eine Weile berührt habe, ist es ziemlich mühsam, jedes Mal "DoFn" zu erstellen, da der Vorgang des einfachen Änderns des Datentyps übereinstimmt und ich ihn daher gerne aktiv nutzen möchte.

Mit anderen Worten

Java 8 kann nicht aktiviert werden

Normale Änderungsmethode File => Project Structure...

https://gyazo.com/04d6d3533e116905963341ce23bc4e4e

ProjectSettings => Project => Setzen Sie Project language level auf 8

https://gyazo.com/198eb18089ef794afc0c6ab5b53e3553

** Besteht immer noch nicht ... Befolgen Sie in einem solchen Fall die folgenden Schritte **

Lösung

Die folgende Beschreibung wurde zu "pom.xml" hinzugefügt

pom.xml


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>beamSample</groupId>
    <artifactId>beamSample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- 1.Hinzugefügt, um 8 erkannt zu machen-->
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <!--Addiere hier-->

    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>2.4.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>2.3.0</version>
        </dependency>
    </dependencies>


</project>

vom nächsten mal

Dieses Mal habe ich ParDo implementiert, was der Kartenverarbeitung von MapReduce entspricht. Beim nächsten Mal möchte ich die Reduce-Verarbeitung einfach implementieren und den Durchschnitt der Ticker-Informationen ermitteln.

Recommended Posts

Einführung in Apache Beam (2) ~ ParDo ~
Einführung in Apache Beam (1) ~ Lesen und Schreiben von Text ~
Einführung in Ruby 2
Einführung in web3j
Einführung in Micronaut 1 ~ Einführung ~
[Java] Einführung in Java
Einführung in die Migration
Einführung in Java
Einführung in Doma
Einführung in Ratpack (8) -Session
Einführung in die Bitarithmetik
Einführung in Ratpack (6) --Promise
Einführung in Ratpack (9) - Thymeleaf
Apache Beam Beispielcode
Einführung in PlayFramework 2.7 ① Übersicht
Einführung in das Android-Layout
Einführung in Entwurfsmuster (Einführung)
Einführung in die praktische Programmierung
Einführung in den Befehl javadoc
Einführung in den Befehl jar
Einführung in Ratpack (2) -Architektur
Einführung in den Lambda-Stil
Einführung in den Java-Befehl
Einführung in die Keycloak-Entwicklung
Einführung in den Befehl javac
Einführung in Entwurfsmuster (Builder)
Einführung in die Android App-Entwicklung
Einführung in Ratpack (5) --Json & Registry
Einführung in Metabase ~ Umgebungskonstruktion ~
Einführung in Ratpack (7) - Guice & Spring
(Punktinstallation) Einführung in Java8_Impression
Einführung in Entwurfsmuster (Composite)
Einführung in JUnit (Studiennotiz)
Einführung in Spring Boot ~ ~ DI ~
Einführung in Designmuster (Fliegengewicht)
[Java] Einführung in den Lambda-Ausdruck
Einführung in Spring Boot ② ~ AOP ~
Einführung in die EHRbase 2-REST-API
Einführung in Entwurfsmuster Prototyp
Verwendung von Apache POI
[Java] Einführung in die Stream-API
Einführung in Entwurfsmuster (Iterator)
Einführung in Spring Boot Teil 1
Einführung in Ratpack (1) - Was ist Ratpack?
Einführung in Entwurfsmuster (Strategie)
[Einführung in Janken (ähnliche) Spiele] Java
Einführung in Linux Container / Docker (Teil 1)
Einführung in die schnelle Übungsausgabe Kapitel 5
[Einführung in Java] Über Lambda-Ausdrücke
Einführung in Algorithmen mit Java-kumulativer Summe
[Einführung in Java] Informationen zur Stream-API