Hello! This is @RyosukeKawamura from LOB. This article is the **19th day** entry of the **LOB Advent Calendar 2018**.
This is my second post. I'm currently implementing data processing with Apache Flink at work, so that's what I'd like to write about this time.
As for Flink, there aren't many concrete examples on the net, and what you do find is mostly "I tried it" posts or explanations of this or that Flink API, so it was hard to figure out what to actually do. Fortunately a colleague of mine knows Flink very well, and as I learned from him while working through the implementation, I finally reached the point of "this is really convenient once you use it properly!". Now that I understand it, I'd like to summarize it here to organize my own knowledge.
The official documentation is quite substantial; if you read it carefully you can understand it (probably). In a nutshell, Flink is a "distributed stream processing platform." There are a lot of similar-looking systems (Storm, Spark, etc.) so it's easy to get confused, but Flink started from stream processing and has since expanded into other areas such as batch processing.
I haven't gotten to the heart of it yet, but I think its appeal comes down to the following three points.
I'm pretty happy that processing stream data arriving continuously, processing files handed over in batches, and reading and processing tables can all be implemented in almost the same way. What we are building right now is a batch job that receives tsv files, but this may later change so that logs arrive via Kafka or the like instead of files. Since large configuration changes like that tend to happen as a project progresses, this is a very nice property. Note that some APIs can only be used with streaming, so this time we implemented it as **finite data delivered as files, but handled as streaming processing** (for details, see "Where I got stuck" below). A rough sketch of what "only the source changes" looks like is shown below.
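To make the point concrete, here is a minimal sketch (not our actual code) of "only the source changes, the downstream chain stays the same." The Kafka connector class is the standard Flink one, but the topic name, broker address, and the toy processing chain are all my own assumptions.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SameChainDifferentSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source A: a finite file, read line by line (bounded input, but still a DataStream)
        DataStream<String> fromFile = env.readTextFile("/path/to/user_data.tsv");

        // Source B: if the logs later arrive via Kafka instead, only this part changes
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        DataStream<String> fromKafka = env.addSource(
                new FlinkKafkaConsumer<>("user_log", new SimpleStringSchema(), kafkaProps));

        // The downstream chain is written once; attach it to whichever source you have
        fromFile.filter(line -> !line.isEmpty())
                .map(String::toUpperCase)
                .print();

        env.execute("same-chain-different-source");
    }
}
```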
This was the most surprising part. You can do aggregation and the like just by chaining methods crisply, as if you were processing an array. People used to functional languages should find it especially natural (I have no such experience...). This time we implemented it in Java, but Flink can also be used from Scala, which probably suits this style even better. Sample code looks something like the following.
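(The original sample snippet isn't reproduced here, so the following is a minimal word-count-style sketch of "just keep chaining methods"; the input strings and the counting logic are my own illustration, not code from this project.)

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ChainedOpsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("flink is nice", "flink chains methods", "just like arrays")
                // split each line into (word, 1) pairs
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.split(" ")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                // group by the word and sum the counts, all by just chaining methods
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("chained-ops-sketch");
    }
}
```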
I'm sorry to say I haven't implemented this part yet, so I don't know the details... Depending on the implementation, it seems you can set up something like checkpoints and restart from there, which should make idempotency easier to guarantee. The console that ships with a Flink cluster by default is also easy to use and makes checking job status very quick.
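I haven't tried this part myself yet (as I said), but for reference, checkpointing is switched on with a single call on the environment. A minimal sketch, with an arbitrary interval value:

```java
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();

// Take a checkpoint of the job state every 10 seconds (the interval here is an arbitrary example).
// On failure, Flink restarts from the latest completed checkpoint instead of
// reprocessing everything from the beginning.
sEnv.enableCheckpointing(10_000L);
```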
Although we were building the processing with a finite file as input, we had the requirements "we want to use the streaming-only APIs" and "this may eventually become real streaming processing." I solved this with a slightly tricky flow: **read the file -> register it as a Table -> load the data from the Table -> treat the result as a DataStream**. If you look closely it is actually in the official documentation, so I really should have read it properly... The implementation looks like this.
```java:csvToStream.java
// Get the environments required for stream processing and Table API processing
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(sEnv);
//File reading
TableSource source = new CsvTableSource.Builder()
.path("/path/to/user_data.tsv")
.ignoreFirstLine() // Nice that skipping the header line is just a matter of calling this method
.ignoreParseErrors() // Malformed rows are simply ignored
.fieldDelimiter("\t") //Tab delimited
.field("cookie", Types.STRING)
.field("date", Types.TIMESTAMP)
.build();
// Register it as a table source
tEnv.registerTableSource("users", source);
//Get data as a table
Table table = tEnv
.scan("users"); //here.filter()And.select()And繋ぐとSQLチックな処理もできる
//Convert to streaming
DataStream<UserEntity> ds = tEnv.toAppendStream(table, Row.class) //If you define Entity, you can read it as a stream of that type.
ds.flatMap(new hogeFlatMap()).setParallelism(parallelNum).addSink(...); //I'm omitting here(It seems to be a form of implementation that is often like FlatMap and passing it to addSink)
sEnv.execute(); //If you forget here, nothing will work
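By the way, UserEntity above is just a POJO matching the two columns registered in the CsvTableSource. Something like the following should do (a sketch based on the field definitions above; the real class in our project may look different):

```java
import java.sql.Timestamp;

// Minimal POJO matching the "cookie" and "date" columns registered above.
// Flink treats a class as a POJO type if it has a public no-arg constructor
// and public fields (or getters/setters).
public class UserEntity {
    public String cookie;
    public Timestamp date;

    public UserEntity() {}
}
```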
That's all there is to it. lol Going back to this line from a moment ago:
```java
ds.flatMap(new hogeFlatMap()).setParallelism(parallelNum).addSink(...);
```
In other words, hogeFlatMap() processes the stream with parallelNum parallel instances, and whatever it emits is handed to addSink. Being able to connect processing steps as if piping through a shell feels great. What I didn't understand at first is that the inside of flatMap is called for every single record, so if you open a connection in there you will run out of sockets and die; you need to set it up in the constructor (or otherwise outside the per-record path). It was hard to see what runs at which timing, and hard to inspect even when stepping through with a debugger, which is where I struggled at first. A sketch of what I mean is below.
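For illustration, here is a rough sketch of what such a flatMap function might look like when the setup is kept out of the per-record path. The class name hogeFlatMap and the entity type come from the snippets above, but the body and the SomeClient connection are entirely hypothetical; Flink's RichFlatMapFunction with its open()/close() hooks is one common way to do this (the constructor approach mentioned above works too, as long as the field is serializable).

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Rough sketch only -- SomeClient is a made-up stand-in for whatever connection you need.
public class hogeFlatMap extends RichFlatMapFunction<UserEntity, String> {

    private transient SomeClient client; // created once per parallel instance, not per record

    @Override
    public void open(Configuration parameters) {
        // Called once per parallel instance, before any record is processed
        client = SomeClient.connect("example-host", 1234);
    }

    @Override
    public void flatMap(UserEntity user, Collector<String> out) {
        // Called for every single record -- do NOT open connections in here
        out.collect(client.enrich(user));
    }

    @Override
    public void close() {
        client.close();
    }
}
```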
It's hard to get the hang of (and I haven't fully grasped it yet either), but Flink has the scent of something that becomes a strong ally for data pipelines once you master it. I get to work with ad-tech-scale big data, so I hope to keep studying and make full use of it.
We have many opportunities to use new data processing technologies, not just Flink. In particular, we handle a volume and quality of data that other companies can't, so if this sounds interesting or you'd like to work on it with us, come join us!
We are looking for colleagues to "change the order of distribution and create an advertising platform"! We hold strategy meetings every day on how to build a good data infrastructure! https://lob-inc.com/recruit/