[JAVA] Utiliser l'API Bulk avec RestHighLevelClient

Article précédent et contenu de l'article

À partir de l'exemple d'application créé dans l'article précédent Migrating from Transport Client to Rest High Level Client, nous résumons cette fois le traitement d'insertion en masse du client Java vers Elastic saerch à l'aide de l'API Bulk. ..

environnement

macOS Elasticsearch6.5.2 Java8 Spring Boot 2.1.1

Exemple d'application

L'application que j'ai créée est répertoriée sur GitHub. https://github.com/ohanamisan/Elasticsearch_on_Java

Si la version d'Elasticsearch est différente, modifiez l'emplacement d'importation du fichier JAR du fichier gradle selon vos besoins. Pour plus de détails, veuillez consulter LISEZ-MOI

la mise en oeuvre

Cette fois, nous mettrons en œuvre le traitement des insertions de données à l'aide du processeur en bloc dans l'API en bloc. Je pense que vous pouvez le faire même si vous vous référez au document sans grande différence, mais pensez qu'il s'agit d'un mémo sur la comparaison entre Transport Client et Rest High Level Client. Cliquez ici pour le document original du processeur en bloc ↓

RestHighLevelClient https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-document-bulk.html

TransportClient https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html

Génération d'un processeur en bloc avec le client de transport


BulkProcessor processor = BulkProcessor.builder(transportClient, new BulkProcessor.Listener() {

			@Override
			public void beforeBulk(long l, BulkRequest bulkRequest) {
				System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
			}

			@Override
			public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
				throwable.printStackTrace();
			}

			@Override
			public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
				System.out.println(
						"bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
			}
		}).setBulkActions(20)
		  .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
		  .setFlushInterval(TimeValue.timeValueSeconds(5))
          .setConcurrentRequests(0)
		  .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
          .build();

Génération d'un processeur en bloc avec RestHighLevelClient


BulkProcessor processor =  BulkProcessor.builder((request, bulkListener) ->
                client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() {

            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {
                System.out.println("bulkRequest = " + bulkRequest.numberOfActions());
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                System.out.println(
                        "bulkResponse = " + bulkResponse.hasFailures() + " " + bulkResponse.buildFailureMessage());
            }
        }).setBulkActions(20)
          .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
          .setFlushInterval(TimeValue.timeValueSeconds(5))
          .setConcurrentRequests(0)
          .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
          .build();

Il n'y a pas de grande différence entre eux et TransportClient transmet le client généré tel qu'il est dans le premier argument de la méthode de générateur BulkProcessor, et RestHighLevelClient transmet les informations client à l'aide de la méthode bulkAsync dans lambda. Dans les deux deuxièmes arguments, définissez un écouteur qui implémente le traitement avant et après l'exécution.

Traitement des insertions

Vous pouvez utiliser la méthode add sans modifier le traitement au moment de l'insertion.

processor.add(new IndexRequest("Index de destination d'entrée").source("Des données d'entrée"));
processor.add(new IndexRequest("Index de destination d'entrée", "Type de destination d'entrée").source("Des données d'entrée"));
processor.add(new IndexRequest("Index de destination d'entrée", "Type de destination d'entrée", "Identifiant unique").source("Des données d'entrée"));

à la fin

Bien sûr, après avoir créé une instance de Listener comme documenté, vous pouvez séparer les définitions avec le générateur Bulk Processor. Lors de la mise en œuvre ferme du traitement de pré-exécution et de post-exécution, si vous séparez le traitement autour de cela, les responsabilités seront claires et vous vous sentirez bien.

Ensuite, je voudrais ajouter une fonction de pagination à l'exemple d'application à l'aide de l'API Scroll.

Recommended Posts

Utiliser l'API Bulk avec RestHighLevelClient
Comment utiliser l'API Java avec des expressions lambda
Utilisez l'API Clipboard de JavaFX
Utiliser ProGuard avec Gradle
Utiliser Puphpeteer avec Docker
Utilisez XVim2 avec Xcode 12.0.1
Utilisation de CentOS avec LXD
Utiliser Webmock avec Rspec
Utiliser les WebJars avec Gradle
Utilisez jlink avec gradle
Utiliser des couches Lambda avec Java
Utiliser GDAL avec Python avec Docker
Comment utiliser l'API Chain
Tester l'API Web avec junit
Utiliser le certificat pfx avec Okhttp3
Utilisez SDKMAN! Avec Git Bash
Utilisez plusieurs bases de données avec Rails 6.0
Test de l'API REST avec REST Assured
Utiliser Spring JDBC avec Spring Boot
Lier l'API avec Spring + Vue.js
Utilisez Ruby avec Google Colab
Utiliser SpatiaLite avec Java / JDBC
Utilisez log4j2 avec YAML + Gradle
[Docker] À utiliser à tout moment avec Docker + Rails
Exemple d'utilisation de l'API Bulk de Salesforce à partir d'un client Java avec PK-chunking
Utiliser PlantUML avec Visual Studio Code
Utiliser l'authentification de base avec Spring Boot
Spring avec Kotorin - 4 Conception d'API REST
Utiliser java avec MSYS et Cygwin
Utiliser le constructeur avec des arguments dans cucumber-picocontainer
Utiliser le type inet PostgreSQL avec DbUnit
Test de l'API REST à l'aide de REST Assured Part 2
Utilisez bootstrap 4 avec PlayFramework 2.6 (pas de CDN)
Utiliser Git avec SourceTree et Eclipse
Utiliser Azure Bing SpellCheck avec Java
[Rails] Utiliser des cookies en mode API
Utilisez JDBC avec Java et Scala.
Micro benchmark réalisé avec l'API JFR
Utiliser DataDog APM avec des frameworks non pris en charge
Utiliser Java 11 avec Google Cloud Functions
Comment utiliser mssql-tools avec Alpine
À partir de Spring Boot 0. Utilisez Spring CLI
Utilisation de cuda11.0 avec pytorch en utilisant Docker