This article aims to give you hands-on experience with Apache Kafka and to help as many people as possible understand its benefits. I will omit the detailed implementation and internals of Kafka. Instead, I hope that actually using Kafka gives you a clearer picture of what kind of processing is possible and whether it could solve problems you already have.
If you already understand the basics of Kafka, you can skip this part.
Kafka was announced by LinkedIn in 2011 as a "Distributed Messaging Queue". Kafka's official page now describes it as a "Distributed Streaming Platform", but it is basically fine to think of it as a message queue.
It has the following features and is used in various large-scale systems as a flexible, scalable, and fault-tolerant messaging platform.
- **Pub/Sub model** => Multiple apps can receive the same message (flexible and scalable)
- **Cluster configuration with multiple brokers** => Add servers as message volume grows to achieve high throughput
- **Persistence of message data by saving to disk** => Messages can be reprocessed by reading the same message again
In addition, the mature community provides APIs in various languages and a wealth of plugins called Kafka Connect, providing a developer-friendly environment.
Kafka uses a dedicated term for each role and is roughly composed as follows.
- Message sender: Producer
- Message receiver: Consumer
- Message broker: Broker
- Queue for each kind of message: Topic
- Sharded queue within a Topic: Partition
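To make these terms concrete, here is a minimal in-memory sketch of a Topic with Partitions and two independent Consumers. It is only an illustration: `ToyTopic`, `ToyConsumer`, and the CRC32-based partition choice are hypothetical names and choices for this example, not Kafka's implementation or API.

```python
from collections import defaultdict
import zlib


class ToyTopic:
    """In-memory stand-in for a Kafka topic (illustration only)."""

    def __init__(self, num_partitions):
        # Each partition is an append-only list; reading never removes messages.
        self.partitions = [[] for _ in range(num_partitions)]

    def send(self, key, value):
        # Hypothetical partitioner: hash the key so equal keys share a partition.
        # Kafka's own default partitioner uses a different hash function.
        index = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        self.partitions[index].append(value)
        return index


class ToyConsumer:
    """Each consumer tracks its own offsets, so consumers do not affect each other."""

    def __init__(self, topic):
        self.topic = topic
        self.offsets = defaultdict(int)  # partition index -> next offset to read

    def poll(self):
        received = []
        for index, log in enumerate(self.topic.partitions):
            received.extend(log[self.offsets[index]:])
            self.offsets[index] = len(log)
        return received

    def seek_to_beginning(self):
        # Rewinding the offsets lets the same messages be processed again.
        self.offsets.clear()


topic = ToyTopic(num_partitions=3)
p1 = topic.send("2019/12/21", "hello")
p2 = topic.send("2019/12/21", "world")

app_a, app_b = ToyConsumer(topic), ToyConsumer(topic)
first_a = app_a.poll()
first_b = app_b.poll()
app_a.seek_to_beginning()
replayed_a = app_a.poll()
print(first_a, first_b, replayed_a)
```

Both toy consumers receive the same messages, and rewinding one of them replays the log, which roughly corresponds to the Pub/Sub and persistence features described above.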
In addition, ZooKeeper must be running to manage the Kafka cluster. With that background covered, let's get hands-on. This time, we will proceed in the following environment.
- macOS: 10.14
- python: 3.7.4
- docker: 2.1.0.5
- kafka-docker: https://github.com/wurstmeister/kafka-docker
- KSQL: https://github.com/confluentinc/ksql
First, let's clone kafka-docker locally. Create a directory in your local environment and clone it from github.
mkdir ~/kafka && cd ~/kafka
git clone https://github.com/wurstmeister/kafka-docker.git
cd kafka-docker
kafka-docker provides a docker-compose.yml, so it is tempting to run docker-compose up -d as it is, but the file needs some modification first.
You need to configure the advertised IP as described in the README.
ref) https://github.com/wurstmeister/kafka-docker#advertised-hostname
Change the IP address hard-coded as KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100 to the environment variable DOCKER_HOST_IP.
sed -i -e 's/KAFKA_ADVERTISED_HOST_NAME:.*/KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}/g' docker-compose.yml
If you want Topics to be created in advance when Kafka starts in the next step, it is convenient to set the following value.
ref) https://github.com/wurstmeister/kafka-docker#automatically-create-topics
Insert the following on the line after the KAFKA_ADVERTISED_HOST_NAME line that you modified earlier.
KAFKA_CREATE_TOPICS: "topic1:3:2,topic2:3:2"
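According to the kafka-docker README, each comma-separated entry in this value has the form name:partitions:replicas. The following sketch only decodes the string to show what the setting above requests; `parse_create_topics` is a hypothetical helper for this article, not part of kafka-docker.

```python
def parse_create_topics(spec):
    """Split a KAFKA_CREATE_TOPICS-style string into per-topic settings."""
    topics = []
    for entry in spec.split(","):
        fields = entry.strip().split(":")
        # Only the first three fields (name, partitions, replicas) are read here.
        name, partitions, replicas = fields[0], int(fields[1]), int(fields[2])
        topics.append({"name": name, "partitions": partitions, "replicas": replicas})
    return topics


parsed = parse_create_topics("topic1:3:2,topic2:3:2")
for topic in parsed:
    print(topic)
```

In other words, the value asks for topic1 and topic2, each with 3 partitions and 2 replicas.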
That's all for preparation. Now let's start Kafka.
# It is convenient to set this in a shell startup file such as .bashrc
export DOCKER_HOST_IP=$(ipconfig getifaddr en0)
docker-compose up -d --build
docker-compose ps
#The port numbers can be different.
# Name Command State Ports
# ----------------------------------------------------------------------------------------------------------------------
# kafka-docker_kafka_1 start-kafka.sh Up 0.0.0.0:32771->9092/tcp
# kafka-docker_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
Increase the number of brokers to three.
docker-compose scale kafka=3
docker-compose ps
# Name Command State Ports
# ----------------------------------------------------------------------------------------------------------------------
# kafka-docker_kafka_1 start-kafka.sh Up 0.0.0.0:32771->9092/tcp
# kafka-docker_kafka_2 start-kafka.sh Up 0.0.0.0:32772->9092/tcp
# kafka-docker_kafka_3 start-kafka.sh Up 0.0.0.0:32773->9092/tcp
# kafka-docker_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
Now, let's actually operate Kafka with the CLI.
#Access inside docker container
./start-kafka-shell.sh $DOCKER_HOST_IP
#Broker information is output
bash-4.4# broker-list.sh
# 10.XXX.XXX.XXX:32772,10.XXX.XXX.XXX:32773
# 10.XXX.XXX.XXX:32771
# Confirm that the Topics specified in KAFKA_CREATE_TOPICS in docker-compose.yml were created
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server `broker-list.sh`
# topic1
# topic2
#Creating Topic
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic-from-cli --partitions 3 --replication-factor 2 --bootstrap-server `broker-list.sh`
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server `broker-list.sh`
# topic-from-cli
# topic1
# topic2
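The same topic creation can probably also be done from Python with the admin client of the kafka-python package (installed with pip install kafka-python). This is a hedged sketch, not a tested part of this hands-on: the broker addresses are placeholders, `parse_broker_list` and `create_topic` are hypothetical helpers, and the actual call is left commented out because it needs running brokers.

```python
def parse_broker_list(output):
    """Turn comma/newline separated 'host:port' text into a list of addresses."""
    normalized = output.replace("\n", ",")
    return [addr.strip() for addr in normalized.split(",") if addr.strip()]


def create_topic(bootstrap_servers, name, partitions, replication_factor):
    # Imported lazily so the helper above works without kafka-python installed.
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics([
            NewTopic(name=name, num_partitions=partitions,
                     replication_factor=replication_factor)
        ])
    finally:
        admin.close()


# Placeholder addresses (assumed); replace them with the real broker-list.sh output.
brokers = parse_broker_list("192.0.2.10:32772,192.0.2.10:32773\n192.0.2.10:32771")
print(brokers)
# create_topic(brokers, "topic-from-python", partitions=3, replication_factor=2)
```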
This completes a simple check of Kafka's operation. The cloned repository also contains shell scripts for trying a Producer and a Consumer from the CLI, so it is worth trying those as well. Since real systems rarely implement a Producer / Consumer via the CLI, let's create a Producer in Python3 so that an application can send messages to a Topic.
Let's install the Kafka library for Python3. Please install each missing module as appropriate.
cd ~/kafka
pip install kafka-python
Next, create the following file. I don't usually write Python, so this code is only good enough for checking operation.
topic1-producer.py
from kafka import KafkaProducer
from datetime import datetime
import subprocess
import json
import random
cwd_name = subprocess.check_output("pwd").decode('utf-8').rstrip('\n') + "/kafka-docker"
host_ip = subprocess.check_output("ipconfig getifaddr en0", shell=True).decode('utf-8').rstrip('\n')
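# ${host_ip} is not expanded inside a Python string, so pass the Python variable to docker-compose explicitly
netstat_result = subprocess.check_output("DOCKER_HOST_IP=" + host_ip + " docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*'", cwd=cwd_name, shell=True).decode('utf-8').rstrip('\n')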
kafka_ips = list(set(netstat_result.split('\n')))
# print(kafka_ips)
date = datetime.now().strftime("%Y/%m/%d")
messageId = datetime.now().strftime("%Y/%m/%d-%H:%M:%S:%f")
user_id = random.choice([1000, 2000, 3000])
word_id = random.randint(1,5)
word_pattern = {1: 'hello', 2: 'world', 3: 'hoge', 4: 'fuga', 5: 'hello world'}
word_count = random.randint(1,3)
# random.sample needs a sequence; passing dict keys directly fails on newer Python versions
word_keys = random.sample(list(word_pattern), word_count)
producer = KafkaProducer(bootstrap_servers=kafka_ips, value_serializer=lambda m: json.dumps(m).encode('utf-8'))
for word_type in word_keys:
    kafka_msg = {'userId': user_id, 'messageId': messageId, 'message': {'wordId': word_type, 'word': word_pattern[word_type]}}
    # .get() blocks until the broker acknowledges the message (or 1 second passes)
    producer.send('topic1', key=date.encode('utf-8'), value=kafka_msg).get(timeout=1)
Use two terminal tabs. One is for checking messages in the topic and the other is for sending messages.
# tab1
#Launch Kafka CLI
./start-kafka-shell.sh $DOCKER_HOST_IP
#Consumer startup
# With the --from-beginning option, messages that have already arrived at the Topic are also displayed.
bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=topic1 --from-beginning --bootstrap-server `broker-list.sh`
---
# tab2
python topic1-producer.py
When you run the Python script in tab2, a message like the following appears in tab1.
{"userId": 1000, "messageId": "2019/12/21-22:46:03:131468", "message": {"wordId": 2, "word": "world"}}
You should be able to see messages flowing like this. If you run the script in a loop as below, a message arrives roughly every 3 seconds.
# bash
while true; do python topic1-producer.py; sleep 3s; done;
# fish
while true; python topic1-producer.py; sleep 3s; end;
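The console consumer used in tab1 can also be approximated in Python with kafka-python's KafkaConsumer. The following is a sketch under assumptions: `consume` needs running brokers and is therefore not called here, and `decode_value` is a hypothetical helper that simply reverses the producer's value_serializer. Only the decoding step is exercised, using the sample message shown above.

```python
import json


def decode_value(raw_bytes):
    """Reverse the producer's value_serializer: UTF-8 bytes -> dict."""
    return json.loads(raw_bytes.decode("utf-8"))


def consume(bootstrap_servers, topic="topic1"):
    # Imported lazily; requires kafka-python and reachable brokers.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",  # similar in spirit to --from-beginning
        value_deserializer=decode_value,
    )
    for record in consumer:  # blocks and yields messages as they arrive
        print(record.partition, record.offset, record.value)


sample = b'{"userId": 1000, "messageId": "2019/12/21-22:46:03:131468", "message": {"wordId": 2, "word": "world"}}'
decoded = decode_value(sample)
print(decoded["message"]["word"])
```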
Next, let's perform stream processing. There is nothing special about streaming: the whole sequence of messages (events) that flows endlessly into a Topic is simply called a "stream". KSQL is a streaming SQL engine for Kafka that lets you query, filter, and aggregate those events in a SQL-like manner. It can turn the continuous data flowing into a Topic into another continuous data set (Stream) or into aggregated data (Table), and that data can then be used as a new topic for processing by another application. Please refer to the links below for details.
ref) https://kafka.apache.org/documentation/streams/
ref) https://www.youtube.com/watch?v=DPGn-j7yD68
Streams and Tables are basically always running (24/7), so it is easier to get started if you think of them as being treated the same way as Topics.
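As a rough mental model (not how KSQL is implemented), a Stream keeps every event while a Table keeps only the latest value per key. The events below are invented for illustration.

```python
# Invented sample events, in arrival order.
events = [
    {"userId": 1000, "word": "hello"},
    {"userId": 3000, "word": "fuga"},
    {"userId": 1000, "word": "hoge"},
]

# Stream view: every event is kept, in arrival order.
stream = list(events)

# Table view: only the latest value per key survives.
table = {}
for event in events:
    table[event["userId"]] = event["word"]

print(len(stream))
print(table)
```

Here the Stream still holds all three events, while the Table holds one row per userId, with user 1000 updated to its most recent word.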
First, prepare KSQL, which is developed by Confluent.
cd ~/kafka
git clone https://github.com/confluentinc/ksql.git
cd ksql
# Return to the kafka-docker directory
cd ../kafka-docker
# Get the IP address + port number of the running Kafka brokers
export KSQL_BOOTSTRAP_SERVERS=$(docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*' |sort |uniq |tr '\n' ',')
#Move to ksql directory
cd ../ksql
#start ksql server
docker run -d -p $DOCKER_HOST_IP:8088:8088 \
-e KSQL_BOOTSTRAP_SERVERS=$KSQL_BOOTSTRAP_SERVERS \
-e KSQL_OPTS="-Dksql.service.id=ksql_service_3_ -Dlisteners=http://0.0.0.0:8088/" \
confluentinc/cp-ksql-server:5.3.1
#docker process confirmation
docker ps
# confluentinc/cp-ksql-server:5.3.1 container is running
#Start KSQL CLI
docker run -it confluentinc/cp-ksql-cli http://$DOCKER_HOST_IP:8088
If the KSQL CLI starts successfully, the ksql> prompt is displayed.
Here, let's create a Stream and a Table for stream processing from the Topics created earlier (topic1 and topic2).
ksql> show streams;
# Stream Name | Kafka Topic | Format
# ------------------------------------
# ------------------------------------
ksql> CREATE STREAM topic1_stream1 (userId INT, messageId VARCHAR, message STRUCT<word VARCHAR, wordId INT>) WITH (KAFKA_TOPIC = 'topic1', VALUE_FORMAT='JSON', KEY='userId');
ksql> show streams;
# Stream Name | Kafka Topic | Format
# ---------------------------------------
# TOPIC1_STREAM1 | topic1 | JSON
# ---------------------------------------
ksql> CREATE TABLE topic1_table1 (userId INT, wordCount INT, message STRUCT<word VARCHAR, wordId INT>) WITH (KAFKA_TOPIC = 'topic1', VALUE_FORMAT='JSON', KEY='userId');
ksql> show tables;
# Table Name | Kafka Topic | Format | Windowed
# -------------------------------------------------
# TOPIC1_TABLE1 | topic1 | JSON | false
# -------------------------------------------------
※ Important: There are some restrictions when creating Streams and Tables. It took me a lot of trial and error to learn these rules.
ref) https://docs.confluent.io/current/ksql/docs/developer-guide/join-streams-and-tables.html
| | from Topic | from Stream | from Stream-Stream | from Table-Table | from Stream-Table |
|---|---|---|---|---|---|
| CREATE Stream | o | o | o | x | o |
| CREATE Table | o | o | x | o | x |
Like SQL, use the JOIN syntax to create new Streams and Tables from two resources. Note that a JOIN is possible only on the value set as the KEY of each resource. In other words, in the above example, you cannot JOIN on two columns between a Stream created from topic1 and a Stream created from another topic. (Example: events with userId = 2000 and wordCount = 5 cannot be made into a new Stream.)
If you want to JOIN on multiple columns, you can handle it by preparing a column in the Topic message that combines them and setting it as the KEY. (Example: KEY => ${userId}-${wordCount})
Also, the target column must be the KEY in order to use GROUP BY in a query against a Table.
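On the producer side, the combined-column workaround could look like the sketch below. The column name `joinKey` and the helper `with_join_key` are hypothetical names chosen for this example; the resulting column would then be the one specified as KEY in CREATE STREAM.

```python
import json


def with_join_key(message):
    """Return a copy of the message with an added composite key column."""
    enriched = dict(message)
    # Combine the two columns into one value, e.g. "2000-5".
    enriched["joinKey"] = "{}-{}".format(message["userId"], message["wordCount"])
    return enriched


msg = with_join_key({"userId": 2000, "wordCount": 5})
print(json.dumps(msg))
```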
Queries against a Stream only return messages that arrive after the query starts. In other words, messages that were already in the Topic before the query was issued are not output as results of querying the Stream. As mentioned at the beginning of this chapter, Streams and Tables are always running and are created in advance, like Topics. When you first touch KSQL this may not be obvious, and you may be left wondering, "What is it for? When would I use it?" In an actual system, stream processing is rarely performed via the CLI, but since this is a hands-on, let's make query results visible even for messages already in the Topic, as a debugging aid. Set the following value in the KSQL CLI.
ksql> SET 'auto.offset.reset'='earliest';
#Get all events in Stream
ksql> select * from topic1_stream1;
# 1576936834754 | 2019/12/21 | 3000 | 2019/12/21-23:00:34:614230 | {WORD=fuga, WORDID=4}
# 1576936837399 | 2019/12/21 | 1000 | 2019/12/21-23:00:37:275858 | {WORD=hello world, WORDID=5}
# 1576936837512 | 2019/12/21 | 1000 | 2019/12/21-23:00:37:275858 | {WORD=hoge, WORDID=3}
---
#How many messages each user sent at the same time in Stream
ksql> select userId, count(messageId) from topic1_stream1 group by userId, messageId;
# 1000 | 3
# 3000 | 2
# 3000 | 1
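As a batch approximation of the GROUP BY query above, the same counting can be written with collections.Counter. The events and messageId values here are invented, and unlike this one-shot count, KSQL keeps emitting updated results as new events arrive.

```python
from collections import Counter

# Invented events: (userId, messageId) pairs as they might appear in topic1_stream1.
events = (
    [{"userId": 1000, "messageId": "m1"}] * 3
    + [{"userId": 3000, "messageId": "m2"}] * 2
    + [{"userId": 3000, "messageId": "m3"}]
)

# Group by (userId, messageId) and count the events in each group.
counts = Counter((e["userId"], e["messageId"]) for e in events)

for (user_id, message_id), n in counts.items():
    print(user_id, "|", n)
```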
Aggregate functions can be defined by the developer in addition to the ones provided by default in KSQL. ref) https://docs.confluent.io/current/ksql/docs/developer-guide/syntax-reference.html#aggregate-functions
In addition, the following documents will be very helpful for aggregation. A very wide range of queries is possible, such as being able to separate events at specific times and aggregate them. ref) https://docs.confluent.io/current/ksql/docs/developer-guide/aggregate-streaming-data.html#aggregate-streaming-data-with-ksql
With the query still running, try sending messages to topic1 by following the procedure in 2.2.
#3.4 Stream + Stream => Stream => Table
Finally, as an advanced version, let's create a new Stream from two Streams and query it to create a Table.
As an example, imagine a scenario in which a user is randomly selected by lottery and the keywords that the user has posted in the past 60 minutes are extracted. (Please forgive me; I couldn't come up with a better example ;;)
First, let's copy topic1-producer.py to create topic2-producer.py.
cp topic{1,2}-producer.py
topic2-producer.py
from kafka import KafkaProducer
from datetime import datetime
import subprocess
import json
import random
cwd_name = subprocess.check_output("pwd").decode('utf-8').rstrip('\n') + "/kafka-docker"
host_ip = subprocess.check_output("ipconfig getifaddr en0", shell=True).decode('utf-8').rstrip('\n')
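# ${host_ip} is not expanded inside a Python string, so pass the Python variable to docker-compose explicitly
netstat_result = subprocess.check_output("DOCKER_HOST_IP=" + host_ip + " docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*'", cwd=cwd_name, shell=True).decode('utf-8').rstrip('\n')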
kafka_ips = list(set(netstat_result.split('\n')))
# print(kafka_ips)
date = datetime.now().strftime("%Y/%m/%d")
user_id = random.choice([1000, 2000, 3000])
producer = KafkaProducer(bootstrap_servers=kafka_ips, value_serializer=lambda m: json.dumps(m).encode('utf-8'))
kafka_msg = {'userId': user_id}
producer.send('topic2', key=date.encode('utf-8'), value=kafka_msg).get(timeout=1)
After creating the file as above, let's create a Stream from each of topic1 and topic2 with `userId` as the KEY.
ksql> CREATE STREAM topic2_stream1 (userId INTEGER) WITH (KAFKA_TOPIC = 'topic2', VALUE_FORMAT='JSON', KEY='userId');
ksql> show streams;
# Stream Name | Kafka Topic | Format
# ---------------------------------------
# TOPIC2_STREAM1 | topic2 | JSON
# TOPIC1_STREAM1 | topic1 | JSON
# ---------------------------------------
Then create a new Stream from the records whose `userId` matches in the two Streams. Since the trigger is the arrival of a new message (event) at topic2, the topic2 Stream goes on the LEFT side.
ref) https://docs.confluent.io/current/ksql/docs/developer-guide/join-streams-and-tables.html#semantics-of-stream-stream-joins
# Topic2 Stream + Topic1 Stream => New Stream
ksql> CREATE STREAM topic1_topic2_stream1 AS SELECT t2s1.userId as userId, t1s1.messageId, t1s1.message FROM topic2_stream1 t2s1 INNER JOIN topic1_stream1 t1s1 WITHIN 1 HOURS ON t2s1.userId = t1s1.userId;
# If you ran SET 'auto.offset.reset'='earliest'; earlier (3.4), restore the default with the command below so that only new changes appear in query results.
ksql> SET 'auto.offset.reset'='latest';
ksql> select * from topic1_topic2_stream1;
In this state, try running topic2-producer.py from another tab.
When it runs, the messages (events) that arrived at topic1_stream1 within the past hour are displayed.
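The effect of the windowed JOIN can be approximated in plain Python as below. This is a batch sketch with invented events and timestamps. It assumes that WITHIN 1 HOURS means the two records are within one hour of each other in either direction, whereas real KSQL evaluates the join incrementally on record timestamps. `join_within` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def join_within(left_events, right_events, window):
    """Pair events with the same userId whose timestamps differ by at most `window`."""
    joined = []
    for left in left_events:
        for right in right_events:
            same_user = left["userId"] == right["userId"]
            close_in_time = abs(left["ts"] - right["ts"]) <= window
            if same_user and close_in_time:
                joined.append({
                    "userId": left["userId"],
                    "messageId": right["messageId"],
                    "message": right["message"],
                })
    return joined


now = datetime(2019, 12, 22, 0, 0, 0)
topic1_events = [  # invented events standing in for topic1_stream1
    {"userId": 1000, "ts": now - timedelta(minutes=10), "messageId": "a",
     "message": {"wordId": 1, "word": "hello"}},
    {"userId": 1000, "ts": now - timedelta(hours=2), "messageId": "b",
     "message": {"wordId": 2, "word": "world"}},
    {"userId": 3000, "ts": now - timedelta(minutes=5), "messageId": "c",
     "message": {"wordId": 4, "word": "fuga"}},
]
topic2_events = [{"userId": 1000, "ts": now}]  # the "lottery" event

result = join_within(topic2_events, topic1_events, timedelta(hours=1))
print(result)
```

Only message "a" is joined: "b" belongs to the same user but is older than one hour, and "c" belongs to a different user.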
Finally, let's create a Table from a query on the Stream topic1_topic2_stream1.
#Create Table from query to Stream
ksql> CREATE TABLE topic1_topic2_table1 AS SELECT userId, COLLECT_SET(message->word) as word FROM topic1_topic2_stream1 GROUP BY userId;
#If you execute the following query while sending a message to Topic2, you can see how a new message (event) is created.
ksql> select * from topic1_topic2_table1;
# 1576940945888 | 2019/12/22 | 1000 | [hello, hello world, fuga, hoge, world]
# 1576941043356 | 2019/12/22 | 3000 | [hello, hello world, fuga]
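What COLLECT_SET with GROUP BY userId computes can be sketched in Python as collecting the distinct words per user. The joined events below are invented, and `collect_set_by_user` is a hypothetical helper, not a KSQL API.

```python
def collect_set_by_user(joined_events):
    """Collect distinct words per userId, keeping first-seen order."""
    table = {}
    for event in joined_events:
        words = table.setdefault(event["userId"], [])
        word = event["message"]["word"]
        if word not in words:  # skip duplicates, like a set
            words.append(word)
    return table


joined = [  # invented rows standing in for topic1_topic2_stream1
    {"userId": 1000, "message": {"word": "hello"}},
    {"userId": 1000, "message": {"word": "hoge"}},
    {"userId": 1000, "message": {"word": "hello"}},
    {"userId": 3000, "message": {"word": "fuga"}},
]

word_table = collect_set_by_user(joined)
print(word_table)
```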
This is the end of the hands-on content.