[JAVA] Study Flink with the Kafka exercise code

Environment

  * OS: Ubuntu 16.04
  * Kafka: kafka_2.10-0.10.2.0
  * Elasticsearch: elasticsearch-2.4.3
  * Kibana: kibana-4.6.4-linux-x86_64
  * Flink: flink-1.3.1-bin-hadoop27-scala_2.10
  * Java: openjdk version "1.8.0_131"
  * Build Tool: Apache Maven 3.5.0
  * IDE: IntelliJ

Add the following dependency to pom.xml to use the Kafka connector.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
  <version>1.3.1</version>
</dependency>

Write data to Kafka

To write data to Kafka, the program needs to write its DataStream into a Kafka topic. Instruction: http://training.data-artisans.com/exercises/toFromKafka.html The source code is here.

  1. Pass taxi ride data as input and filter out rides that start or end outside of NYC.
    val rides = env.addSource(new TaxiRideSource(input, maxDelay, speed))
    val filteredRides = rides.filter(r => GeoUtils.isInNYC(r.startLon, r.startLat) && GeoUtils.isInNYC(r.endLon, r.endLat))
  2. To write the DataStream of TaxiRide events to a Kafka topic, use Flink's Kafka connector, which provides the FlinkKafkaProducer010 class. Here, LOCAL_KAFKA_BROKER is the "host_ip:port" of the Kafka broker, and CLEANSED_RIDES_TOPIC is the name of the topic to write the DataStream to (a sketch of both constants follows the code below). When the program runs, the results are appended to the Kafka topic. Because Kafka topics are designed as durable logs, restarting the program does not overwrite the topic; all records are appended.
    filteredRides.addSink(
      new FlinkKafkaProducer010[TaxiRide](
        LOCAL_KAFKA_BROKER,
        CLEANSED_RIDES_TOPIC,
        new TaxiRideSchema))
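
The constants LOCAL_KAFKA_BROKER and CLEANSED_RIDES_TOPIC are not shown in the snippet above. A minimal sketch for a single local broker, assuming Kafka's default port and a topic name matching the exercise (both values are assumptions; adjust them to your setup):

    // hypothetical values for a local, single-broker setup
    val LOCAL_KAFKA_BROKER = "localhost:9092"
    val CLEANSED_RIDES_TOPIC = "cleansedRides"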

Memo: What is a serializer? According to the instruction, the "TaxiRideSchema" in the code above is a serializer.

https://stackoverflow.com/questions/633402/what-is-serialization Serialization is the process of turning an object in memory into a stream of bytes so you can do stuff like store it on disk or send it over the network. Deserialization is the reverse process: turning a stream of bytes into an object in memory.
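
Applied to Flink, TaxiRideSchema plays both roles: it implements SerializationSchema (object to bytes, used by the producer) and DeserializationSchema (bytes to object, used by the consumer below). A minimal sketch, assuming TaxiRide offers a toString representation and a matching fromString parser (these method names are assumptions):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.TypeExtractor
    import org.apache.flink.streaming.util.serialization.{DeserializationSchema, SerializationSchema}

    class TaxiRideSchema extends SerializationSchema[TaxiRide] with DeserializationSchema[TaxiRide] {
      // producer side: turn a TaxiRide object into bytes for the Kafka topic
      override def serialize(ride: TaxiRide): Array[Byte] =
        ride.toString.getBytes("UTF-8")

      // consumer side: turn bytes read from Kafka back into a TaxiRide object
      override def deserialize(bytes: Array[Byte]): TaxiRide =
        TaxiRide.fromString(new String(bytes, "UTF-8"))

      // the ride stream is unbounded, so never signal end-of-stream
      override def isEndOfStream(ride: TaxiRide): Boolean = false

      override def getProducedType: TypeInformation[TaxiRide] =
        TypeExtractor.getForClass(classOf[TaxiRide])
    }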

Read data from Kafka

After the Kafka topic has been filled with the DataStream, the program needs to read its input from the topic by using a Kafka consumer data source. Instruction: http://training.data-artisans.com/exercises/toFromKafka.html The source code is here.

  1. Configure and create a Kafka consumer
    // configure Kafka consumer
    val kafkaProps = new Properties
    kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
    kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP)
    // always read the Kafka topic from the start
    kafkaProps.setProperty("auto.offset.reset", "earliest")

    // create a Kafka consumer
    val consumer = new FlinkKafkaConsumer010[TaxiRide](
        RideCleansingToKafka.CLEANSED_RIDES_TOPIC,
        new TaxiRideSchema,
        kafkaProps)    

Memo: What is "FlinkKafkaConsumer010"? According to the API source code: https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html, "RideCleansingToKafka.CLEANSED_RIDES_TOPIC" is the name of the topic that should be consumed. "TaxiRideSchema" is a serializer which is used at "ilteredRides.addSink" to write data to Kafka as well. "kafkaProps" is the properties used to configure the Kafka consumer client and the ZooKeeper client.

  2. Pass the new consumer as the data source that is then used to identify popular places (a runnable sketch follows below).
    val rides = env.addSource(consumer)
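
Putting the pieces together, a minimal end-to-end consumer program might look like the sketch below. The object name, group id, topic name, and the rides.print() placeholder are assumptions; the actual exercise replaces the print with its popular-places logic.

    import java.util.Properties
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
    // plus the imports for TaxiRide and TaxiRideSchema from the training project

    object ReadRidesFromKafka {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // configure the Kafka consumer (hypothetical local defaults)
        val kafkaProps = new Properties
        kafkaProps.setProperty("zookeeper.connect", "localhost:2181")
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
        kafkaProps.setProperty("group.id", "rideSpeedGroup")
        kafkaProps.setProperty("auto.offset.reset", "earliest")

        // read the cleansed rides that the producer above wrote to Kafka
        val consumer = new FlinkKafkaConsumer010[TaxiRide](
          "cleansedRides", // assumed value of CLEANSED_RIDES_TOPIC
          new TaxiRideSchema,
          kafkaProps)

        val rides: DataStream[TaxiRide] = env.addSource(consumer)
        rides.print() // placeholder for the popular-places logic

        env.execute("Read taxi rides from Kafka")
      }
    }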
