[JAVA] Data processing using Apache Flink

Introduction

Hello! This is @RyosukeKawamura from LOB. This article is **the 19th-day entry in the LOB Advent Calendar 2018**.

This is my second post. At work I'm currently implementing data processing with Apache Flink, so that's what I'd like to write about this time.

Regarding Flink, there aren't many concrete examples on the net, and searching mostly turns up "I tried it" posts or explanations of some individual Flink API, so it was hard to figure out what to actually do. Luckily I have a colleague who knows Flink very well, and as I learned from him while moving forward, I finally got to the point of thinking "this is really convenient once you make full use of it!" Now that I understand it, I'd like to summarize things here to organize my knowledge.

What is Apache Flink

The official documentation is quite substantial; if you read it carefully, you can (or at least should be able to) understand Flink from it. In a nutshell, it is a "distributed stream processing platform." It's easy to mix up with similar projects (Storm, Spark, etc.), but my understanding is that Flink evolved from stream processing first and then spread out into other areas such as batch.

Good points of Flink

I haven't gotten to its full depth yet, but I think it comes down to the following three points.

(1) Both stream and batch are supported and can be implemented in a similar way.

It makes me quite happy that processing continuously arriving stream data, processing delivered files, and reading and processing tables can all be implemented in nearly the same way. What we are currently building is a batch job that takes delivered TSV files as input, but this may later change so that logs arrive via Kafka or the like instead of files. Since I expect large configuration changes like that to keep happening as the project progresses, this is a really nice property; a rough sketch of swapping the source is shown below. Note that some APIs can only be used with streaming, so this time we are **taking finite data delivered as files but handling it as streaming** (for details, see "Where I got stuck" below).
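As an illustration of that point (my own sketch, not our actual code): if the input later moves from files to Kafka, only the source line changes and the downstream operators can stay the same. The topic name, broker address, and the flink-connector-kafka dependency are assumptions here.

java

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Today: a finite file as the source
DataStream<String> lines = env.readTextFile("/path/to/logs.tsv");

// Later: the same pipeline fed from Kafka; only this source line changes
// (the "logs" topic and broker address are made-up placeholders)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
DataStream<String> linesFromKafka =
    env.addSource(new FlinkKafkaConsumer<>("logs", new SimpleStringSchema(), props));

// Either stream can then flow through the identical chain of downstream operators.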

(2) The rich API scratches every itch when it comes to aggregation

This was the most surprising part. It gives you a mechanism where you can do aggregations and similar work just by chaining methods together, as if you were processing an array. People used to functional languages should find it especially natural (I have no such experience myself...). We implemented in Java this time, but Flink can also be used from Scala, which apparently suits it even better. Sample code looks like the sketch below.
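Here is a minimal sketch of the kind of chaining meant (my own example, with made-up data): a running count per key, built just by stringing operators together.

java

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Aggregate just by chaining methods, as if processing an array
env.fromElements("a", "b", "a", "c", "a")
    .map(s -> Tuple2.of(s, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT)) // lambdas lose generics, so declare the type
    .keyBy(0)  // key by the word itself
    .sum(1)    // running count per word
    .print();  // emits (a,1) (b,1) (a,2) (c,1) (a,3)

env.execute("count-per-key-sketch");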

(3) When a failure occurs, it recovers gracefully and processing can resume from where it left off (if you implement it that way)

I'm sorry to say I haven't implemented this part yet, so I don't know the details... but depending on the implementation, you can apparently set up checkpoints and restart from them, and idempotency seems easy to guarantee, which is nice (a minimal sketch of enabling them is below). The web console that comes with a Flink cluster by default is also easy to use, and checking job status there is very quick.

(Screenshot: the Flink web console)
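For reference, here is my own minimal sketch (not from this project) of turning checkpointing on; the interval is arbitrary, and EXACTLY_ONCE is in fact the default mode.

java

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Snapshot the job state every 60 seconds; after a failure Flink
// restarts the operators from the latest completed checkpoint
env.enableCheckpointing(60_000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);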

Where I got stuck

Treating finite data such as CSV as streaming

We had been building the job with a finite file as input, but we had to satisfy the requirements "we want to use the streaming API" and "this may eventually change to real streaming." So I solved it in a slightly tricky way: **read the file -> register it as a Table -> pull the data out of the Table -> treat the result as a data stream**. If you look closely, this is covered in the official documentation, so I really should have read it properly... The implementation looks like this.

csvToStream.java


import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.table.sources.TableSource;

// Set up the environments needed for stream processing and table processing
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(sEnv);

// Read the file
TableSource source = new CsvTableSource.Builder()
    .path("/path/to/user_data.tsv")
    .ignoreFirstLine()    // skipping the header takes just this one call, which is nice
    .ignoreParseErrors()  // malformed rows are now simply ignored
    .fieldDelimiter("\t") // tab-delimited
    .field("cookie", Types.STRING)
    .field("date", Types.SQL_TIMESTAMP)
    .build();

// Register it as a table data source
tEnv.registerTableSource("users", source);

// Get the data as a Table
Table table = tEnv
    .scan("users"); // chaining .filter() or .select() here also enables SQL-like processing

// Convert to a stream; if you define an entity class, you can read it as a stream of that type
DataStream<UserEntity> ds = tEnv.toAppendStream(table, UserEntity.class);

// Omitted here; the usual shape seems to be a flatMap followed by addSink
ds.flatMap(new HogeFlatMap()).setParallelism(parallelNum).addSink(...);
sEnv.execute(); // if you forget this, nothing runs at all
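For reference, the UserEntity class referenced above is not shown in the post; an assumed minimal shape (field names taken from the .field() calls; Flink maps table columns to public fields by name and requires a public no-arg constructor) might be:

java

// Assumed shape of the entity used above, not from the original post
public class UserEntity {
    public String cookie;
    public java.sql.Timestamp date;

    public UserEntity() {} // Flink POJO rules: public no-arg constructor, public fields
}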

At first glance, it's hard to picture how this works at all, but that's about everything I got stuck on. (lol) Going back to the line from a little while ago:

java


ds.flatMap(new HogeFlatMap()).setParallelism(parallelNum).addSink(...);

In other words, HogeFlatMap() processes the stream with parallelNum parallel instances, and whatever HogeFlatMap() emits is passed on to addSink(). Being able to connect processing steps as if piping commands together in a shell feels very comfortable. What I didn't understand at first is that the body of flatMap() is invoked for every single element, so if you open a connection inside it you will exhaust your sockets and die (you need to set the connection up beforehand, e.g. hand it in via the constructor). It's hard to grasp what runs at what timing, and the fact that this was hard to see even when stepping through with a debugger was what I struggled with most at first.
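As a supplement (my own sketch, not the actual job): besides handing resources in via the constructor, Flink's RichFlatMapFunction offers an open() hook that runs once per parallel subtask before any elements arrive, which is a common home for this kind of one-time setup. SomeClient and its methods are hypothetical stand-ins.

java

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class HogeFlatMap extends RichFlatMapFunction<String, String> {
    private transient SomeClient client; // hypothetical connection wrapper

    @Override
    public void open(Configuration parameters) {
        // Runs once per parallel subtask, before any element is processed
        client = SomeClient.connect("some-host:1234"); // made-up endpoint
    }

    @Override
    public void flatMap(String line, Collector<String> out) {
        // Runs for every element; whatever is collected here flows to addSink()
        out.collect(client.transform(line));
    }
}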

In conclusion

It's hard to get the hang of (I haven't fully grasped it myself yet), but Flink has the scent of something that could become a strong ally for data pipelines once you master it. Working in ad tech I get plenty of chances to touch big data, so I hope to keep studying and truly master it.

We have many opportunities to use new data processing technologies, not just Flink. In particular, we handle data of a quantity and quality that other companies cannot, so if you are interested, or want to work on this kind of thing, come do it with us!

We are looking for colleagues to "change the order of distribution and create an advertising platform"! We hold daily strategy meetings to build a good data infrastructure!! https://lob-inc.com/recruit/
