I previously wrote the following article about Cloud Dataflow: https://qiita.com/yuyu_hf/items/e8e738f542e1f30d7be4
When I wrote that article I was not yet used to Apache Beam, so I used Java instead of Scala. When deciding whether to use Java, Scala, or Scala + Scio, however, it helps to have something that lets you compare them at a glance, so I implemented the same simple processing in all three.
It is a very simple job that extracts one specific column from BigQuery and writes it out to GCS.
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DgroupId=com.example \
-DartifactId=bigquery-to-gcs \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.example
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>dataflow-bigquerytogcs</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<properties>
<beam.version>2.16.0</beam.version>
<bigquery.version>v2-rev20181104-1.27.0</bigquery.version>
<google-clients.version>1.27.0</google-clients.version>
<hamcrest.version>2.1</hamcrest.version>
<jackson.version>2.9.10</jackson.version>
<joda.version>2.10.3</joda.version>
<junit.version>4.13-beta-3</junit.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
<maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
<maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
<mockito.version>3.0.0</mockito.version>
<pubsub.version>v1-rev20181105-1.27.0</pubsub.version>
<slf4j.version>1.7.25</slf4j.version>
<spark.version>2.4.4</spark.version>
<hadoop.version>2.7.3</hadoop.version>
<maven-surefire-plugin.version>2.21.0</maven-surefire-plugin.version>
<nemo.version>0.1</nemo.version>
<flink.artifact.name>beam-runners-flink-1.8</flink.artifact.name>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<configuration>
<parallel>all</parallel>
<threadCount>4</threadCount>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>${maven-surefire-plugin.version}</version>
</dependency>
</dependencies>
</plugin>
<!-- Ensure that the Maven jar plugin runs before the Maven
shade plugin by listing the plugin higher within the file. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
</plugin>
<!--
Configures `mvn package` to produce a bundled jar ("fat jar") for runners
that require this for job submission to a cluster.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-bundled-${project.version}</finalName>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/LICENSE</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>${maven-exec-plugin.version}</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<profiles>
<profile>
<id>direct-runner</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<!-- Makes the DirectRunner available when running a pipeline. -->
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>portable-runner</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<!-- Makes the PortableRunner available when running a pipeline. -->
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-reference-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>apex-runner</id>
<!-- Makes the ApexRunner available when running a pipeline. -->
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-apex</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<!--
Apex depends on httpclient version 4.3.6, project has a transitive dependency to httpclient 4.0.1 from
google-http-client. Apex dependency version being specified explicitly so that it gets picked up. This
can be removed when the project no longer has a dependency on a different httpclient version.
-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.6</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--
Apex 3.6 is built against YARN 2.6. Version in the fat jar has to match
what's on the cluster, hence we need to repeat the Apex Hadoop dependencies here.
-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${hadoop.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>dataflow-runner</id>
<!-- Makes the DataflowRunner available when running a pipeline. -->
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>flink-runner</id>
<!-- Makes the FlinkRunner available when running a pipeline. -->
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<!-- Please see the Flink Runner page for an up-to-date list
of supported Flink versions and their artifact names:
https://beam.apache.org/documentation/runners/flink/ -->
<artifactId>${flink.artifact.name}</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-runner</id>
<!-- Makes the SparkRunner available when running a pipeline. Additionally,
overrides some Spark dependencies to Beam-compatible versions. -->
<properties>
<netty.version>4.1.17.Final</netty.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
<version>${jackson.version}</version>
<scope>runtime</scope>
</dependency>
<!-- [BEAM-3519] GCP IO exposes netty on its API surface, causing conflicts with runners -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
<profile>
<id>gearpump-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-gearpump</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>samza-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-samza</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>nemo-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.nemo</groupId>
<artifactId>nemo-compiler-frontend-beam</artifactId>
<version>${nemo.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
<profile>
<id>jet-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-jet-experimental</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
</profiles>
<dependencies>
<!-- Adds a dependency on the Beam SDK. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- Dependencies below this line are specific dependencies needed by the examples code. -->
<dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
<version>${google-clients.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-bigquery</artifactId>
<version>${bigquery.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
<version>${google-clients.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
<version>${pubsub.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda.version}</version>
</dependency>
<!-- Add slf4j API frontend binding with JUL backend -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>${slf4j.version}</version>
<!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
<scope>runtime</scope>
</dependency>
<!-- Hamcrest and JUnit are required dependencies of PAssert,
which is used in the main code of DebuggingWordCount example. -->
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>${hamcrest.version}</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>${hamcrest.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<!-- The DirectRunner is needed for unit tests. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
BigqueryToGCS.java
package com.example;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class BigqueryToGCS {
public interface BigQueryToGCSOptions extends PipelineOptions {}
public static void main(final String[] args) {
final BigQueryToGCSOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(BigQueryToGCSOptions.class);
final Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read from BigQuery", BigQueryIO.readTableRows()
.fromQuery("SELECT * FROM <DB>.<Table>"))
.apply(ParDo.of(new DoFn<TableRow, String>() {
@ProcessElement
public void processElement(final ProcessContext c) {
final TableRow row = c.element();
final String col = String.valueOf(row.get("<Column Name>"));
c.output(col);
}
}))
.apply(TextIO.write().to("<GCS URL for output files>"));
pipeline.run();
}
}
mvn compile exec:java \
-Dexec.mainClass=com.example.BigqueryToGCS \
-Dexec.args="--project=<Project Name> \
--tempLocation=<GCS URL> \
--gcpTempLocation=<GCS URL> \
--runner=DataflowRunner \
--jobName=BigqueryToGCS" \
-Pdataflow-runner
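One detail of the DoFn in the Java pipeline above is easy to miss: `String.valueOf(row.get(...))` turns a missing or NULL column into the literal text `null`, which would then be written to the output file. The following is a minimal sketch of that behavior outside of Beam. It assumes a plain `HashMap` as a stand-in for `TableRow` (which is itself a `Map<String, Object>`). The class name `ColumnExtractSketch`, both method names, and the sample column `sepal_length` are hypothetical and chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class ColumnExtractSketch {

    // Mirrors the DoFn body: String.valueOf(Object) returns the text "null"
    // when the column is absent or its value is null.
    public static String extractRaw(Map<String, Object> row, String column) {
        return String.valueOf(row.get(column));
    }

    // Hypothetical variant: substitute a caller-chosen fallback instead of "null".
    public static String extractOrDefault(Map<String, Object> row, String column, String fallback) {
        Object value = row.get(column);
        return value == null ? fallback : value.toString();
    }

    public static void main(String[] args) {
        // A HashMap stands in for a BigQuery TableRow (assumption for this sketch).
        Map<String, Object> withValue = new HashMap<>();
        withValue.put("sepal_length", 5.1);
        Map<String, Object> withoutValue = new HashMap<>();

        System.out.println(extractRaw(withValue, "sepal_length"));    // prints 5.1
        System.out.println(extractRaw(withoutValue, "sepal_length")); // prints null
        System.out.println(extractOrDefault(withoutValue, "sepal_length", "").isEmpty()); // prints true
    }
}
```

Whether an empty string, a skipped row, or the text `null` is the right output depends on the downstream consumer, so this is only one possible choice.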
Next, I will rewrite the project created in Java in Scala.
Scala projects are built with sbt, so the libraries are managed in build.sbt. I want to specify only the minimum libraries required to run the job, so I set it up as follows.
build.sbt
name := "<Project Name>"
version := "0.0.1"
scalaVersion := "2.12.10"
val beamVersion = "2.5.0"
libraryDependencies ++= Seq(
"com.google.cloud.dataflow" % "google-cloud-dataflow-java-sdk-all" % beamVersion
)
BigqueryToGCS.scala
package com.hello
import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.runners.dataflow.DataflowRunner
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.{ PipelineOptions, PipelineOptionsFactory }
import org.apache.beam.sdk.util.Transport
import org.apache.beam.sdk.transforms.{ DoFn, ParDo }
import DoFn.ProcessElement
import org.apache.beam.sdk.io.TextIO
import scala.util.{ Failure, Success, Try}
class ColumnDoFn extends DoFn[TableRow, String] {
@ProcessElement
def processElement(c: ProcessContext): Unit = {
val input = c.element()
val col = String.valueOf(input.get("<Column>"))
c.output(col)
}
}
object BigqueryToGCS {
def main(args: Array[String]) :Unit = {
trait BigQueryToGCSOptions extends PipelineOptions with DataflowPipelineOptions
val options = PipelineOptionsFactory.create().as(classOf[BigQueryToGCSOptions])
options.setProject("<Project>")
options.setRunner(classOf[DataflowRunner])
options.setRegion("asia-northeast1")
options.setZone("asia-northeast1-a")
options.setNumWorkers(1)
options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE)
options.setWorkerMachineType("n1-standard-1")
options.setStagingLocation("<GCS URL for staging>")
options.setGcpTempLocation("<GCS URL for tmp>")
options.setTempLocation("<GCS URL for tmp>")
val p = Pipeline.create(options)
p.apply("", BigQueryIO.readTableRows().fromQuery("SELECT * FROM <DB>.<Table>"))
.apply("", ParDo.of(new ColumnDoFn))
.apply("", TextIO.write().to("<GCS URL for output files>"))
p.run()
}
}
sbt "runMain com.hello.BigqueryToGCS --project=<Project Name> --runner=DataflowRunner"
Scio is a Scala API for Apache Beam and Cloud Dataflow, developed by Spotify.
sbt new spotify/scio.g8
The Scio template does not include the BigQuery library, so add scio-bigquery to build.sbt.
build.sbt
import sbt._
import Keys._
val scioVersion = "0.7.4"
val beamVersion = "2.11.0"
val scalaMacrosVersion = "2.1.1"
lazy val commonSettings = Defaults.coreDefaultSettings ++ Seq(
organization := "example",
// Semantic versioning http://semver.org/
version := "0.1.0-SNAPSHOT",
scalaVersion := "2.12.10",
scalacOptions ++= Seq("-target:jvm-1.8",
"-deprecation",
"-feature",
"-unchecked"),
javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
)
lazy val paradiseDependency =
"org.scalamacros" % "paradise" % scalaMacrosVersion cross CrossVersion.full
lazy val macroSettings = Seq(
libraryDependencies += "org.scala-lang" % "scala-reflect" % scalaVersion.value,
addCompilerPlugin(paradiseDependency)
)
lazy val root: Project = project
.in(file("."))
.settings(commonSettings)
.settings(macroSettings)
.settings(
name := "yuyu_hf",
description := "yuyu_hf",
publish / skip := true,
run / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat,
libraryDependencies ++= Seq(
"com.spotify" %% "scio-core" % scioVersion,
"com.spotify" %% "scio-test" % scioVersion % Test,
"com.spotify" %% "scio-bigquery" % scioVersion,
"org.apache.beam" % "beam-runners-direct-java" % beamVersion,
// optional dataflow runner
"org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
"org.slf4j" % "slf4j-simple" % "1.7.25"
)
)
.enablePlugins(PackPlugin)
lazy val repl: Project = project
.in(file(".repl"))
.settings(commonSettings)
.settings(macroSettings)
.settings(
name := "repl",
description := "Scio REPL for yuyu_hf",
libraryDependencies ++= Seq(
"com.spotify" %% "scio-repl" % scioVersion
),
Compile / mainClass := Some("com.spotify.scio.repl.ScioShell"),
publish / skip := true
)
.dependsOn(root)
BigqueryToGCS.scala
package example
import com.spotify.scio.bigquery._
import com.spotify.scio.ContextAndArgs
object BigqueryToGCS {
@BigQueryType.fromQuery("SELECT * FROM [<Project>:<DB>.<Table>]")
class Row
def main(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)
sc.typedBigQuery[Row]()
.map(r => r.sepal_length.getOrElse(""))
.saveAsTextFile("<GCS URL for output files>")
sc.close()
}
}
sbt "runMain example.BigqueryToGCS --project=<Project Name> --runner=DataflowRunner"
Personally, it took me a while to get used to Apache Beam, so I first wrote the job in Java, for which a lot of sample code is available. I think a good approach is to start by converting Java code to Scala. If you write idiomatic Scala, you may be able to handle intermediate data more easily than in Java. Whether to use Scio should be decided based on how frequently the library is updated and on your team's situation.