[JAVA] Study Flink with Elasticsearch exercise code

Environment

OS: Ubuntu 16.04
Kafka: kafka_2.10-0.10.2.0
Elasticsearch: elasticsearch-2.4.3
Kibana: kibana-4.6.4-linux-x86_64
Flink: flink-1.3.1-bin-hadoop27-scala_2.10
Java: openjdk version "1.8.0_131"
Scala: 2.10
Build Tool: Apache Maven 3.5.0
IDE: IntelliJ

Add the following dependency to pom.xml to use Elasticsearch:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.2.0</version>
</dependency>

Write to Elasticsearch

Use the "ElasticsearchSink" class, provided by Flink's Elasticsearch Connector, to write a DataStream to an Elasticsearch index (https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.html). Here, "jConfig" is the userConfig, a map of user settings; "jTransports" is the transportAddresses, the Elasticsearch nodes to connect to with a TransportClient; and "PopularPlaceInserter" is the elasticsearchSinkFunction, which generates multiple ActionRequests from each incoming element.

  import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink

  popularPlaces.addSink(
    new ElasticsearchSink(jConfig, jTransports, new PopularPlaceInserter))
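
The addSink call above assumes that jConfig and jTransports already exist. Below is a minimal sketch of how they might be built. It assumes a single local Elasticsearch node that keeps the default cluster name "elasticsearch" and listens on the default transport port 9300; both are placeholder values for your own setup. The keys "cluster.name" and "bulk.flush.max.actions" follow the example in Flink's Elasticsearch connector documentation. The connector takes Java collections, so the Scala map is copied into a java.util.HashMap.

```scala
import java.net.{InetAddress, InetSocketAddress}

// Placeholder settings (assumed): adjust the cluster name to your cluster.
val config: Map[String, String] = Map(
  // emit after every element instead of buffering a bulk request
  "bulk.flush.max.actions" -> "1",
  "cluster.name" -> "elasticsearch"
)

// ElasticsearchSink expects a java.util.Map, so copy the Scala map into one.
val jConfig: java.util.Map[String, String] = new java.util.HashMap[String, String]()
config.foreach { case (key, value) => jConfig.put(key, value) }

// Assumed node address: localhost, Elasticsearch transport port 9300.
val jTransports: java.util.List[InetSocketAddress] = new java.util.ArrayList[InetSocketAddress]()
jTransports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))

println(jConfig)
println(jTransports)
```

To connect to several nodes, add one InetSocketAddress per node to jTransports.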

"PopularPlaceInserter" is defined as following.

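  import org.apache.flink.api.common.functions.RuntimeContext
  import org.apache.flink.streaming.connectors.elasticsearch2.{ElasticsearchSinkFunction, RequestIndexer}
  import org.elasticsearch.action.index.IndexRequest
  import org.elasticsearch.client.Requests
  import scala.collection.JavaConverters._

  // Turns each record, assumed to be (lon, lat, time, isStart, cnt), into an index request
  class PopularPlaceInserter extends ElasticsearchSinkFunction[(Float, Float, Long, Boolean, Int)] {
    override def process(record: (Float, Float, Long, Boolean, Int), ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      val json = Map(
        "time" -> record._3.toString,
        // Elasticsearch geo_point strings are "lat,lon", so latitude (_2) comes first
        "location" -> s"${record._2},${record._1}",
        "isStart" -> record._4.toString,
        "cnt" -> record._5.toString
      )
      val rqst: IndexRequest = Requests.indexRequest
        .index("nyc-places")            // target index
        .`type`("popular-locations")    // mapping type (Elasticsearch 2.x still uses types)
        .source(json.asJava)            // document body as a java.util.Map
      indexer.add(rqst)
    }
  }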
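
To see what PopularPlaceInserter actually sends to Elasticsearch, you can pull the field mapping inside process out into a pure function and check it without Flink or Elasticsearch. This is only a sketch. toDocument and the PopularPlace alias are hypothetical names, and the record layout (lon, lat, time, isStart, cnt) is an assumption. It is consistent with the "location" field being built latitude first, because Elasticsearch geo_point strings use the "lat,lon" order.

```scala
// Hypothetical alias for the assumed record layout: (lon, lat, time, isStart, cnt)
type PopularPlace = (Float, Float, Long, Boolean, Int)

// Same mapping as PopularPlaceInserter.process, minus the IndexRequest
def toDocument(record: PopularPlace): Map[String, String] = Map(
  "time" -> record._3.toString,
  // "lat,lon": latitude (_2) first, then longitude (_1)
  "location" -> s"${record._2},${record._1}",
  "isStart" -> record._4.toString,
  "cnt" -> record._5.toString
)

// Made-up sample record, for illustration only
val doc = toDocument((-73.98f, 40.75f, 1357002000000L, true, 42))
println(doc)
```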

Memo: What is Sink?

From Wikipedia (https://en.wikipedia.org/wiki/Sink_(computing)): "In computing, a sink, event sink or data sink is a class or function designed to receive incoming events from another object or function. This is commonly implemented in C++ as callbacks. Other Object-oriented languages, such as Java and C#, have built-in support for sinks by allowing events to be fired to delegate functions."
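
The callback idea in that definition fits in a few lines of Scala. In the sketch below, EventSource is a hypothetical class. The source does not know what happens to an event: it only calls every registered sink function, just as a delegate or callback would be called in Java or C#.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical event source: keeps a list of sink callbacks
class EventSource[E] {
  private var sinks: List[E => Unit] = Nil

  // register a sink (a function that receives events)
  def subscribe(sink: E => Unit): Unit = {
    sinks = sinks :+ sink
  }

  // fire an event: deliver it to every sink, in registration order
  def emit(event: E): Unit = sinks.foreach(sink => sink(event))
}

val received = ListBuffer.empty[String]
val source = new EventSource[String]
source.subscribe(e => received += e)        // sink 1: store the event
source.subscribe(e => println(s"got: $e"))  // sink 2: print the event
source.emit("hello")
source.emit("world")
```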

So what does this mean in Flink? According to the DataStream API documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html), "Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams". "addSink" is one of these output operations: it "invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions". ElasticsearchSink is one such connector-provided sink function. (I hope someday I will fully understand what they say here.)
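
The "transform, then hand every element to a sink function" pattern can be modelled with a toy pipeline. This is not Flink's API. ToySinkFunction, PrintSink, BufferSink and ToyStream are all hypothetical, and Flink's real interface is org.apache.flink.streaming.api.functions.sink.SinkFunction. The sketch only shows where a sink sits: at the end of the stream, consuming elements that earlier operations produced.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sink interface: called once per element
trait ToySinkFunction[T] {
  def invoke(value: T): Unit
}

// Like a built-in output: print each element
class PrintSink[T] extends ToySinkFunction[T] {
  def invoke(value: T): Unit = println(value)
}

// Like a connector: forward elements to an "external system",
// modelled here as an in-memory buffer
class BufferSink[T] extends ToySinkFunction[T] {
  val buffer: ArrayBuffer[T] = ArrayBuffer.empty[T]
  def invoke(value: T): Unit = buffer += value
}

// Hypothetical, finite stand-in for a DataStream
class ToyStream[T](elements: Seq[T]) {
  def map[R](f: T => R): ToyStream[R] = new ToyStream(elements.map(f))
  def filter(p: T => Boolean): ToyStream[T] = new ToyStream(elements.filter(p))
  // terminal step: hand every element to the sink
  def addSink(sink: ToySinkFunction[T]): Unit = elements.foreach(sink.invoke)
}

val sink = new BufferSink[Int]
new ToyStream(Seq(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 10).addSink(sink)
new ToyStream(Seq("a", "b")).addSink(new PrintSink[String])
```

In the real job, ElasticsearchSink plays the role of BufferSink. Its "external system" is an Elasticsearch cluster, and it uses PopularPlaceInserter to turn each element into requests.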

Memo: What is "class xxx extends"?

Reading through Scala code, I often find statements like "class ArrayElement extends Element". What is this? By using "extends", class ArrayElement inherits all non-private members of class Element. If the subclass already implements a member with the same name and parameters, that implementation overrides the superclass member, so the superclass version is not inherited. In the example above, "PopularPlaceInserter" extends "ElasticsearchSinkFunction". Because ElasticsearchSinkFunction is a Java interface, PopularPlaceInserter mainly implements its abstract process method rather than inheriting ready-made behavior. According to the API documentation, process creates multiple ActionRequests from an element in a stream, and ElasticsearchSink uses it to prepare elements for sending to Elasticsearch.
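
Here is a small sketch of "extends", using simplified versions of Element and ArrayElement. The describe method and the LabeledElement subclass are hypothetical additions. ArrayElement implements the abstract contents and inherits height, width and describe from Element. LabeledElement overrides describe, so its own definition is used instead of the inherited one.

```scala
abstract class Element {
  def contents: Array[String]                 // abstract: subclasses must implement it
  private def firstLine: String = contents(0) // private: not visible in subclasses
  def height: Int = contents.length           // concrete: inherited as-is
  def width: Int = if (height == 0) 0 else firstLine.length
  def describe: String = s"Element ${width}x${height}"
}

class ArrayElement(conts: Array[String]) extends Element {
  def contents: Array[String] = conts         // implements the abstract member
}

// Hypothetical subclass: same name and parameters, so this describe wins
class LabeledElement(conts: Array[String], label: String) extends ArrayElement(conts) {
  override def describe: String = s"$label (${width}x${height})"
}

val a = new ArrayElement(Array("hello", "world"))
val b = new LabeledElement(Array("hi"), "greeting")
println(a.describe) // Element 5x2
println(b.describe) // greeting (2x1)
```

PopularPlaceInserter is in the position of ArrayElement here. ElasticsearchSinkFunction declares process without implementing it, much as Element declares contents.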
