Cet article vise à toucher Apache Kafka de manière pratique et à aider autant de personnes que possible à comprendre les avantages de Kafka. J'omettrai la mise en œuvre détaillée et le mécanisme de Kafka, et j'espère que ce sera l'occasion d'élargir l'image, par exemple quel type de traitement est possible en utilisant réellement Kafka et si ce sera une solution aux problèmes existants. ..
Si vous comprenez les bases de Kafka, vous pouvez l'ignorer.
Kafka a été annoncé par LinkedIn en 2011 comme une "file d'attente de messagerie distribuée". Actuellement, la page officielle de Kafka indique "Distributed Streaming Platform", mais fondamentalement, elle devrait être reconnue comme une file d'attente de messagerie.
Il présente les caractéristiques suivantes et est adopté dans divers systèmes à grande échelle en tant que plate-forme de messagerie flexible, évolutive et tolérante aux pannes.
La communauté mature fournit également des API dans divers langages et une multitude de plugins appelés Kafka Connect, offrant un environnement convivial pour les développeurs.
Kafka utilise des termes en fonction de son rôle et a la structure générale suivante. Expéditeur du message: producteur Destinataire du message: consommateur Médiateur de messages: Courtier Chaque message en file d'attente: Sujet Mise en file d'attente des rubriques fragmentées: partition
De plus, la gestion des clusters de Kafka nécessite le démarrage de Zookeeper.Jusqu'à présent, déplaçons nos mains. Cette fois, nous procéderons à des travaux pratiques dans l'environnement suivant.
macOS: 10.14 python: 3.7.4 docker: 2.1.0.5 kafka-docker: https://github.com/wurstmeister/kafka-docker KSQL: https://github.com/confluentinc/ksql
Tout d'abord, clonons localement kafka-docker. Créez un répertoire dans votre environnement local et clonez-le à partir de github.
mkdir ~/kafka && cd ~/kafka
git clone https://github.com/wurstmeister/kafka-docker.git
cd kafka-docker
Puisque kafka-docker fournit docker-compose.yml, je voudrais exécuter docker-compose up -d
tel quel, mais ce fichier a besoin de quelques modifications.
ref) https://github.com/wurstmeister/kafka-docker#advertised-hostname
Vous devez configurer l'adresse IP publiée comme décrit dans.
Remplacez l'adresse IP directement écrite comme KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
par la variable d'environnement DOCKER_HOST_IP
.
sed -i -e 's/KAFKA_ADVERTISED_HOST_NAME:.*/KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}/g' docker-compose.yml
Si vous souhaitez générer une rubrique à l'avance dans Kafka qui est démarrée ensuite, il est pratique de définir la valeur suivante.
ref) https://github.com/wurstmeister/kafka-docker#automatically-create-topics
Insérez ce qui suit dans la ligne suivant le KAFKA_ADVERTISED_HOST_NAME
modifié que vous avez modifié précédemment.
KAFKA_CREATE_TOPICS: "topic1:3:2,topic2:3:2
C'est tout pour la préparation. Maintenant, commençons Kafka.
# .C'est une bonne idée de le définir lors du démarrage d'un shell tel que bashrc.
export DOCKER_HOST_IP=$(ipconfig getifaddr en0)
docker-compose up -d --build
docker-compose ps
#Les numéros de port peuvent varier.
# Name Command State Ports
# ----------------------------------------------------------------------------------------------------------------------
# kafka-docker_kafka_1 start-kafka.sh Up 0.0.0.0:32771->9092/tcp
# kafka-docker_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
Augmentez le nombre de courtiers à trois.
docker-compose scale kafka=3
docker-compose ps
# Name Command State Ports
# ----------------------------------------------------------------------------------------------------------------------
# kafka-docker_kafka_1 start-kafka.sh Up 0.0.0.0:32771->9092/tcp
# kafka-docker_kafka_2 start-kafka.sh Up 0.0.0.0:32772->9092/tcp
# kafka-docker_kafka_3 start-kafka.sh Up 0.0.0.0:32773->9092/tcp
# kafka-docker_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
Maintenant, exploitons Kafka avec la CLI.
#Accès à l'intérieur du conteneur Docker
./start-kafka-shell.sh $DOCKER_HOST_IP
#Les informations sur le courtier sont sorties
bash-4.4# broker-list.sh
# 10.XXX.XXX.XXX:32772,10.XXX.XXX.XXX:32773
# 10.XXX.XXX.XXX:32771
# docker-compose.yml KAFKA_CREATE_Confirmez que le sujet spécifié dans TOPICS est généré
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server `broker-list.sh`
# topic1
# topic2
#Créer un sujet
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic-from-cli --partitions 3 --replication-factor 2 --bootstrap-server `broker-list.sh`
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server `broker-list.sh`
# topic-from-cli
# topic1
# topic2
C'est la fin d'un simple contrôle de fonctionnement Kafka. Dans le référentiel cloné, il existe un fichier sh que vous pouvez essayer Producer et Consumer avec CLI, c'est donc une bonne idée de l'essayer également. Je pense qu'il est rare d'implémenter Producer / Consumer via CLI dans le système réel, alors créons un Producer en utilisant Python3 afin que vous puissiez envoyer un message à Topic via l'application.
Installons la bibliothèque Kafka pour Python3. Veuillez installer chaque module manquant comme il convient.
cd ~/kafka
pip install kafka-python
Ensuite, créez les fichiers suivants. Je n'écris généralement pas Python lui-même. Il s'agit simplement d'un code de niveau de contrôle de fonctionnement.
topic1-producer.py
rom kafka import KafkaProducer
from datetime import datetime
import subprocess
import json
import random
cwd_name = subprocess.check_output("pwd").decode('utf-8').rstrip('\n') + "/kafka-docker"
host_ip = subprocess.check_output("ipconfig getifaddr en0", shell=True).decode('utf-8').rstrip('\n')
netstat_result = subprocess.check_output("DOCKER_HOST_IP=${host_ip} && docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*'", cwd=cwd_name, shell=True).decode('utf-8').rstrip('\n')
kafka_ips = list(set(netstat_result.split('\n')))
# print(kafka_ips)
date = datetime.now().strftime("%Y/%m/%d")
messageId = datetime.now().strftime("%Y/%m/%d-%H:%M:%S:%f")
user_id = random.choice([1000, 2000, 3000])
word_id = random.randint(1,5)
word_pattern = {1: 'hello', 2: 'world', 3: 'hoge', 4: 'fuga', 5: 'hello world'}
word_count = random.randint(1,3)
word_keys = random.sample(word_pattern.keys(), word_count)
producer = KafkaProducer(bootstrap_servers=kafka_ips, value_serializer=lambda m: json.dumps(m).encode('utf-8'))
for word_type in word_keys:
kafka_msg = {'userId': user_id, 'messageId': messageId, 'message': {'wordId': word_type, 'word': word_pattern[word_type]}}
producer.send('topic1', key=date.encode('utf-8'), value=kafka_msg).get(timeout=1)
Utilisez deux onglets de terminal. L'un est pour vérifier les messages dans le sujet et l'autre pour envoyer des messages.
# tab1
#Lancez Kafka CLI
./start-kafka-shell.sh $DOCKER_HOST_IP
#Démarrage consommateur
# --from-Avec l'option de début, il est possible d'afficher les messages qui sont déjà arrivés au sujet.
bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=topic1 --from-beginning --bootstrap-server `broker-list.sh`
---
# tab2
python topic1-producer.py
Lorsque vous exécutez le script Python de tab2, ce sera du côté tab1.
{"userId": 1000, "messageId": "2019/12/21-22:46:03:131468", "message": {"wordId": 2, "word": "world"}}
Vous devriez être en mesure de voir le message circuler comme ça. Si vous exécutez le script comme ci-dessous, vous verrez que le message arrive toutes les 3 secondes.
# bash
while true; do python topic1-producer.py; sleep 3s; done;
# fish
while true; python topic1-producer.py; sleep 3s; end;
Ensuite, effectuons le traitement en continu. Il n'y a rien de spécial à propos du streaming, et tout le message (événement) qui circule sans fin vers Topic est simplement appelé «streaming». KSQL est une API qui vous permet d'interroger des requêtes de type SQL pour ces événements fluides pour le filtrage et l'agrégation. Il est possible de changer les données continues des messages circulant vers Topic en une autre donnée continue (Stream) ou de données agrégées (Table), et d'utiliser ces données comme nouveau sujet pour le traitement par une autre application. Veuillez vous référer au lien ci-dessous pour plus de détails.
ref) https://kafka.apache.org/documentation/streams/ ref) https://www.youtube.com/watch?v=DPGn-j7yD68
Stream et Table sont fondamentalement (24/7) toujours en cours d'exécution, donc si vous reconnaissez qu'ils sont traités de la même manière que Topic, il sera plus facile d'entrer.
Tout d'abord, préparez KSQL développé par confluent.
cd ~/kafka
git clone https://github.com/confluentinc/ksql.git
cd ksql
# kafka-Revenir au répertoire docker
cd ../kafka-docker
#adresse IP en cours d'exécution de kafka+Obtenir le numéro de port
export KSQL_BOOTSTRAP_SERVERS=(docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*' |sort |uniq |tr '\n' ',')
#Déplacer vers le répertoire ksql
cd ../ksql
#démarrer le serveur ksql
docker run -d -p $DOCKER_HOST_IP:8088:8088 \
-e KSQL_BOOTSTRAP_SERVERS=$KSQL_BOOTSTRAP_SERVERS \
-e KSQL_OPTS="-Dksql.service.id=ksql_service_3_ -Dlisteners=http://0.0.0.0:8088/" \
confluentinc/cp-ksql-server:5.3.1
#confirmation du processus docker
docker ps
# confluentinc/cp-ksql-server:5.3.1 conteneur fonctionne
#Démarrez KSQL CLI
docker run -it confluentinc/cp-ksql-cli http://$DOCKER_HOST_IP:8088
Si le démarrage de KSQL CLI réussit, la CLI suivante sera lancée.
Ici, créons un flux et une table pour le traitement en continu à partir des sujets 1 et 2.
ksql> show streams;
# Stream Name | Kafka Topic | Format
# ------------------------------------
# ------------------------------------
ksql> CREATE STREAM topic1_stream1 (userId INT, messageId VARCHAR, message STRUCT<word VARCHAR, wordId INT>) WITH (KAFKA_TOPIC = 'topic1', VALUE_FORMAT='JSON', KEY='userId');
ksql> show streams;
# Stream Name | Kafka Topic | Format
# ---------------------------------------
# TOPIC1_STREAM1 | topic1 | JSON
# ---------------------------------------
ksql> CREATE TABLE topic1_table1 (userId INT, wordCount INT, message STRUCT<word VARCHAR, wordId INT>) WITH (KAFKA_TOPIC = 'topic1', VALUE_FORMAT='JSON', KEY='userId');
ksql> show tables;
# Table Name | Kafka Topic | Format | Windowed
# -------------------------------------------------
# TOPIC1_TABLE1 | topic1 | JSON | false
# -------------------------------------------------
※important Il existe certaines restrictions lors de la création d'un flux et d'une table. Il m'a fallu beaucoup d'essais et d'erreurs pour apprendre moi-même cette règle. ref) https://docs.confluent.io/current/ksql/docs/developer-guide/join-streams-and-tables.html
from Topic | from stream | from stream-stream | from table-table | from stream-table | |
---|---|---|---|---|---|
CREATE Stream | o | o | o | x | o |
CREATE Table | o | o | x | o | x |
Comme SQL, utilisez la syntaxe JOIN
pour créer de nouveaux flux et tables à partir de deux ressources. Il convient de noter ici que «JOIN» n'est possible qu'avec la valeur définie dans la KEY de chaque ressource. En d'autres termes, dans l'exemple ci-dessus, vous ne pouvez pas JOIN
avec deux colonnes dans un Stream créé à partir de topic1 et un Stream créé à partir d'une autre rubrique. (Exemple: un événement avec userId = 2000 et wordCount = 5 ne peut pas être un nouveau flux.)
Si vous souhaitez «JOIN» avec plusieurs colonnes, vous pouvez le gérer en préparant une colonne qui les combine dans le message Topic et en le définissant comme «KEY». (Exemple: KEY => $ {userId} - $ {wordCount}
)
De plus, la cible doit être «KEY» pour faire «GROUP BY» dans la requête à Table.
Les requêtes à diffuser sont toujours interrogées pour les messages mis à jour. En d'autres termes, les messages emballés dans Topic avant le moment où la requête est lancée ne seront pas générés à la suite de l'interrogation de Stream. Comme mentionné au début de ce chapitre, les Streams et les Tables sont toujours en cours d'exécution et sont créés à l'avance comme Topic. Lorsque vous touchez KSQL pour la première fois, vous n'en avez peut-être pas conscience et vous vous posez peut-être la question "À quoi sert-il? Quand est-il utilisé?" Dans un système réel, je pense que le traitement Stream est rarement effectué via CLI, mais comme il est pratique, il est possible de vérifier le résultat de la requête même pour les messages déjà dans le sujet comme indiqué ci-dessous avec la signification du débogage. Définissons les valeurs suivantes dans la CLI KSQL.
ksql> SET 'auto.offset.reset'='earliest';
#Obtenez tous les événements dans Stream
ksql> select * from topic1_stream1;
# 1576936834754 | 2019/12/21 | 3000 | 2019/12/21-23:00:34:614230 | {WORD=fuga, WORDID=4}
# 1576936837399 | 2019/12/21 | 1000 | 2019/12/21-23:00:37:275858 | {WORD=hello world, WORDID=5}
# 1576936837512 | 2019/12/21 | 1000 | 2019/12/21-23:00:37:275858 | {WORD=hoge, WORDID=3}
---
#Combien de messages chaque utilisateur a envoyés en même temps dans Stream
ksql> select userId, count(messageId) from topic1_stream1 group by userId, messageId;
# 1000 | 3
# 3000 | 2
# 3000 | 1
En plus des fonctions d'agrégation fournies par défaut dans KSQL, vous pouvez également utiliser celles définies par le développeur. ref) https://docs.confluent.io/current/ksql/docs/developer-guide/syntax-reference.html#aggregate-functions
De plus, les documents suivants seront très utiles pour l'agrégation. Un très large éventail de requêtes est possible, comme la possibilité de séparer des événements à des moments précis et de les agréger. ref) https://docs.confluent.io/current/ksql/docs/developer-guide/aggregate-streaming-data.html#aggregate-streaming-data-with-ksql
Dans cet état, essayez d'envoyer un message à topic1 en suivant la procédure dans # 2.2.
#3.4 Stream + Stream => Stream => Table Enfin, en tant que version avancée, créons un nouveau Stream à partir de deux Stream et interrogons-le pour créer une Table.
À titre d'exemple, supposons une scène dans laquelle un utilisateur est sélectionné au hasard par loterie et les mots-clés que l'utilisateur a prononcés au cours des 60 dernières minutes sont extraits. (Veuillez me pardonner car je n'ai pas trouvé un bon exemple ;;)
Copions d'abord topic1-producteur.py
et créons topic2-producteur.py
cp topic{1,2}-producer.py
topic2-producer.py
from kafka import KafkaProducer
from datetime import datetime
import subprocess
import json
import random
cwd_name = subprocess.check_output("pwd").decode('utf-8').rstrip('\n') + "/kafka-docker"
host_ip = subprocess.check_output("ipconfig getifaddr en0", shell=True).decode('utf-8').rstrip('\n')
netstat_result = subprocess.check_output("DOCKER_HOST_IP=${host_ip} && docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*'", cwd=cwd_name, shell=True).decode('utf-8').rstrip('\n')
kafka_ips = list(set(netstat_result.split('\n')))
# print(kafka_ips)
date = datetime.now().strftime("%Y/%m/%d")
user_id = random.choice([1000, 2000, 3000])
producer = KafkaProducer(bootstrap_servers=kafka_ips, value_serializer=lambda m: json.dumps(m).encode('utf-8'))
kafka_msg = {'userId': user_id}
producer.send('topic2', key=date.encode('utf-8'), value=kafka_msg).get(timeout=1)
Après avoir créé le fichier comme ci-dessus, créons un Stream à partir de Topic1 et Topic2 avec ʻuserId` comme clé.
ksql> CREATE STREAM topic2_stream1 (userId INTEGER) WITH (KAFKA_TOPIC = 'topic2', VALUE_FORMAT='JSON', KEY='userId');
ksql> show streams;
# Stream Name | Kafka Topic | Format
# ---------------------------------------
# TOPIC2_STREAM1 | topic2 | JSON
# TOPIC1_STREAM1 | topic1 | JSON
# ---------------------------------------
Ensuite, créez un nouveau Stream à partir de l 'ʻuserId` correspondant des deux Streams. Puisque le déclencheur est qu'un nouveau message (événement) arrive à Topic2, Topic2 devient le Stream sur le côté GAUCHE. ref) https://docs.confluent.io/current/ksql/docs/developer-guide/join-streams-and-tables.html#semantics-of-stream-stream-joins
# Topic2 Stream + Topic1 Stream => New Stream
ksql> CREATE STREAM topic1_topic2_stream1 AS SELECT t2s1.userId as userId, t1s1.messageId, t1s1.message FROM topic2_stream1 t2s1 INNER JOIN topic1_stream1 t1s1 WITHIN 1 HOURS ON t2s1.userId = t1s1.userId;
# 3.SET à 4'auto.offset.reset'='earliest';Si vous avez fait cela, utilisez la commande ci-dessous pour restaurer la valeur par défaut afin que seules les modifications soient les résultats de la requête.
ksql> SET 'auto.offset.reset'='latest';
ksql> select * from topic1_topic2_stream1;
Dans cet état, essayez d'exécuter topic2-Producer.py
à partir d'un autre onglet.
Une fois exécuté, le message (événement) arrivé à topic1_stream1
au cours de la dernière heure sera affiché comme indiqué ci-dessous.
Enfin, créons une table à partir de la requête pour Stream de topic1_topic2_stream1
.
#Créer une table de la requête au flux
ksql> CREATE TABLE topic1_topic2_table1 AS SELECT userId, COLLECT_SET(message->word) as word FROM topic1_topic2_stream1 GROUP BY userId;
#Si vous exécutez la requête suivante lors de l'envoi d'un message à Topic2, vous pouvez voir comment un nouveau message (événement) est créé.
ksql> select * from topic1_topic2_table1;
# 1576940945888 | 2019/12/22 | 1000 | [hello, hello world, fuga, hoge, world]
# 1576941043356 | 2019/12/22 | 3000 | [hello, hello world, fuga]
C'est la fin du contenu pratique.
Recommended Posts