Apache kafka ist ein verteiltes Open Source-Nachrichtensystem, das eine große Anzahl von Nachrichten mit hoher Geschwindigkeit veröffentlichen / abonnieren kann. Hier erstellen wir eine Anwendung, die mithilfe von Apache Camel, einem integrierten Framework, Nachrichten mit kafka austauscht. Verwenden Sie die Camel-Kafka-Komponente, um mit Apache Camel eine Verbindung zu Kafka herzustellen.
In dieser Umgebung verwendet kafka v2.0.0. Weitere Informationen zum Erstellen der Umgebung finden Sie im folgenden Artikel.
Ich habe bereits im nächsten Artikel eine ähnliche Anwendung in XML DSL geschrieben, aber dieses Mal werde ich eine Anwendung mit Java DSL erstellen. Außerdem möchte ich ausführlicher als beim letzten Mal schreiben.
Die diesmal erstellte Anwendung veröffentlicht die Datums- und Uhrzeitnachricht jede Sekunde an kafka, wie in der folgenden Abbildung gezeigt, und abonniert dieselbe Nachricht von kafka.
Mit Apache Camel kann eine solche Anwendung mit Kafka mit nur wenigen Dutzend Codezeilen implementiert werden. Es ist kein hübscher Code, weil ich den gesamten Code in die Hauptfunktion eingefügt habe, aber ich hoffe, Sie können sehen, dass er von Camel einfach implementiert werden kann.
public static void main(String[] args) {
try {
CamelContext context = new DefaultCamelContext();
KafkaComponent kafka = new KafkaComponent();
kafka.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); // kafka1, kafka2,Registrieren Sie 3 Server von kafka3 als Broker
context.addComponent("kafka", kafka);
context.addRoutes(new RouteBuilder() {
public void configure() {
from("timer:trigger?period=1000") //Führen Sie alle 1000 Millisekunden aus
.routeId("kafka_producer_route")
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Stellen Sie das aktuelle Datum und die aktuelle Uhrzeit für den KÖRPER der Nachricht ein
.to("kafka:test01"); //Nachricht zum Thema test01 veröffentlichen
from("kafka:test01") //Nachricht vom Thema test01 abonnieren
.routeId("kafka_consumer_route")
.log("body = ${body}"); //Zeigen Sie den KÖRPER-Inhalt der Nachricht im Protokoll an
}
});
context.start();
Thread.sleep(10000);
context.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
Jetzt erstellen wir eine Kafka-Anwendung.
Fügen Sie für Maven die folgende Bibliothek zu pom.xml hinzu. camel-kafka ist eine Komponente für den Umgang mit Kafka in Camel, und $ {camel.version} gibt die Version des verwendeten Kamels an. kafka-clients ist eine Clientbibliothek für Kafka.
pom.xml
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
Die URI für die Verwendung der Kamel-Kafka-Komponente sieht folgendermaßen aus:
kafka:topic[?options]
Geben Sie den Namen des Zielthemas im Kontextpfad (Thema) an.
Erstellen Sie eine Route, um die Nachricht in kafka zu veröffentlichen. Die in Java DSL geschriebene Route lautet wie folgt.
Das Thema wird automatisch erstellt. Wenn Sie jedoch die Themeneinstellungen ändern möchten, erstellen Sie es manuell. Der Code ist zu einfach zu erklären.
from("timer:trigger?period=1000") //Führen Sie alle 1000 Millisekunden aus
.routeId("kafka_producer_route")
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Stellen Sie das aktuelle Datum und die aktuelle Uhrzeit für den KÖRPER der Nachricht ein
.to("kafka:test01"); //Nachricht zum Thema test01 veröffentlichen
Erstellen Sie als Nächstes eine Route, die die Nachricht an kafka abonniert. Die in Java DSL geschriebene Route lautet wie folgt.
from("kafka:test01") //Nachricht vom Thema test01 abonnieren
.routeId("kafka_consumer_route")
.log("body = ${body}"); //Zeigen Sie den KÖRPER-Inhalt der Nachricht im Protokoll an
Da es allein nicht möglich ist, eine Verbindung zu kafka herzustellen, schreiben Sie die Einstellungen für die Verbindung. Erstellen Sie zunächst eine Instanz der Kafka-Konfiguration, die die Kafka-Einstellungen registriert, und konfigurieren Sie die Einstellungen für die Verbindung mit Kafka. Es gibt verschiedene Einstellungselemente, die ich später kurz erläutern möchte. Im folgenden Beispiel werden einige Einstellungen vorgenommen. Wenn Sie jedoch nur eine Verbindung herstellen möchten, legen Sie einfach den Broker fest, zu dem eine Verbindung mit der setBrokers-Methode hergestellt werden soll. Ansonsten wird der Standardwert verwendet und Sie müssen ihn nicht festlegen.
KafkaConfiguration kafkaConfig = new KafkaConfiguration();
kafkaConfig.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); //Verbinden Sie sich mit 3 Kafka-Brokern
kafkaConfig.setGroupId("group1"); //Legen Sie die Verbrauchergruppen-ID fest
kafkaConfig.setAutoCommitEnable(true); //Aktivieren Sie die automatische Festschreibung
kafkaConfig.setAutoCommitIntervalMs(5000); //Legen Sie das Intervall für das automatische Festschreiben fest
kafkaConfig.setAutoOffsetReset("earliest"); //Verhalten beim Abonnieren einer Partition ohne festgeschriebenen Offset
kafkaConfig.setRequestRequiredAcks("all"); //Die Bedingungen für die Beurteilung dieser Veröffentlichung waren erfolgreich
kafkaConfig.setConsumersCount(1); //Anzahl der Verbraucher
Erstellen Sie als Nächstes eine Instanz von KafkaComponent und legen Sie die zuvor erstellte Instanz von KafkaConfiguration fest.
KafkaComponent kafka = new KafkaComponent();
kafka.setConfiguration(kafkaConfig);
Dies ist die einzige Einstellung für die Verbindung zu Kafka.
Damit sind die Publisher- und Consumer-Routen sowie die Kafka-Verbindungseinstellungen abgeschlossen. Die gesamte Quelle, die die Hauptfunktion usw. erstellt hat, die diese verwendet, ist wie folgt.
package example.camelbegginer.kafka;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.impl.DefaultCamelContext;
public class KafkaExample {
public static void main(String[] args) {
try {
CamelContext context = new DefaultCamelContext();
context.addComponent("kafka", createKafkaComponent());
context.addRoutes(createProducerRouteBuilder());
context.addRoutes(createConsumerRouteBuilder());
context.start();
Thread.sleep(10000);
context.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
static KafkaComponent createKafkaComponent() {
KafkaConfiguration kafkaConfig = new KafkaConfiguration();
kafkaConfig.setBrokers("kafka1:9092,kafka2:9092,kafka3:9092"); //Verbinden Sie sich mit 3 Kafka-Brokern
kafkaConfig.setGroupId("group1"); //Gruppen-ID einstellen
kafkaConfig.setAutoCommitEnable(true); //Aktivieren Sie die automatische Festschreibung
kafkaConfig.setAutoCommitIntervalMs(5000); //Legen Sie das Intervall für das automatische Festschreiben fest
kafkaConfig.setAutoOffsetReset("earliest"); //Verhalten beim Lesen einer Partition ohne festgeschriebenen Offset
kafkaConfig.setRequestRequiredAcks("all"); //Die Bedingungen für die Beurteilung dieser Veröffentlichung waren erfolgreich
kafkaConfig.setConsumersCount(1); //Anzahl der Verbraucher
KafkaComponent kafka = new KafkaComponent();
kafka.setConfiguration(kafkaConfig);
return kafka;
}
static RouteBuilder createProducerRouteBuilder() {
return new RouteBuilder() {
public void configure() throws Exception {
from("timer:trigger?period=1000") //Führen Sie alle 1000 Millisekunden aus
.routeId("kafka_producer_route")
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) //Stellen Sie das aktuelle Datum und die aktuelle Uhrzeit für den KÖRPER der Nachricht ein
.to("kafka:test01"); //Nachricht zum Thema test01 veröffentlichen
}
};
}
static RouteBuilder createConsumerRouteBuilder() {
return new RouteBuilder() {
public void configure() throws Exception {
from("kafka:test01") //Nachricht vom Thema test01 abonnieren
.routeId("kafka_consumer_route")
.log("body = ${body}"); //Zeigen Sie den KÖRPER-Inhalt der Nachricht im Protokoll an
}
};
}
}
Wenn diese Anwendung ausgeführt wird, wird das Protokoll wie unten gezeigt jede Sekunde ausgegeben.
[2019-01-31 21:15:38.112], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:38
[2019-01-31 21:15:39.031], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:39
[2019-01-31 21:15:40.034], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:40
[2019-01-31 21:15:41.026], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:41
[2019-01-31 21:15:42.029], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:42
[2019-01-31 21:15:43.024], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:43
[2019-01-31 21:15:44.044], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:44
[2019-01-31 21:15:45.028], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:45
[2019-01-31 21:15:46.032], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, body = 2019-01-31 21:15:46
Wenn Sie den Header mit dem folgenden Code ausgeben, werden der Nachrichtenoffset und die Partitionsinformationen in das Protokoll ausgegeben.
from("kafka:test01") //Nachricht vom Thema test01 abonnieren
.routeId("kafka_consumer_route")
.log("body = ${body}") //Zeigen Sie den KÖRPER-Inhalt der Nachricht im Protokoll an
.log("${headers}"); //★ Hier hinzufügen
Das Ausgabeprotokoll lautet wie folgt.
2019-02-01 10:10:46.236], [INFO ], kafka_consumer_route, Camel (camel-1) thread #1 - KafkaConsumer[test01], kafka_consumer_route, {breadcrumbId=[B@58a2fc46, kafka.HEADERS=RecordHeaders(headers = [RecordHeader(key = breadcrumbId, value = [73, 68, 45, 109, 107, 121, 45, 80, 67, 45, 49, 53, 52, 56, 57, 56, 51, 52, 52, 51, 56, 55, 51, 45, 48, 45, 49])], isReadOnly = false), kafka.OFFSET=52, kafka.PARTITION=0, kafka.TIMESTAMP=1548983446195, kafka.TOPIC=test01}
Auf diese Weise können Sie die Kafka-Komponente des Apache Camel-Frameworks verwenden, um eine Anwendung zu erstellen, die einfache Nachrichten sendet und empfängt. Wussten Sie, dass die Verwendung von Camel die Verbindung mit kafka erleichtert?
Abschließend werde ich die Haupteigenschaften der Kamel-Kafka-Komponente erläutern. Es gibt viele andere Eigenschaften als die folgende Tabelle. Weitere Informationen finden Sie auf der offiziellen Seite.
Es gibt viele Eigenschaften, aber einige sind die gleichen wie die vom Kafka-Client verwendeten, sodass Sie sie ohne Beschwerden verwenden können.
Name des Anwesens | producer/consumer | Erläuterung | Standardwert | Schimmel |
---|---|---|---|---|
brokers | Verbreitet | Geben Sie den Kafka-Broker an. Das Format ist host1:Wenn Sie mehrere Broker auf Port1, Host1 angeben:port1,host2:Getrennt von Port2 und Komma. | String | |
clientId | Verbreitet | Die Client-ID ist eine benutzerdefinierte Zeichenfolge zum Verfolgen von Anrufen aus der Clientanwendung. | String | |
autoCommitEnable | consumer | Wenn true festgelegt ist, schreibt der Verbraucher den Offset der abgerufenen Nachricht regelmäßig automatisch fest. Das Autocommit-Intervall wird mit der Option autoCommitIntervalMs angegeben. | TRUE | Boolean |
autoCommitIntervalMs | consumer | Gibt das Intervall (Millisekunden) an, in dem der Verbraucher den Nachrichtenoffset automatisch festschreibt. | 5000 | Integer |
autoCommitOnStop | consumer | Gibt an, ob die zuletzt abonnierte Nachricht vom Broker explizit automatisch festgeschrieben werden soll, wenn der Verbraucher ausfällt. Dazu muss die Option autoCommitEnable aktiviert sein. Mögliche Werte sind sync, async oder none. | sync | String |
autoOffsetReset | consumer | Wenn es keinen anfänglichen Versatz gibt oder wenn der Versatz außerhalb des Bereichs liegt, gehen Sie wie folgt vor: frühestens: Setzt den Versatz auf den allerersten Versatz zurück. Neueste: Setzt den Versatz automatisch auf den letzten (letzten) Versatz zurück. fail: Löst eine Ausnahme für den Verbraucher aus. | latest | String |
consumerRequestTimeoutMs | consumer | Die Konfiguration steuert die maximale Zeit, die ein Client auf die Beantwortung einer Anfrage wartet. Wenn vor Ablauf des Zeitlimits keine Antwort eingeht, sendet der Client die Anforderung nach Bedarf erneut oder schlägt die Anforderung fehl, wenn die Wiederholungsversuche erschöpft sind. | 40000 | Integer |
consumersCount | consumer | Gibt die Anzahl der Stemmers an, die eine Verbindung zum Kafka-Server herstellen. | 1 | int |
consumerStreams | consumer | Anzahl gleichzeitiger Verbraucher. | 10 | int |
groupId | consumer | Gibt eine Zeichenfolge an, die die Gruppe angibt, zu der dieser Verbraucher gehört. Das Festlegen derselben Gruppen-ID zeigt an, dass sich mehrere Verbraucher in derselben Verbrauchergruppe befinden. Dies ist eine erforderliche Option für Verbraucher. | String | |
maxPollRecords | consumer | poll()Gibt die maximale Anzahl von Datensätzen an, die in einem einzelnen Aufruf an zurückgegeben werden. | 500 | Integer |
pollTimeoutMs | consumer | Der Timeout-Wert (Millisekunden), der beim Abrufen des KafkaConsumer verwendet wird. | 5000 | Long |
compressionCodec | producer | Mit diesem Parameter können Sie den Komprimierungscodec für die vom Hersteller generierten Daten angeben. Mögliche Werte sind none, gzip und snappy. Standardmäßig ist es nicht komprimiert (keine). | none | |
requestRequiredAcks | producer | Legen Sie die Bedingungen fest, unter denen berücksichtigt werden kann, dass die Anforderung des Herstellers abgeschlossen wurde. Sie geben den Zuverlässigkeitsgrad der Nachricht an. Es sind die folgenden drei Werte anzugeben. acks=Wenn es 0 ist, geht die Nachricht verloren, wenn der Broker des Anführers ausfällt, also normalerweise acks=Geben Sie 1 oder alle an. ・ Bestätigt=0producer betrachtet die Übertragung als abgeschlossen, ohne auf eine Bestätigung vom Server zu warten. In diesem Fall können wir nicht garantieren, dass kafka die Nachricht erhalten hat. Der für jeden Datensatz zurückgegebene Offset ist immer-Auf 1 setzen. ・ Bestätigt=1 Der Leser schreibt die Nachricht lokal, antwortet jedoch, ohne auf die Fertigstellung durch andere Follower zu warten. In diesem Fall können andere Follower die Nachricht nicht replizieren, wenn der Leser unmittelbar nach der Genehmigung der Nachricht ausfällt, und die Nachricht geht verloren. ・ Bestätigt=all Wartet, bis alle anderen synchronen Replikat-Schreibvorgänge abgeschlossen sind, bevor der Leser eine Antwort zurückgibt. Dadurch wird sichergestellt, dass Nachrichten nicht verloren gehen, solange mindestens ein synchrones Replikat aktiv ist. | 1 | String |
requestTimeoutMs | producer | Der Broker fordert an, bevor er einen Fehler an den Client (Produzenten) zurücksendet..required.Wartezeit, bevor die Anforderungen erfüllt werden. | 305000 | Integer |
retries | producer | Bei einem Wert größer als 0 sendet der Client erneut Datensätze (wiederholt sie), die aufgrund eines vorübergehenden Fehlers nicht gesendet werden konnten. | 0 | Integer |
Recommended Posts