Exemple d'utilisation de l'API Bulk de Salesforce à partir d'un client Java avec PK-chunking

introduction

J'ai écrit un échantillon d'API SOAP et Bulk dans l'année dernière, donc ce sera la suite. J'ai écrit le code, mais je n'ai pas eu le temps de vérifier l'opération, et je ne suis pas sûr que cela fonctionne correctement.

Qu'est-ce que PK-chunking?

L'API Bulk normale divise les résultats de la requête en fichiers de 1 Go (jusqu'à 15 fichiers) et les télécharge. PK-chunking est un mécanisme pour fractionner la requête à l'aide de l'ID Salesforce.

Si la requête normale est:

SELECT Name FROM Account

Dans PK-chunking, c'est une image à diviser comme suit.

SELECT Name FROM Account WHERE Id >= 001300000000000 AND Id < 00130000000132G
SELECT Name FROM Account WHERE Id >= 00130000000132G AND Id < 00130000000264W
SELECT Name FROM Account WHERE Id >= 00130000000264W AND Id < 00130000000396m
...
SELECT Name FROM Account WHERE Id >= 00130000000euQ4 AND Id < 00130000000fxSK

S'il n'y a pas de conditions, même s'il faut du temps pour terminer la requête, vous pouvez raccourcir le temps requis pour chaque requête en ajoutant les conditions de fractionnement à l'aide de PK. (C'est une image qui raccourcit le temps de traitement en divisant un travail en plusieurs processus)

De plus, étant donné que le nombre maximal de divisions est de 250 000, on s'attend à ce que la taille de fichier du fichier de résultat puisse être réduite.

Présentation du traitement PK-chunking

Site officiel Décrit la procédure d'exécution d'une série de flux avec la commande curl.

Par exemple, si vous essayez de segmenter PK 1 million de données

  1. Ajout des paramètres PK-chunking et chunksize à l'en-tête de la demande
  2. Le travail de segmentation PK est créé
  3. Quatre lots sont enregistrés pour le travail
  4. Dans chaque lot, une requête pour obtenir 250 000 est exécutée (parallèle)
  5. Une fois le lot terminé, vous pouvez obtenir l'URL pour télécharger le fichier de résultat de la requête
  6. Téléchargez des fichiers depuis chaque URL

Ce sera.

Politique de mise en œuvre du code

Les résultats de la requête sont créés de manière asynchrone dans plusieurs fichiers, ce qui est difficile à gérer. Pour des raisons pratiques, j'ai créé un échantillon qui regroupe ces multiples résultats dans un seul fichier et le gzip. https://github.com/JunNakamura/sfsample/blob/master/src/main/java/BulkChunkSaveSample.java

Combinez plusieurs fichiers en un

En fait, l'utilisation de tubes dans de tels cas est une méthode standard, non limitée à Java. S'il s'agit d'un shell, il s'appellera pipe, mkfifo, etc.

Pour Java

  1. Écrivez le contenu de chaque fichier dans PipedOutputStream
  2. Vous pouvez obtenir le contenu ci-dessus en lisant PipedInputStream dans un autre processus.

Ce sera.

try (PipedOutputStream pipedOut = new PipedOutputStream(); 
     PipedInputStream pipedIn = new PipedInputStream(pipedOut); 
....


ExecutorService executor = Executors.newFixedThreadPool(batchList.size() + 1);
            //Commencez à écrire le contenu du tube de lecture dans un fichier dans un thread séparé
            executor.submit(() -> {
                try {
                    String line;
                    while ((line = pipedReader.readLine()) != null) {
                        bw.write(line);
                        bw.newLine();
                    }
                } catch (Exception e) {
                    logger.error("Failed.", e);
                }
            });

	//Contrôle de l'état de chaque lot+Ecrire le résultat dans le tube
            for (BatchInfo chunkBatch: batchList) {
                //Effectuer de manière asynchrone s'il n'y a aucune restriction sur le trafic réseau.
                // executor.submit(() -> BulkChunkSaveSample.retrieveResult(job, connection, chunkBatch, pipedWriter));
                BulkChunkSaveSample.retrieveResult(job, connection, chunkBatch, pipedWriter);
            }

Autrement

Si le nombre de fichiers à combiner est petit, par exemple 2-3, et que la lecture peut être effectuée en série, SequenceInputStream. En intégrant cette classe, vous pouvez lire logiquement plusieurs fichiers en un seul. En interne, ils sont lus un par un dans l'ordre. C'est un peu difficile à utiliser car il n'y a que deux modèles d'arguments de constructeur, énumération ou deux variables.

compression gzip

Pour Java, enveloppez-le simplement dans un GZIPOutputStream. Si vous souhaitez spécifier le code de caractère, encapsulez-le davantage avec OutputStreamWriter. Si vous souhaitez utiliser une bibliothèque CSV en lecture / écriture, vous avez généralement un constructeur qui prend un Writer comme argument, il vous suffit donc de passer un OutputStreamWriter ou BufferWriter. (Cependant, la quantité de code est importante et je me sens un peu fatigué)

OutputStream os = Files.newOutputStream(resultFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
GZIPOutputStream gzip = new GZIPOutputStream(os);
OutputStreamWriter ow = new OutputStreamWriter(gzip, StandardCharsets.UTF_8);
BufferedWriter bw = new BufferedWriter(ow)

Traitement des lots divisés

Lorsque vous effectuez une segmentation PK,

  1. Le lot à fractionner est enregistré en premier
  2. Le statut du lot devient NON TRAITÉ
  3. Après cela, un lot pour exécuter la requête fractionnée est enregistré.

Ce sera. Ainsi, dans le premier exemple, 5 lots sont réellement créés, et le fichier résultat est obtenu du 2ème au 5ème lots.

Dans du code

  1. Obtenez tous les ID de lot enregistrés à partir du travail
  2. Si le statut du premier ID de lot est Non traité, renvoyez la liste des ID de lot à l'exclusion du premier.
  3. Si l'état est erreur, le traitement est interrompu
  4. Si le statut est différent de celui-là, attendez un certain temps

Ce sera.

BatchInfoList batchInfoList = connection.getBatchInfoList(job.getId());
                List<BatchInfo> infoList = new ArrayList<>(Arrays.asList(batchInfoList.getBatchInfo()));
                BatchInfo batchInfo = infoList.get(0);
                switch (batchInfo.getState()) {
                    case NotProcessed:
                        //Le lot après le début est lié au résultat de la requête
                        infoList.remove(0);
                        result.complete(infoList);
                        break;
                    case Failed:
                        logger.warn("batch:" + job.getId() + " failed.");
                        result.complete(Collections.emptyList());
                        break;
                    default:
                        logger.info("-- waiting --");
                        logger.info("state: " + batchInfo.getState());
                }

Pour un ID de lot

  1. Interroger périodiquement jusqu'à ce que l'état soit complet
  2. Une fois terminé, obtenez l'URL du fichier de résultats

Est la même que l'API Bulk normale, et avec la segmentation PK, il n'y en a que plusieurs. Cela dépend de vos besoins si vous souhaitez obtenir les fichiers de résultats de manière asynchrone ou en série dans l'ordre de création. Si la vitesse est prioritaire, elle sera asynchrone.

La partie délicate est que, logiquement, chaque lot peut également avoir plusieurs fichiers de résultats. Dans le premier exemple, la taille du fichier de résultats dépasse 1 Go même après 250 000 fractionnements. (Dans ce cas, la taille globale est supérieure à 4 Go, donc même si vous la divisez, il semble que vous serez pris dans la limite de la requête en masse ...)

Toutefois, étant donné que cet exemple utilise un canal, vous pouvez gérer ce cas en écrivant simplement le résultat de chaque fichier dans le canal.

	//Écrire dans le tuyau
            for (String resultId: resultIds) {
                try (InputStream is = connection.getQueryResultStream(job.getId(), chunkBatch.getId(), resultId);
                     BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));) {
                    String line;
                    while ((line = br.readLine()) != null) {
                        pipedWriter.write(line);
                        pipedWriter.newLine();
                    }
                } 
				

Résumé

La source est Salesforce, mais les éléments techniques sont presque des articles Java ... Si vous prenez beaucoup de décisions, il serait nécessaire de prendre un temps plus court avec une implémentation plus appropriée, mais j'ai décidé de faire un essai. C'était bien car j'ai pu acquérir de nouvelles techniques.

Salesforce dispose d'un outil GUI appelé dataloader qui vous permet de manipuler les données Salesforce. Bien sûr, vous pouvez utiliser l'API Bulk, mais PK-chunking ne la prend pas en charge au moment de la rédaction de cet article. (Le PR semble être en place: https://github.com/forcedotcom/dataloader/pull/138)

J'ai l'impression d'avoir compris d'une manière ou d'une autre que je ne l'ai pas soutenu parce que c'était gênant.

p.s Le fichier readme du chargeur de données a un moyen de l'utiliser avec cli. Je savais que c'était un fichier exécutable, alors j'ai pensé que je pouvais le faire, mais je suis reconnaissant que ce soit officiel. Il semble que Exemple de fichier de configuration soit également disponible.

Recommended Posts

Exemple d'utilisation de l'API Bulk de Salesforce à partir d'un client Java avec PK-chunking
Intégration API de Java avec Jersey Client
Exemple de code utilisant Minio de Java
Traitement des données à l'aide de l'API de flux de Java 8
Exemple de code pour appeler l'API Yahoo! Shopping Product Search (v3) avec l'API client HTTP officiellement introduite à partir de Java 11
Interagir avec l'API de message LINE à l'aide de Lambda (Java)
Implémenter le client API avec juste des annotations à l'aide de Feign (OpenFeign)
Utilisation de plusieurs versions de Java avec Brew sur Mac + jEnv
Conseils d'utilisation de Salesforce SOAP et de l'API Bulk en Java
Utiliser l'API Bulk avec RestHighLevelClient
Utilisation de Mapper avec Java (Spring)
Utilisation de Docker depuis Java Gradle
Importer / télécharger / supprimer en bloc des données sur S3 à l'aide d'Amazon S3 Client Builder avec AWS SDK pour Java
[Détails] Implémentation d'applications grand public avec Kinesis Client Library for Java
Obtenez les résultats Flux de Spring Web Flux de JS avec Fetch API
Une histoire sur l'utilisation de l'API League Of Legends avec JAVA
La version d'Elasticsearch que vous utilisez est-elle compatible avec Java 11?
Ce que je n'aime pas lors de l'utilisation de l'interface d'une fonction avec des arguments par défaut dans Kotlin depuis Java
[Java EE] Implémenter le client avec WebSocket
Exporter un problème à l'aide de l'API Java de JIRA
Coder Java depuis Emacs avec Eclim
Développement de Flink à l'aide de l'API DataStream
Paramètre de délai d'expiration de l'API du client HTTP Java
Essayez d'utiliser Redis avec Java (jar)
Gestion des fuseaux horaires avec Java
Travailler avec des feuilles de calcul Google à partir de Java
J'ai essayé d'utiliser l'API Java8 Stream
Utilisation de Java avec AWS Lambda-Eclipse Préparation
Exemple de mise à jour de fichier EXCEL avec JAVA
Développement HTML5 par Java avec TeaVM
Appelez l'API Java de TensorFlow depuis Scala
Résumé de la programmation orientée objet utilisant Java
Utilisation du service proxy avec l'exploration Java
J'ai essayé d'utiliser Google HttpClient de Java
[Java] Obtenir et gérer Json à partir d'une URL avec une API standard (javax.script)
Créez un environnement de "développement d'API + vérification d'API à l'aide de Swagger UI" avec Docker
Utilisation de Java avec AWS Lambda-Implementation Tips - Obtenir le nom de l'instance à partir de la réaction et de l'ID d'instance
[Salesforce] Enregistrement et mise à jour des ressources statiques avec l'API Tooling (exemple d'API SOAP Java)
Java EE 8 (utilisant NetBeans IDE 8.2) à partir de l'exemple de code Partie 1 Construction de l'environnement
[Java] Générez une liste restreinte à partir de plusieurs listes à l'aide de l'API Stream
Générer le code source à partir du fichier JAR avec JD-GUI du projet Java Decompiler
[Java] Récupère MimeType à partir du contenu du fichier avec Apathce Tika [Kotlin]