Beispiel für die Verwendung der Bulk-API von Salesforce vom Java-Client mit PK-Chunking

Einführung

Ich habe ein Beispiel für SOAP und Bulk API in [Letztes Jahr] geschrieben (https://qiita.com/n_slender/items/c187949c6b02b42a27b5), also wird es die Fortsetzung sein. Ich habe den Code geschrieben, aber ich hatte keine Zeit, den Vorgang zu überprüfen, und ich bin mir nicht sicher, ob er ordnungsgemäß funktioniert.

Was ist PK-Chunking?

Die normale Bulk-API teilt die Abfrageergebnisse in 1-GB-Dateien (bis zu 15 Dateien) auf und lädt sie herunter. PK-Chunking ist ein Mechanismus zum Aufteilen der Abfrage mithilfe der Salesforce-ID.

Wenn die normale Abfrage lautet:

SELECT Name FROM Account

Beim PK-Chunking ist es ein Bild, das wie folgt unterteilt werden muss.

SELECT Name FROM Account WHERE Id >= 001300000000000 AND Id < 00130000000132G
SELECT Name FROM Account WHERE Id >= 00130000000132G AND Id < 00130000000264W
SELECT Name FROM Account WHERE Id >= 00130000000264W AND Id < 00130000000396m
...
SELECT Name FROM Account WHERE Id >= 00130000000euQ4 AND Id < 00130000000fxSK

Wenn keine Bedingungen vorliegen, können Sie die für jede Abfrage erforderliche Zeit verkürzen, indem Sie die Bedingungen für die Aufteilung mit PK hinzufügen, auch wenn das Abschließen der Abfrage einige Zeit in Anspruch nimmt. (Es ist ein Bild, das die Verarbeitungszeit verkürzt, indem ein Job in mehrere Prozesse aufgeteilt wird.)

Da die maximale Anzahl von Unterteilungen 250.000 beträgt, wird erwartet, dass die Dateigröße der Ergebnisdatei reduziert werden kann.

Übersicht über die PK-Chunking-Verarbeitung

Offizielle Website Beschreibt die Prozedur zum Ausführen einer Reihe von Flows mit dem Befehl curl.

Zum Beispiel, wenn Sie versuchen, 1 Million Daten in PK-Chunks zu zerlegen

  1. PK-Chunking- und Chunksize-Einstellungen zum Anforderungsheader hinzugefügt
  2. Ein PK-Chunking-Job wird erstellt
  3. Für den Job sind vier Chargen registriert
  4. In jedem Stapel wird eine Abfrage ausgeführt, um 250.000 zu erhalten (parallel).
  5. Nach Abschluss des Stapels können Sie die URL zum Herunterladen der Abfrageergebnisdatei abrufen
  6. Laden Sie Dateien von jeder URL herunter

Es wird sein.

Code-Implementierungsrichtlinie

Die Abfrageergebnisse werden asynchron in mehreren Dateien erstellt, was schwierig zu handhaben ist. Aus praktischen Gründen habe ich ein Beispiel erstellt, das diese mehreren Ergebnisse in einer Datei zusammenfasst und komprimiert. https://github.com/JunNakamura/sfsample/blob/master/src/main/java/BulkChunkSaveSample.java

Kombinieren Sie mehrere Dateien zu einer

In solchen Fällen ist die Verwendung von Pipes eine Standardmethode, die nicht auf Java beschränkt ist. Wenn es sich um eine Shell handelt, wird sie als Pipe, Mkfifo usw. bezeichnet.

Für Java

  1. Schreiben Sie den Inhalt jeder Datei in PipedOutputStream
  2. Sie können den obigen Inhalt erhalten, indem Sie PipedInputStream in einem anderen Prozess lesen.

Es wird sein.

try (PipedOutputStream pipedOut = new PipedOutputStream(); 
     PipedInputStream pipedIn = new PipedInputStream(pipedOut); 
....


ExecutorService executor = Executors.newFixedThreadPool(batchList.size() + 1);
            //Schreiben Sie den Inhalt der Lesepipe in eine Datei in einem separaten Thread
            executor.submit(() -> {
                try {
                    String line;
                    while ((line = pipedReader.readLine()) != null) {
                        bw.write(line);
                        bw.newLine();
                    }
                } catch (Exception e) {
                    logger.error("Failed.", e);
                }
            });

	//Statusprüfung für jede Charge+Schreiben Sie das Ergebnis in die Pipe
            for (BatchInfo chunkBatch: batchList) {
                //Führen Sie eine asynchrone Ausführung durch, wenn der Netzwerkverkehr nicht eingeschränkt ist.
                // executor.submit(() -> BulkChunkSaveSample.retrieveResult(job, connection, chunkBatch, pipedWriter));
                BulkChunkSaveSample.retrieveResult(job, connection, chunkBatch, pipedWriter);
            }

Ein anderer Weg

Wenn die Anzahl der zu kombinierenden Dateien gering ist, z. B. 2-3, und das Lesen in Reihe erfolgen kann, SequenceInputStream ). Durch das Umschließen dieser Klasse können Sie mehrere Dateien logisch in eine lesen. Intern werden sie nacheinander gelesen. Die Verwendung ist etwas schwierig, da es nur zwei Muster von Konstruktorargumenten gibt, die Aufzählung oder zwei Variablen.

gzip-Komprimierung

Für Java wickeln Sie es einfach in einen GZIPOutputStream ein. Wenn Sie den Zeichencode angeben möchten, schließen Sie ihn mit OutputStreamWriter weiter ein. Wenn Sie eine CSV-Lese- / Schreibbibliothek verwenden möchten, haben Sie normalerweise einen Konstruktor, der einen Writer als Argument verwendet. Übergeben Sie also einfach einen OutputStreamWriter oder BufferWriter. (Die Menge an Code ist jedoch groß und ich fühle mich ein wenig müde)

OutputStream os = Files.newOutputStream(resultFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
GZIPOutputStream gzip = new GZIPOutputStream(os);
OutputStreamWriter ow = new OutputStreamWriter(gzip, StandardCharsets.UTF_8);
BufferedWriter bw = new BufferedWriter(ow)

Handhabung geteilter Chargen

Wenn Sie PK-Chunking machen,

  1. Die Charge zum Teilen wird zuerst registriert
  2. Der Status der Charge wird NICHT VERARBEITET
  3. Danach wird ein Stapel zum Ausführen der geteilten Abfrage registriert.

Es wird sein. Im ersten Beispiel werden also tatsächlich 5 Stapel erstellt, und die Ergebnisdatei wird aus dem 2. bis 5. Stapel abgerufen.

In Code

  1. Holen Sie sich alle registrierten Stapel-IDs aus dem Job
  2. Wenn der Status der ersten Stapel-ID Nicht verarbeitet ist, geben Sie die Liste der Stapel-IDs mit Ausnahme der ersten zurück.
  3. Wenn der Status fehlerhaft ist, wird die Verarbeitung unterbrochen
  4. Wenn der Status anders ist, warten Sie einen bestimmten Zeitraum

Es wird sein.

BatchInfoList batchInfoList = connection.getBatchInfoList(job.getId());
                List<BatchInfo> infoList = new ArrayList<>(Arrays.asList(batchInfoList.getBatchInfo()));
                BatchInfo batchInfo = infoList.get(0);
                switch (batchInfo.getState()) {
                    case NotProcessed:
                        //Der Stapel nach dem Beginn bezieht sich auf das Abfrageergebnis
                        infoList.remove(0);
                        result.complete(infoList);
                        break;
                    case Failed:
                        logger.warn("batch:" + job.getId() + " failed.");
                        result.complete(Collections.emptyList());
                        break;
                    default:
                        logger.info("-- waiting --");
                        logger.info("state: " + batchInfo.getState());
                }

Für eine Chargen-ID

  1. Fragen Sie regelmäßig ab, bis der Status abgeschlossen ist
  2. Wenn Sie fertig sind, rufen Sie die URL der Ergebnisdatei ab

Entspricht der normalen Bulk-API, und beim PK-Chunking gibt es nur mehrere. Es hängt von Ihren Anforderungen ab, ob Sie die Ergebnisdateien asynchron oder seriell in der Reihenfolge ihrer Erstellung abrufen möchten. Wenn die Geschwindigkeit priorisiert ist, ist sie asynchron.

Der schwierige Teil ist, dass logischerweise jeder Stapel auch mehrere Ergebnisdateien haben kann. Im ersten Beispiel überschreitet die Größe der Ergebnisdatei 1 GB auch nach 250.000 Teilungen. (In diesem Fall beträgt die Gesamtgröße mehr als 4 GB. Selbst wenn Sie sie teilen, scheint es, dass Sie an der Grenze der Massenabfrage hängen bleiben ...)

Da in diesem Beispiel jedoch eine Pipe verwendet wird, können Sie diesen Fall behandeln, indem Sie einfach das Ergebnis jeder Datei in die Pipe schreiben.

	//Schreiben Sie an Pipe
            for (String resultId: resultIds) {
                try (InputStream is = connection.getQueryResultStream(job.getId(), chunkBatch.getId(), resultId);
                     BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));) {
                    String line;
                    while ((line = br.readLine()) != null) {
                        pipedWriter.write(line);
                        pipedWriter.newLine();
                    }
                } 
				

Zusammenfassung

Die Quelle ist Salesforce, aber die technischen Elemente sind fast Java-Artikel ... Wenn Sie viele Entscheidungen treffen, wäre es notwendig, eine kürzere Zeit mit einer angemesseneren Implementierung zu verbringen, aber ich habe beschlossen, es zu versuchen. Es war gut, weil ich einige neue Techniken erlernen konnte.

Salesforce verfügt über ein GUI-Tool namens dataloader, mit dem Sie Salesforce-Daten bearbeiten können. Natürlich können Sie die Bulk-API verwenden, aber PK-Chunking unterstützt sie zum Zeitpunkt dieses Schreibens nicht. (PR scheint aktiv zu sein: https://github.com/forcedotcom/dataloader/pull/138)

Ich habe das Gefühl, irgendwie verstanden zu haben, dass ich es nicht unterstützt habe, weil es problematisch war.

p.s In der Readme-Datei von dataloader gibt es eine Möglichkeit, es mit cli zu verwenden. Ich wusste, dass es ein ausführbares Glas war, also dachte ich, ich könnte es tun, aber ich bin dankbar, dass es offiziell war. Es scheint, dass Beispielkonfigurationsdatei ebenfalls verfügbar ist.

Recommended Posts

Beispiel für die Verwendung der Bulk-API von Salesforce vom Java-Client mit PK-Chunking
API-Integration von Java mit Jersey Client
Beispielcode mit Minio aus Java
Datenverarbeitung mit der Stream-API von Java 8
Beispielcode zum Aufrufen der Yahoo! Shopping Product Search (v3) -API mit der offiziell von Java 11 eingeführten HTTP-Client-API
Interagieren Sie mit der LINE Message API mit Lambda (Java)
Implementieren Sie den API-Client mit nur Anmerkungen unter Verwendung von Feign (OpenFeign).
Verwenden mehrerer Java-Versionen mit Brew auf Mac + jEnv
Tipps zur Verwendung von Salesforce SOAP und Bulk API in Java
Verwenden Sie die Bulk-API mit RestHighLevelClient
Verwenden von Mapper mit Java (Spring)
Verwenden von Docker von Java Gradle
Hochladen / Herunterladen / Löschen von Daten in S3 mithilfe von Amazon S3 Client Builder mit AWS SDK für Java
[Details] Implementierung von Consumer-Anwendungen mit der Kinesis Client Library für Java
Erhalten Sie Flux-Ergebnisse von Spring Web Flux von JS mit der Fetch-API
Eine Geschichte über das Erreichen der League Of Legends-API mit JAVA
Ist die von Ihnen verwendete Version von Elasticsearch mit Java 11 kompatibel?
Was mir bei der Verwendung der Schnittstelle einer Funktion mit Standardargumenten in Kotlin aus Java nicht gefällt
[Java EE] Implementieren Sie den Client mit WebSocket
Exportieren Sie ein Problem mithilfe der Java-API von JIRA
Code Java von Emacs mit Eclim
Entwicklung von Flink mit der DataStream-API
Java HTTP Client API-Zeitlimiteinstellung
Versuchen Sie es mit Redis mit Java (jar)
Umgang mit Zeitzonen mit Java
Arbeiten Sie mit Google-Tabellen aus Java
Ich habe versucht, die Java8 Stream API zu verwenden
Verwenden von Java mit AWS Lambda-Eclipse-Vorbereitung
Beispiel für eine EXCEL-Dateiaktualisierung mit JAVA
HTML5-Entwicklung von Java mit TeaVM
Rufen Sie die Java-API von TensorFlow von Scala aus auf
Zusammenfassung der objektorientierten Programmierung mit Java
Verwenden des Proxy-Dienstes mit Java-Crawling
Ich habe versucht, Google HttpClient von Java zu verwenden
[Java] Json von der URL mit der Standard-API (javax.script) abrufen und verarbeiten
Erstellen Sie mit Docker eine Umgebung für "API-Entwicklung + API-Überprüfung mithilfe der Swagger-Benutzeroberfläche"
Verwenden von Java mit AWS Lambda-Implementierungstipps - Abrufen des Instanznamens aus Reagion und Instanz-ID
[Salesforce] Registrieren und Aktualisieren statischer Ressourcen mit der Tooling-API (Java-Beispiel-SOAP-API)
Java EE 8 (unter Verwendung von NetBeans IDE 8.2) ab Beispielcode Teil 1 Umgebungskonstruktion
[Java] Generieren Sie mithilfe der Stream-API eine verengte Liste aus mehreren Listen
Generieren Sie den Quellcode aus der JAR-Datei mit der JD-GUI des Java Decompiler-Projekts
[Java] Holen Sie sich MimeType aus dem Inhalt der Datei mit Apathce Tika [Kotlin]