[JAVA] Study Flink with Elasticsearch Exercise Code

Environment

OS: Ubuntu 16.04
Kafka: kafka_2.10-0.10.2.0
Elasticsearch: elasticsearch-2.4.3
Kibana: kibana-4.6.4-linux-x86_64
Flink: flink-1.3.1-bin-hadoop27-scala_2.10
Java: openjdk version "1.8.0_131"
Scala: 2.10
Build Tool: Apache Maven 3.5.0
IDE: IntelliJ

Add dependencies to pom.xml to use Elasticsearch

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.2.0</version>
</dependency>

Write to Elasticsearch

Use the "ElasticsearchSink" class, which is provided by Flink's Elasticsearch connector, to write a DataStream to an Elasticsearch index. Its constructor takes three arguments:

"jConfig" is the userConfig, a map of user settings.
"jTransports" is the transportAddresses, the addresses of the Elasticsearch nodes to connect to using a TransportClient.
"PopularPlaceInserter" is the elasticsearchSinkFunction, which generates one or more ActionRequests from each incoming element.

API documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.html

 popularPlaces.addSink(
      new ElasticsearchSink(jConfig, jTransports, new PopularPlaceInserter))
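
The post does not show how "jConfig" and "jTransports" are built. A minimal sketch, assuming a single Elasticsearch node on the local machine with the default cluster name and the default transport port, could look like the following. The cluster name, host, port, and the enclosing object name "SinkSettings" are assumptions for illustration, not values taken from the exercise.

```scala
import java.net.{InetAddress, InetSocketAddress}
import java.util.{ArrayList, HashMap}

object SinkSettings {
  // User settings handed to the Elasticsearch client.
  // "elasticsearch" is the default cluster name (assumption: adjust for your cluster).
  val jConfig = new HashMap[String, String]()
  jConfig.put("cluster.name", "elasticsearch")
  // Flush after every element: handy for an exercise, slow for real workloads.
  jConfig.put("bulk.flush.max.actions", "1")

  // Addresses of the Elasticsearch nodes to connect to.
  // 127.0.0.1:9300 is an assumption (local node, default transport port).
  val jTransports = new ArrayList[InetSocketAddress]()
  jTransports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
}
```

Both values are plain Java collections, which is why the names carry a "j" prefix in the Scala code.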

"PopularPlaceInserter" is defined as follows.

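  // Package names below match the elasticsearch2 connector declared in pom.xml.
  import org.apache.flink.api.common.functions.RuntimeContext
  import org.apache.flink.streaming.connectors.elasticsearch2.{ElasticsearchSinkFunction, RequestIndexer}
  import org.elasticsearch.action.index.IndexRequest
  import org.elasticsearch.client.Requests
  import scala.collection.JavaConverters._

  class PopularPlaceInserter extends ElasticsearchSinkFunction[(Float, Float, Long, Boolean, Int)] {
    // Called once per stream element; turns the tuple into one index request.
    def process(record: (Float, Float, Long, Boolean, Int), ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      // Build the document as a map of field name -> string value.
      val json = Map(
        "time" -> record._3.toString,
        "location" -> (record._2 + "," + record._1),
        "isStart" -> record._4.toString,
        "cnt" -> record._5.toString
      )
      // Target index "nyc-places", document type "popular-locations".
      val rqst: IndexRequest = Requests.indexRequest
        .index("nyc-places")
        .`type`("popular-locations")
        .source(json.asJava)
      // Hand the request to the sink, which sends it to Elasticsearch.
      indexer.add(rqst)
    }
  }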
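
To see what document "PopularPlaceInserter" sends without starting Flink or Elasticsearch, the map-building step can be pulled out into a plain function. This is only a sketch: the object name "PopularPlaceDocument" and the function "toDocument" are hypothetical, and the sample values are made up.

```scala
import scala.collection.JavaConverters._

object PopularPlaceDocument {
  // Same tuple type as in PopularPlaceInserter.
  type Record = (Float, Float, Long, Boolean, Int)

  // Builds the same field map as the process method and converts it to a
  // java.util.Map, which is the form IndexRequest.source accepts.
  def toDocument(record: Record): java.util.Map[String, String] =
    Map(
      "time"     -> record._3.toString,
      "location" -> s"${record._2},${record._1}", // second field first, then first field
      "isStart"  -> record._4.toString,
      "cnt"      -> record._5.toString
    ).asJava
}
```

For example, PopularPlaceDocument.toDocument((-73.5f, 40.75f, 1500000000000L, true, 42)) yields a map whose "location" entry is "40.75,-73.5". If the index maps "location" as a geo_point (an assumption, the mapping is not shown in the post), Elasticsearch reads such a string as "lat,lon", which would explain why the second tuple field comes first.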

Memo: What is a Sink?

According to Wikipedia (https://en.wikipedia.org/wiki/Sink_(computing)): "In computing, a sink, event sink or data sink is a class or function designed to receive incoming events from another object or function. This is commonly implemented in C++ as callbacks. Other object-oriented languages, such as Java and C#, have built-in support for sinks by allowing events to be fired to delegate functions."

So what does this mean in Flink? According to the DataStream API guide (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html), "Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams". "addSink" is one of these operations: it "invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions". ElasticsearchSink is such a sink function, and addSink attaches it to the stream. (I hope someday I will fully understand what they say here.)
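
The idea of a sink can be illustrated in plain Scala, without Flink. In the sketch below every name ("Sink", "CollectingSink", "TinyStream") is hypothetical and only imitates the shape of the real API: a stream pushes each element to whatever sinks were attached to it, and a sink only receives elements and returns nothing.

```scala
import scala.collection.mutable.ArrayBuffer

// A sink only consumes elements; it gives nothing back to the sender.
trait Sink[T] {
  def invoke(value: T): Unit
}

// Hypothetical sink that keeps elements in memory
// (a stand-in for a file, a socket, or Elasticsearch).
class CollectingSink[T] extends Sink[T] {
  val received = ArrayBuffer.empty[T]
  override def invoke(value: T): Unit = { received += value }
}

// Hypothetical stand-in for a DataStream: it pushes every element
// to each sink that was attached with addSink.
class TinyStream[T](elements: Seq[T]) {
  private val sinks = ArrayBuffer.empty[Sink[T]]
  def addSink(sink: Sink[T]): Unit = { sinks += sink }
  def run(): Unit = for (e <- elements; s <- sinks) s.invoke(e)
}
```

Attaching a CollectingSink to a TinyStream and calling run() leaves all elements in "received", in order. In the exercise, ElasticsearchSink plays the role of the sink and "popularPlaces" the role of the stream.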

Memo: What is "class xxx extends"?

Reading through Scala code, I often find declarations like "class ArrayElement extends Element". What is this? With "extends", class ArrayElement inherits all non-private members of class Element. If the subclass defines a member with the same name and parameters, that definition overrides the superclass member instead of inheriting it.

Therefore, in the example above, "PopularPlaceInserter" inherits the non-private members of "ElasticsearchSinkFunction" and supplies its own "process" method. According to the API documentation, this method creates multiple ActionRequests from an element in a stream, and ElasticsearchSink uses it to prepare elements for sending them to Elasticsearch.
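
A small sketch of inheritance and overriding, using the Element and ArrayElement names from the memo. The members ("contents", "height", "width") and the extra class "LineElement" are assumptions chosen for illustration.

```scala
abstract class Element {
  // Abstract member: subclasses must provide it.
  def contents: Array[String]
  // Concrete members: subclasses inherit these unless they override them.
  def height: Int = contents.length
  def width: Int = if (height == 0) 0 else contents(0).length
}

// Inherits height and width from Element and implements contents.
class ArrayElement(conts: Array[String]) extends Element {
  def contents: Array[String] = conts
}

// Replaces the inherited width and height with its own definitions.
class LineElement(s: String) extends ArrayElement(Array(s)) {
  override def width: Int = s.length
  override def height: Int = 1
}
```

ArrayElement never defines "height", yet new ArrayElement(Array("ab", "cd")).height returns 2 because the member is inherited. "PopularPlaceInserter" follows the same pattern: it extends ElasticsearchSinkFunction and provides the "process" method that the sink calls.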
