Aus der Beispiel-App, die im vorherigen Artikel Migration vom Transport-Client zum Rest-High-Level-Client erstellt wurde, fassen wir diesmal die Masseneinfügungsverarbeitung vom Java-Client zum elastischen Saerch mithilfe der Bulk-API zusammen. ..
macOS Elasticsearch6.5.2 Java8 Spring Boot 2.1.1
Die von mir erstellte Anwendung ist auf GitHub aufgeführt. https://github.com/ohanamisan/Elasticsearch_on_Java
Wenn sich die Version von Elasticsearch unterscheidet, ändern Sie den JAR-Importspeicherort der Gradle-Datei entsprechend. Weitere Informationen finden Sie unter READ ME.
Dieses Mal implementieren wir die Dateneinfügungsverarbeitung mithilfe des Massenprozessors in der Massen-API. Ich denke, dass Sie dies auch dann tun können, wenn Sie sich ohne großen Unterschied auf das Dokument beziehen, aber bitte denken Sie, dass es sich um ein Memo über den Vergleich zwischen Transport Client und Rest High Level Client handelt. Klicken Sie hier für das Originaldokument des Massenprozessors ↓
RestHighLevelClient https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-document-bulk.html
TransportClient https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html
BulkProcessor processor = BulkProcessor.builder(transportClient, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
System.out.println(
"bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
}
}).setBulkActions(20)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(0)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
BulkProcessor processor = BulkProcessor.builder((request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
System.out.println(
"bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
}
}).setBulkActions(20)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(0)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
Es gibt keinen großen Unterschied zwischen ihnen, und TransportClient übergibt den generierten Client wie im ersten Argument der Builder-Methode von BulkProcessor, und RestHighLevelClient übergibt Client-Informationen in Lambda mithilfe der BulkAsync-Methode. Definieren Sie in beiden zweiten Argumenten einen Listener, der die Verarbeitung vor und nach der Ausführung implementiert.
Sie können die Methode add verwenden, ohne die Verarbeitung zum Zeitpunkt des Einfügens zu ändern.
processor.add(new IndexRequest("Zielindex eingeben").source("Eingabedaten"));
processor.add(new IndexRequest("Zielindex eingeben", "Geben Sie den Zieltyp ein").source("Eingabedaten"));
processor.add(new IndexRequest("Zielindex eingeben", "Geben Sie den Zieltyp ein", "Eindeutige ID").source("Eingabedaten"));
Nachdem Sie eine Instanz von Listener wie dokumentiert erstellt haben, können Sie die Definitionen natürlich mit dem Builder für Massenprozessoren trennen. Wenn Sie die Verarbeitung vor und nach der Ausführung fest implementieren und die Verarbeitung danach trennen, sind die Verantwortlichkeiten klar und es wird sich gut anfühlen.
Als Nächstes möchte ich der Beispielanwendung mithilfe der Scroll-API eine Paging-Funktion hinzufügen.
Recommended Posts