[JAVA] Cloud Dataflow template created with Scala


In an earlier article I was not yet used to Apache Beam, so I wrote the pipeline in Java rather than Scala. When choosing between Java, Scala, and Scala + Scio, though, it helps to have something you can compare at a glance, so I implemented the same simple processing in each of the three.

The processing is very simple: it extracts a specific column from a BigQuery table and writes it out to GCS.
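As a rough illustration of the processing itself, independent of Beam, the sketch below pulls one column out of each row and produces one output line per row. The `ColumnExtractSketch` class, the `extractColumn` method, and the in-memory rows are hypothetical stand-ins for BigQuery rows and GCS output; none of them appear in the templates that follow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ColumnExtractSketch {

    // Hypothetical helper: returns the value of one column per row, as text.
    public static List<String> extractColumn(List<Map<String, Object>> rows, String column) {
        List<String> lines = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            lines.add(String.valueOf(row.get(column)));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Two in-memory rows standing in for a BigQuery table.
        Map<String, Object> first = new HashMap<>();
        first.put("name", "a");
        first.put("score", 1.5);
        Map<String, Object> second = new HashMap<>();
        second.put("name", "b");
        second.put("score", 2.0);

        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(first);
        rows.add(second);

        // One line per row, as the pipelines do when writing to GCS.
        for (String line : extractColumn(rows, "score")) {
            System.out.println(line);
        }
    }
}
```

In the real pipelines the loop is replaced by a per-element transform, which is what the `DoFn` (Java, Scala) and `map` (Scio) do.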

Template creation (Java edition)

Project creation

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.beam \
    -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
    -DgroupId=com.example \
    -DartifactId=bigquery-to-gcs \
    -Dversion="0.1" \
    -DinteractiveMode=false

Library management



<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">





package com.example;

import com.google.api.services.bigquery.model.TableRow;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class BigqueryToGCS {

    public interface BigQueryToGCSOptions extends PipelineOptions {}

    public static void main(final String[] args) {
        // Build the pipeline options from the command-line arguments.
        final BigQueryToGCSOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(BigQueryToGCSOptions.class);

        final Pipeline pipeline = Pipeline.create(options);

        pipeline.apply("Read from BigQuery", BigQueryIO.readTableRows()
                            .fromQuery("SELECT * FROM <DB>.<Table>"))
                .apply(ParDo.of(new DoFn<TableRow, String>() {
                    // Emit the value of the target column as a string.
                    @ProcessElement
                    public void processElement(final ProcessContext c) {
                        final TableRow row = c.element();
                        final String col = String.valueOf(row.get("<Column Name>"));
                        c.output(col);
                    }
                }))
                .apply(TextIO.write().to("<GCS URL for output files>"));

        pipeline.run();
    }
}
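
One detail of the `String.valueOf(row.get(...))` call above: `TableRow` behaves like a `Map`, so a column that is missing from the row comes back as `null`, and `String.valueOf` turns that into the text `null` rather than failing. The sketch below shows this with a plain `HashMap` standing in for `TableRow`. The class name, the `columnOrDefault` helper, and the choice of an empty string as the fallback are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

public class NullColumnSketch {

    // Hypothetical helper: like String.valueOf(row.get(column)),
    // but with an explicit fallback for missing or null values.
    public static String columnOrDefault(Map<String, Object> row, String column, String fallback) {
        Object value = row.get(column);
        return value == null ? fallback : String.valueOf(value);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("present", 42);

        // Without a fallback, a missing column becomes the four-character text "null".
        System.out.println(String.valueOf(row.get("missing")));
        // With the fallback, the missing column becomes an empty line instead.
        System.out.println(columnOrDefault(row, "missing", ""));
        // Present values are converted to text as before.
        System.out.println(columnOrDefault(row, "present", ""));
    }
}
```

Whether an empty line or the literal `null` is preferable depends on how the output files are consumed downstream.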


Template execution

mvn compile exec:java \
    -Dexec.mainClass=com.example.BigqueryToGCS \
    -Dexec.args="--project=<Project Name> \
    --tempLocation=<GCS URL> \
    --gcpTempLocation=<GCS URL> \
    --runner=DataflowRunner \
    --jobName=BigqueryToGCS"

Template creation (Scala edition)

Next, I rewrite the Java project in Scala.

Library management

Since Scala projects are built with sbt, the libraries are managed in build.sbt. I want to include only the minimum libraries required to run the pipeline, so I configure it as follows.


name := "<Project Name>"
version := "0.0.1"
scalaVersion := "2.12.10"

val beamVersion = "2.5.0"

libraryDependencies ++= Seq(
  "com.google.cloud.dataflow" % "google-cloud-dataflow-java-sdk-all" % beamVersion
)


package com.hello

import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.runners.dataflow.DataflowRunner
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.{ PipelineOptions, PipelineOptionsFactory }
import org.apache.beam.sdk.util.Transport
import org.apache.beam.sdk.transforms.{ DoFn, ParDo }
import DoFn.ProcessElement
import org.apache.beam.sdk.io.TextIO
import scala.util.{ Failure, Success, Try}

class ColumnDoFn extends DoFn[TableRow, String] {
    // Emit the value of the target column as a string.
    @ProcessElement
    def processElement(c: ProcessContext): Unit = {
        val input = c.element()
        val col = String.valueOf(input.get("<Column>"))
        c.output(col)
    }
}

object BigqueryToGCS {
    def main(args: Array[String]): Unit = {
        trait BigQueryToGCSOptions extends PipelineOptions with DataflowPipelineOptions
        val options = PipelineOptionsFactory.create().as(classOf[BigQueryToGCSOptions])

        // Staging and temp locations are set explicitly in code.
        options.setStagingLocation("<GCS URL for staging>")
        options.setGcpTempLocation("<GCS URL for tmp>")
        options.setTempLocation("<GCS URL for tmp>")

        val p = Pipeline.create(options)

        p.apply("Read from BigQuery", BigQueryIO.readTableRows().fromQuery("SELECT * FROM <DB>.<Table>"))
         .apply("Extract column", ParDo.of(new ColumnDoFn))
         .apply("Write to GCS", TextIO.write().to("<GCS URL for output files>"))

        p.run()
    }
}


Template execution

sbt "runMain com.hello.BigqueryToGCS --project=<Project Name> --runner=DataflowRunner"

Template creation (Scala + Scio edition)

Scio is a Scala API for Apache Beam and Cloud Dataflow developed by Spotify.

Project creation

sbt new spotify/scio.g8

Library management

The Scio template does not include the BigQuery library, so add scio-bigquery to the dependencies.


import sbt._
import Keys._

val scioVersion = "0.7.4"
val beamVersion = "2.11.0"
val scalaMacrosVersion = "2.1.1"

lazy val commonSettings = Defaults.coreDefaultSettings ++ Seq(
  organization := "example",
  // Semantic versioning http://semver.org/
  version := "0.1.0-SNAPSHOT",
  scalaVersion := "2.12.10",
  scalacOptions ++= Seq("-target:jvm-1.8"),
  javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
)

lazy val paradiseDependency =
  "org.scalamacros" % "paradise" % scalaMacrosVersion cross CrossVersion.full
lazy val macroSettings = Seq(
  libraryDependencies += "org.scala-lang" % "scala-reflect" % scalaVersion.value,
  addCompilerPlugin(paradiseDependency)
)

lazy val root: Project = project
  .in(file("."))
  .settings(commonSettings)
  .settings(macroSettings)
  .settings(
    name := "yuyu_hf",
    description := "yuyu_hf",
    publish / skip := true,
    run / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat,
    libraryDependencies ++= Seq(
      "com.spotify" %% "scio-core" % scioVersion,
      "com.spotify" %% "scio-test" % scioVersion % Test,
      // added: BigQuery support is not part of the generated template
      "com.spotify" %% "scio-bigquery" % scioVersion,
      "org.apache.beam" % "beam-runners-direct-java" % beamVersion,
      // optional dataflow runner
      "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
      "org.slf4j" % "slf4j-simple" % "1.7.25"
    )
  )

lazy val repl: Project = project
  .in(file(".repl"))
  .settings(commonSettings)
  .settings(macroSettings)
  .settings(
    name := "repl",
    description := "Scio REPL for yuyu_hf",
    libraryDependencies ++= Seq(
      "com.spotify" %% "scio-repl" % scioVersion
    ),
    Compile / mainClass := Some("com.spotify.scio.repl.ScioShell"),
    publish / skip := true
  )
  .dependsOn(root)



package example

import com.spotify.scio.bigquery._
import com.spotify.scio.ContextAndArgs

object BigqueryToGCS {
  @BigQueryType.fromQuery("SELECT * FROM [<Project>:<DB>.<Table>]")
  class Row

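  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Read the query result as typed rows, keep one column, and write it out as text.
    sc.typedBigQuery[Row]()
      .map(r => r.sepal_length.getOrElse(""))
      .saveAsTextFile("<GCS URL for output files>")

    sc.close()
  }
}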


Template execution

sbt "runMain example.BigqueryToGCS --project=<Project Name> --runner=DataflowRunner"


Personally, it took me a while to get used to Apache Beam, so I first wrote the pipeline in Java, for which plenty of sample code is available. I think a good approach is to start by converting Java code to Scala; once the code is written in a Scala-like style, you may be able to process intermediate data more elegantly than in Java. Whether to use Scio should be decided by looking at how often the library is updated and at your team's situation.


