This tutorial briefly shows you how to build an **Apache Flink application** from scratch in minutes.
Apache Flink runs on Linux, Mac OS X, and Windows. To develop Flink applications, you need Java 8 or later and Maven installed on your computer. If Java is installed, running the $ java -version command prints version information like the following:
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)
If Maven is installed, running the $ mvn -version command prints version information like the following:
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"
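If you would rather confirm the Java requirement from code, the check can be sketched as below. This is only an illustration: the class name JavaVersionCheck and the helper majorVersion are hypothetical, and the parsing assumes version strings shaped like the "1.8.0_65" output shown above or the newer "11.0.2" style.

```java
class JavaVersionCheck {
    /** Returns the major version from strings such as "1.8.0_65" (8) or "11.0.2" (11). */
    static int majorVersion(String version) {
        String[] parts = version.split("[._\\-+]");
        int first = Integer.parseInt(parts[0]);
        // Before Java 9 the scheme was "1.<major>", for example "1.8.0_65".
        if (first == 1 && parts.length > 1) {
            return Integer.parseInt(parts[1]);
        }
        return first;
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        int major = majorVersion(version);
        System.out.println("Java " + version + " (major " + major + ")");
        if (major < 8) {
            System.out.println("This tutorial assumes Java 8 or later.");
        }
    }
}
```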
In addition, we recommend IntelliJ IDEA as the IDE for your Flink application (the free Community edition is sufficient for this tutorial). Eclipse also works, but it has had problems with mixed Scala and Java projects in the past, so it is not recommended.
Follow the steps in this section to create a Flink project and import it into IntelliJ IDEA. The Flink Maven Archetype creates the project structure and some initial default dependencies. In your working directory, run the following command to create the project:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.6.1 \
-DgroupId=my-flink-project \
-DartifactId=my-flink-project \
-Dversion=0.1 \
-Dpackage=myflink \
-DinteractiveMode=false
You can change the groupId, artifactId, and package above to any values you like. With the parameters above, Maven automatically creates a project structure like this:
$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── myflink
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties
The pom.xml file already contains the required Flink dependencies, and src/main/java provides a few skeleton programs to start from.
Now, follow the steps below to create your own Flink program. Launch IntelliJ IDEA, select Import Project, and select pom.xml in the root directory of my-flink-project. Then import the project as instructed.
Create a SocketWindowWordCount.java file under src/main/java/myflink:
package myflink;

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
    }
}
For now, this program is just a skeleton, so we'll fill in the code step by step. Note that the IDE adds the import statements automatically, so they are not shown in the steps below. The completed code appears at the end of this section. If you want to skip the steps, paste the final complete code directly into the editor.
The first step in a Flink program is to create a StreamExecutionEnvironment. This is the entry class used for setting parameters, creating data sources, submitting jobs, and so on. Let's add it to the main function:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Next, create a data source that reads data from the socket on local port 9000.
DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");
This creates a DataStream of strings. DataStream is Flink's core API for stream processing and defines many common operations (filtering, transformation, aggregation, windowing, joins, and so on). In this example, we want to count how many times each word appears within a time window, say a 5-second window. To do this, each line of input is first parsed into words paired with an occurrence count, represented as Tuple2<String, Integer>, where the first field is the word and the second field is its count, initially set to 1. Because one line can contain several words, the parsing is implemented with flatMap:
DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                for (String word : value.split("\\s")) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
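To see what the flatMap body emits without starting Flink, the same loop can be sketched in plain Java. The class WordPairsSketch and the method toPairs are hypothetical names used only for this illustration, and Map.Entry stands in for Flink's Tuple2.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class WordPairsSketch {
    /** Mirrors the flatMap body: one (word, 1) pair per token of the line. */
    static List<Map.Entry<String, Integer>> toPairs(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.split("\\s")) {
            pairs.add(new SimpleEntry<>(word, 1));
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Three tokens, so three pairs.
        System.out.println(toPairs("hello flink hello"));
        // Two consecutive spaces yield an empty token between the two words.
        System.out.println(toPairs("hello  flink"));
    }
}
```

Because the pattern "\\s" matches a single whitespace character, consecutive spaces produce an empty-string word. Splitting on "\\s+" or skipping empty tokens would be one way to avoid that, although the tutorial's code does not do so.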
Next, group the stream by the word field (index field 0). The keyBy(int index) method produces a stream of Tuple2<String, Integer> keyed by word. You can then define a window on the keyed stream and compute a result from the data in each window. In this example, word occurrences are aggregated every 5 seconds, and each window starts counting from zero.
DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);
The second call, .timeWindow(), specifies a tumbling window of 5 seconds. The third call specifies the aggregate function applied per key and per window; here it sums the occurrence field (index field 1). The resulting data stream emits the number of occurrences of each word every 5 seconds.
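The effect of keying by word, assigning 5-second tumbling windows, and summing can be sketched in plain Java as follows. This is a simplified model rather than Flink's implementation: the class TumblingWindowSketch is hypothetical, and the timestamps are supplied explicitly as an assumption, whereas the tutorial's job relies on Flink's own clock and window assigner.

```java
import java.util.Map;
import java.util.TreeMap;

class TumblingWindowSketch {
    static final long WINDOW_MS = 5_000L;

    /** Start of the window containing the timestamp (non-negative timestamps, no offset). */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    /** Counts words per (window start, word); timestamps[i] is the arrival time of words[i]. */
    static Map<Long, Map<String, Integer>> countPerWindow(long[] timestamps, String[] words) {
        Map<Long, Map<String, Integer>> counts = new TreeMap<>();
        for (int i = 0; i < words.length; i++) {
            counts.computeIfAbsent(windowStart(timestamps[i]), k -> new TreeMap<>())
                  .merge(words[i], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 2_500L, 4_999L, 5_000L, 7_200L};
        String[] words = {"flink", "flink", "java", "flink", "java"};
        // Prints {0={flink=2, java=1}, 5000={flink=1, java=1}}
        System.out.println(countPerWindow(timestamps, words));
    }
}
```

Each window keeps its own counters, which is why the count for "flink" starts again from 1 in the window beginning at 5000 ms.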
Finally, print the data stream to the console and start execution:
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
The final env.execute call is required to start the actual Flink job. All operator calls (creating the source, aggregating, printing, and so on) only build a graph of operators internally. Only when execute() is called is that graph submitted to a cluster or run on the local machine.
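The build-first, run-later behavior can be illustrated with a toy pipeline in plain Java. This is only an analogy under stated assumptions: LazyPipelineSketch, addStep, and log are hypothetical names, and the sketch records step names in a list instead of building a real operator graph.

```java
import java.util.ArrayList;
import java.util.List;

class LazyPipelineSketch {
    private final List<String> plannedSteps = new ArrayList<>();
    private final List<String> log = new ArrayList<>();

    /** Records a step in the plan; nothing runs yet. */
    LazyPipelineSketch addStep(String name) {
        plannedSteps.add(name);
        return this;
    }

    /** Runs every recorded step in order, loosely analogous to env.execute(). */
    List<String> execute() {
        for (String step : plannedSteps) {
            log.add("ran " + step);
        }
        return log;
    }

    /** Returns what has actually run so far. */
    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        LazyPipelineSketch pipeline = new LazyPipelineSketch()
                .addStep("source")
                .addStep("flatMap")
                .addStep("window sum")
                .addStep("print");
        // The log is still empty here because only the plan has been built.
        System.out.println("before execute: " + pipeline.log());
        System.out.println("after execute: " + pipeline.execute());
    }
}
```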
The complete code is as follows:
package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Obtain the input data by connecting to the socket on local port 9000. If port 9000 is already in use, change the port.
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");
        // Parse the data, group by word, and perform the window and aggregation operations.
        DataStream<Tuple2<String, Integer>> windowCounts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);
        // Print the results to the console. Note that printing uses a single thread (parallelism 1) rather than multiple threads.
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }
}
To run the sample program, first start NetCat in a terminal to provide the input stream:
nc -lk 9000
On Windows, you can install Ncat, which is distributed with Nmap, and run:
ncat -lk 9000
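If neither tool is available, a small Java program can play the same role of listening on the port and forwarding typed lines. The sketch below is a hypothetical stand-in, not part of the tutorial: the class LineServerSketch and the method serveOnce are illustrative names, and unlike nc -lk it serves a single connection and then exits.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class LineServerSketch {
    /** Accepts one client and forwards each line read from input, terminated by '\n'. */
    static void serveOnce(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = input.readLine()) != null) {
                // "\n" matches the delimiter passed to socketTextStream in the tutorial.
                out.print(line + "\n");
                out.flush();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 9000 is the port the tutorial's job connects to.
        try (ServerSocket server = new ServerSocket(9000);
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            serveOnce(server, stdin);
        }
    }
}
```

Start this program before running the Flink job, then type words into its console just as you would with NetCat.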
Then run the main method of SocketWindowWordCount directly.
Type words into the NetCat console, and the SocketWindowWordCount output console displays how often each word appeared. To see a count greater than 1, enter the same word repeatedly within 5 seconds.