[JAVA] How to build an Apache Flink application from scratch in 5 minutes

This tutorial will briefly show you how to build a ** Apache Flink application ** from scratch in minutes.

Development environment preparation

Apache Flink works on Linux, Max OS X and Windows and is compatible. To develop Flink applications, either Java version 8.0 or later or Maven on your computer /install.html?spm=a2c65.11461447.0.0.29c056bf7J56YG) You need to run one of the environments. If you are using the Java environment, executing the $ java Cversion command will output the version information you are using as shown below.

java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)

If you are using Maven environment, running the $ mvn -version command will output the following version information.

Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"

In addition, use IntelliJ IDEA as the IDE for your Flink application (the community free version is sufficient for this tutorial). It is recommended. Eclipse also works for this purpose, but Eclipse has had problems with Scala and Java hybrid projects in the past, so Eclipse It is not recommended to select.

Create a Maven project

You can follow the steps in this section to create a Flink project and import it into IntelliJ IDEA. Use the Flink Maven Archetype to create the project structure and some initial default dependencies. In your working directory, run the mvn archetype: generate command to create the project.

    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.6.1 \
    -DgroupId=my-flink-project \
    -DartifactId=my-flink-project \
    -Dversion=0.1 \
    -Dpackage=myflink \
    -DinteractiveMode=false

The above groupId, artifactId, and package can be edited to any path. Using the above parameters, Maven will automatically create a project structure like this:

$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── myflink
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties

The pom.xml file already contains the required Flink dependencies, and some sample programming frameworks are provided in src / main / java.

Compile the Flink program

Now, follow the steps below to create your own Flink program. To do this, launch IntelliJ IDEA, select Import Project, and select pom.xml under the root directory of my-link-project. Then import the project as instructed.

Create a SocketWindowWordCount.java file under src / main / java / myflink.

package myflink;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

    }
}

For now, this program is just a basic framework, so we'll fill in the code step by step. Please note that the import statement will be added automatically by the IDE, so please note that it will not be written below. At the end of this section, you'll see the completed code. If you want to skip the steps below, paste the final complete code directly into the editor.

The first step in the Flink program is to create a StreamExecutionEnvironment. This is an entry class that can be used for setting parameters, creating data sources, submitting tasks, etc. Now let's add it to the main function.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Next, create a data source that reads data from the socket on local port 9000.

DataStream text = env.socketTextStream("localhost", 9000, "\n");

This will create a string type DataStream. DataStream is Flink's core API for stream processing. It defines many common operations (filtering, transformation, aggregation, windows, associations, etc.). In this example, we are interested in the number of times each word appears in a particular time window, for example a 5 second window. For this purpose, the string data is first parsed into words and their occurrences (represented by Tuple2 <String, Integer>), with the first field being the word and the second field being the word occurrence. It will be the number of times. The initial value of the number of occurrences is set to 1. Since there can be multiple words in a row of data, flatmap is implemented for parsing.

DataStream> wordCounts = text
                .flatMap(new FlatMapFunction>() {
                    @Override
                    public void flatMap(String value, Collector> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                });

Then group the data streams based on the word field (that is, index field 0). Here we use the keyBy (int index) method to get a word-keyed Tuple2 <String, Integer> data stream. Then specify any window on the stream and calculate the result based on the data in the window. In this example, word occurrences are aggregated every 5 seconds and each window counts from scratch.

DataStream> windowCounts = wordCounts
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

The second .timeWindow () specifies a tumble window for 5 seconds. The third call specifies the aggregate function for each key and each window. In this example, this is added by the occurrences field (that is, index field 1). The resulting data stream outputs the number of occurrences of each word every 5 seconds.

Finally, it outputs the data stream to the console and starts running.

windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");

A final ʻenv.execute call is required to start the actual Flink job. All Operator operations (source creation, aggregation, printing, etc.) only build a graph of the internal Operator operations. Only when ʻexecute () is called will they be sent to the cluster or local computer for execution.

package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // Create the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Obtain the input data by connecting to the socket. Here you want to connect to the local 9000 port. If 9000 port is not available, you'll need to change the port.
        DataStream text = env.socketTextStream("localhost", 9000, "\n");

        // Parse the data, group by word, and perform the window and aggregation operations.
        DataStream> windowCounts = text
                .flatMap(new FlatMapFunction>() {
                    @Override
                    public void flatMap(String value, Collector> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        // Print the results to the console. Note that here single-threaded printed is used, rather than multi-threading.
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
}

Program execution

To run the sample program, start NetCat on your terminal and get the input stream.

nc -lk 9000

For Windows, you can install and run Ncat via NMAP.

ncat -lk 9000

Then directly execute the main method of SocketWindowWordCount.

Simply type a word into the NetCat console and the SocketWindowWordCount output console will display statistics on how often each word appears. If you want to see a count of 1 or more, enter the same word repeatedly within 5 seconds.

image.png

Recommended Posts

How to build an Apache Flink application from scratch in 5 minutes
How to build an executable jar in Maven
How to create an application
I tried to make an application in 3 months from inexperienced
What happened in "Java 8 to Java 11" and how to build an environment
How to publish an application on Heroku
Understand in 5 minutes !! How to use Docker
How to get Class from Element in Java
How to solve an Expression Problem in Java
I tried to develop an application in 2 languages
[Rails] How to build an environment with Docker
How to build the simplest blockchain in Ruby
How to start a subscript from an arbitrary number in Ruby iterative processing
How to make an image partially transparent in Processing
How to implement guest login in 5 minutes in rails portfolio
How to install Docker in the local environment of an existing Rails application [Rails 6 / MySQL 8]
Understand how to use Swift's JSON Decoder in 3 minutes
How to write an external reference key in FactoryBot
How to build android-midi-lib
[Rails] How to display an image in the view
How to pass an object to Mapper in MyBatis without arguments
How to create an application server on an EC2 instance on AWS
[iOS] [Objective-C] How to update a widget from an Objective-C app
How to change a string in an array to a number in Ruby
How to retrieve the hash value in an array in Ruby
How to deploy Java application to Alibaba Cloud EDAS in Eclipse
How to install Web application for each language in Nginx
How to monitor application information in real time using JConsole
[Docker context] ~ How to access docker in remote environment from VScode ~
[Integration test code] How to select an element from date_select
[Ruby] How to batch convert strings in an array to numbers
How to use Apache POI
How to handle an instance
How to migrate from JUnit4 to JUnit5
How to build an environment with Docker, which is the minimum required to start a Rails application
How to prevent past dates from being entered in Rails forms
[Ruby] How to count even or odd numbers in an array
How to output the value when there is an array in the array
How to publish an application using AWS (3) EC2 instance environment construction
How to install the language used in Ubuntu and how to build the environment
How to get and add data from Firebase Firestore in Ruby
How to get the length of an audio file in java
How to use Lombok in Spring
How to find May'n in XPath
How to push from Tarminal to GitHub
How to hide scrollbars in WebView
How to run JUnit in Eclipse
How to iterate infinitely in Ruby
[Rails] How to write in Japanese
How to master programming in 3 months
How to get parameters in Spark
How to insert an external library
How to use InjectorHolder in OpenAM
How to introduce jQuery in Rails 6
How to name variables in Java
How to set Lombok in Eclipse
How to build vim on Ubuntu 20.04
How to build CloudStack using Docker
How to concatenate strings in java
How to install Swiper in Rails
How to change from HTML to Haml