Environment
OS: Ubuntu 16.04
Kafka: kafka_2.10-0.10.2.0
Elasticsearch: elasticsearch-2.4.3
Kibana: kibana-4.6.4-linux-x86_64
Flink: flink-1.3.1-bin-hadoop27-scala_2.10
Java: openjdk version "1.8.0_131"
Build Tool: Apache Maven 3.5.0
IDE: IntelliJ
Add the Kafka connector dependency to pom.xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
  <!-- keep the connector version in line with the Flink distribution (1.3.1 here) -->
  <version>1.3.1</version>
</dependency>
Write data to Kafka
To write data to Kafka, the program needs to write its DataStream into a Kafka topic. Instruction: http://training.data-artisans.com/exercises/toFromKafka.html The source code is shown below.
// read taxi rides from the exercise's TaxiRideSource
val rides = env.addSource(new TaxiRideSource(input, maxDelay, speed))

// keep only rides that start and end within New York City
val filteredRides = rides.filter(r =>
  GeoUtils.isInNYC(r.startLon, r.startLat) && GeoUtils.isInNYC(r.endLon, r.endLat))

// write the cleansed rides to a Kafka topic
filteredRides.addSink(
  new FlinkKafkaProducer010[TaxiRide](
    LOCAL_KAFKA_BROKER,
    CLEANSED_RIDES_TOPIC,
    new TaxiRideSchema))
Memo: What is a serializer?
According to the instructions, "TaxiRideSchema" in the code above is a serializer. But what exactly does that mean?
https://stackoverflow.com/questions/633402/what-is-serialization Serialization is the process of turning an object in memory into a stream of bytes so you can do stuff like store it on disk or send it over the network. Deserialization is the reverse process: turning a stream of bytes into an object in memory.
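In Flink terms, the schema tells the Kafka connector how to turn an object into bytes when producing and how to turn bytes back into an object when consuming. The exercise ships its own TaxiRideSchema; the following is only a minimal sketch of what such a schema could look like, written for a made-up Ride case class with a UTF-8 CSV encoding (the class and field names here are illustrative, not the training's code).

import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.util.serialization.{DeserializationSchema, SerializationSchema}

// illustrative ride record; the training's actual TaxiRide class has more fields
case class Ride(rideId: Long, startLon: Float, startLat: Float)

// one schema that works in both directions: the producer sink calls serialize(),
// the consumer source calls deserialize()
class RideSchema extends SerializationSchema[Ride] with DeserializationSchema[Ride] {

  // object -> bytes: encode the ride as a UTF-8 CSV line
  override def serialize(ride: Ride): Array[Byte] =
    s"${ride.rideId},${ride.startLon},${ride.startLat}".getBytes(StandardCharsets.UTF_8)

  // bytes -> object: parse the CSV line back into a Ride
  override def deserialize(bytes: Array[Byte]): Ride = {
    val fields = new String(bytes, StandardCharsets.UTF_8).split(",")
    Ride(fields(0).toLong, fields(1).toFloat, fields(2).toFloat)
  }

  // the Kafka topic is unbounded, so no element ever marks the end of the stream
  override def isEndOfStream(ride: Ride): Boolean = false

  // tells Flink which type this schema produces
  override def getProducedType: TypeInformation[Ride] = TypeExtractor.getForClass(classOf[Ride])
}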
Read data from Kafka
After the Kafka topic has been filled with the DataStream, the program needs to read the data back from that topic by using a Kafka consumer as its data source. Instruction: http://training.data-artisans.com/exercises/toFromKafka.html The source code is shown below.
// configure the Kafka consumer
val kafkaProps = new Properties
kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP)
// always read the Kafka topic from the start
kafkaProps.setProperty("auto.offset.reset", "earliest")

// create a Kafka consumer
val consumer = new FlinkKafkaConsumer010[TaxiRide](
  RideCleansingToKafka.CLEANSED_RIDES_TOPIC,
  new TaxiRideSchema,
  kafkaProps)
Memo: What is "FlinkKafkaConsumer010"? According to the API source code: https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html, "RideCleansingToKafka.CLEANSED_RIDES_TOPIC" is the name of the topic that should be consumed. "TaxiRideSchema" is a serializer which is used at "ilteredRides.addSink" to write data to Kafka as well. "kafkaProps" is the properties used to configure the Kafka consumer client and the ZooKeeper client.
val rides = env.addSource(consumer)
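To round this off, one quick way to check that the consumer works is to print the consumed rides and submit the job. This is just a sketch that continues the code above (the job name is made up), assuming the producer job has already filled the topic:

// print the rides read back from Kafka and run the job
rides.print()

env.execute("Read cleansed TaxiRides from Kafka")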