Environment
OS: Ubuntu 16.04
Kafka: kafka_2.10-0.10.2.0
Elasticsearch: elasticsearch-2.4.3
Kibana: kibana-4.6.4-linux-x86_64
Flink: flink-1.3.1-bin-hadoop27-scala_2.10
Java: openjdk version "1.8.0_131"
Scala: 2.10
Build Tool: Apache Maven 3.5.0
IDE: IntelliJ
Add the following dependency to pom.xml to use the Elasticsearch connector.
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.3.1</version>
</dependency>
Write to Elasticsearch
Use the "ElasticsearchSink" class provided by Flink’s Elasticsearch connector to write a DataStream to an Elasticsearch index. Here, "jConfig" is the userConfig, a map of user settings; "jTransports" is the transportAddresses, the addresses of the Elasticsearch nodes to connect to with a TransportClient; and "PopularPlaceInserter" is the elasticsearchSinkFunction, which generates one or more ActionRequests from each incoming element. (https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.html)
popularPlaces.addSink(
  new ElasticsearchSink(jConfig, jTransports, new PopularPlaceInserter))
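For reference, here is a minimal sketch of how "jConfig" and "jTransports" could be built for the Elasticsearch 2.x connector; the cluster name "elasticsearch" and the address 127.0.0.1:9300 are assumptions for a local setup, not values from the original code.

import java.net.InetSocketAddress
import java.util.{ArrayList => JArrayList, HashMap => JHashMap}

// Assumed local setup: adjust cluster.name and the transport address to your Elasticsearch node.
val jConfig = new JHashMap[String, String]()
jConfig.put("bulk.flush.max.actions", "1")   // flush after every element so documents appear immediately
jConfig.put("cluster.name", "elasticsearch") // must match the cluster.name of the target cluster

val jTransports = new JArrayList[InetSocketAddress]()
jTransports.add(new InetSocketAddress("127.0.0.1", 9300)) // Elasticsearch transport port (9300 by default)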
"PopularPlaceInserter" is defined as following.
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch2.{ElasticsearchSinkFunction, RequestIndexer}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import scala.collection.JavaConverters._

class PopularPlaceInserter extends ElasticsearchSinkFunction[(Float, Float, Long, Boolean, Int)] {
  // Called once per stream element; builds an IndexRequest and hands it to the indexer.
  override def process(record: (Float, Float, Long, Boolean, Int), ctx: RuntimeContext, indexer: RequestIndexer) {
    // Document fields; "location" is "lat,lon" so it can be mapped as a geo_point.
    val json = Map(
      "time" -> record._3.toString,
      "location" -> (record._2 + "," + record._1),
      "isStart" -> record._4.toString,
      "cnt" -> record._5.toString
    )
    // Index into the "nyc-places" index under the "popular-locations" type.
    val rqst: IndexRequest = Requests.indexRequest
      .index("nyc-places")
      .`type`("popular-locations")
      .source(json.asJava)
    indexer.add(rqst)
  }
}
Memo: What is Sink?
https://en.wikipedia.org/wiki/Sink_(computing) In computing, a sink, event sink or data sink is a class or function designed to receive incoming events from another object or function. This is commonly implemented in C++ as callbacks. Other Object-oriented languages, such as Java and C#, have built-in support for sinks by allowing events to be fired to delegate functions.
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html So what?? In Flink, "Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams." "addSink" is one of these operations; it "invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions". (I hope someday I will fully understand what they say here.)
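As a rough sketch of the difference, the first two calls below use built-in output formats, while the last one attaches the custom Elasticsearch sink from above; the output path is just a placeholder.

popularPlaces.print()                             // built-in: print each element to stdout
popularPlaces.writeAsText("/tmp/popular-places")  // built-in: write elements as text (placeholder path)
popularPlaces.addSink(                            // custom sink function via a bundled connector
  new ElasticsearchSink(jConfig, jTransports, new PopularPlaceInserter))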
Memo: What is "class xxx extends"?
Reading through Scala code, I often find statements like "class ArrayElement extends Element". What is this? By using "extends", class ArrayElement inherits all non-private members from class Element. If the subclass already implements a member with the same name and parameters, that member of the superclass is overridden rather than inherited. So, in the example above, "PopularPlaceInserter" inherits the non-private members of "ElasticsearchSinkFunction" and implements its process method. According to the API document for ElasticsearchSinkFunction, this method creates multiple ActionRequests from an element in a stream; ElasticsearchSink uses it to prepare elements for sending to Elasticsearch.
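A tiny sketch of what "extends" does, reusing the ArrayElement/Element names from the memo; the members here are made up for illustration.

class Element {
  def width: Int = 0                      // non-private members are inherited by subclasses
  def describe: String = "width=" + width
}

class ArrayElement extends Element {
  override def width: Int = 10            // same name and parameters: overrides instead of inheriting
}

// new ArrayElement().describe returns "width=10": describe is inherited, width is overridden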