À partir de l'exemple d'application créé dans l'article précédent Migrating from Transport Client to Rest High Level Client, nous résumons cette fois le traitement d'insertion en masse du client Java vers Elastic saerch à l'aide de l'API Bulk. ..
macOS Elasticsearch6.5.2 Java8 Spring Boot 2.1.1
L'application que j'ai créée est répertoriée sur GitHub. https://github.com/ohanamisan/Elasticsearch_on_Java
Si la version d'Elasticsearch est différente, modifiez l'emplacement d'importation du fichier JAR du fichier gradle selon vos besoins. Pour plus de détails, veuillez consulter LISEZ-MOI
Cette fois, nous mettrons en œuvre le traitement des insertions de données à l'aide du processeur en bloc dans l'API en bloc. Je pense que vous pouvez le faire même si vous vous référez au document sans grande différence, mais pensez qu'il s'agit d'un mémo sur la comparaison entre Transport Client et Rest High Level Client. Cliquez ici pour le document original du processeur en bloc ↓
RestHighLevelClient https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-document-bulk.html
TransportClient https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html
BulkProcessor processor = BulkProcessor.builder(transportClient, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
System.out.println(
"bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
}
}).setBulkActions(20)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(0)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
BulkProcessor processor = BulkProcessor.builder((request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
System.out.println(
"bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
}
}).setBulkActions(20)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(0)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
Il n'y a pas de grande différence entre eux et TransportClient transmet le client généré tel qu'il est dans le premier argument de la méthode de générateur BulkProcessor, et RestHighLevelClient transmet les informations client à l'aide de la méthode bulkAsync dans lambda. Dans les deux deuxièmes arguments, définissez un écouteur qui implémente le traitement avant et après l'exécution.
Vous pouvez utiliser la méthode add sans modifier le traitement au moment de l'insertion.
processor.add(new IndexRequest("Index de destination d'entrée").source("Des données d'entrée"));
processor.add(new IndexRequest("Index de destination d'entrée", "Type de destination d'entrée").source("Des données d'entrée"));
processor.add(new IndexRequest("Index de destination d'entrée", "Type de destination d'entrée", "Identifiant unique").source("Des données d'entrée"));
Bien sûr, après avoir créé une instance de Listener comme documenté, vous pouvez séparer les définitions avec le générateur Bulk Processor. Lors de la mise en œuvre ferme du traitement de pré-exécution et de post-exécution, si vous séparez le traitement autour de cela, les responsabilités seront claires et vous vous sentirez bien.
Ensuite, je voudrais ajouter une fonction de pagination à l'exemple d'application à l'aide de l'API Scroll.
Recommended Posts