Ich habe ein Beispiel für SOAP und Bulk API in [Letztes Jahr] geschrieben (https://qiita.com/n_slender/items/c187949c6b02b42a27b5), also wird es die Fortsetzung sein. Ich habe den Code geschrieben, aber ich hatte keine Zeit, den Vorgang zu überprüfen, und ich bin mir nicht sicher, ob er ordnungsgemäß funktioniert.
Die normale Bulk-API teilt die Abfrageergebnisse in 1-GB-Dateien (bis zu 15 Dateien) auf und lädt sie herunter. PK-Chunking ist ein Mechanismus zum Aufteilen der Abfrage mithilfe der Salesforce-ID.
Wenn die normale Abfrage lautet:
SELECT Name FROM Account
Beim PK-Chunking ist es ein Bild, das wie folgt unterteilt werden muss.
SELECT Name FROM Account WHERE Id >= 001300000000000 AND Id < 00130000000132G
SELECT Name FROM Account WHERE Id >= 00130000000132G AND Id < 00130000000264W
SELECT Name FROM Account WHERE Id >= 00130000000264W AND Id < 00130000000396m
...
SELECT Name FROM Account WHERE Id >= 00130000000euQ4 AND Id < 00130000000fxSK
Wenn keine Bedingungen vorliegen, können Sie die für jede Abfrage erforderliche Zeit verkürzen, indem Sie die Bedingungen für die Aufteilung mit PK hinzufügen, auch wenn das Abschließen der Abfrage einige Zeit in Anspruch nimmt. (Es ist ein Bild, das die Verarbeitungszeit verkürzt, indem ein Job in mehrere Prozesse aufgeteilt wird.)
Da die maximale Anzahl von Unterteilungen 250.000 beträgt, wird erwartet, dass die Dateigröße der Ergebnisdatei reduziert werden kann.
Offizielle Website Beschreibt die Prozedur zum Ausführen einer Reihe von Flows mit dem Befehl curl.
Zum Beispiel, wenn Sie versuchen, 1 Million Daten in PK-Chunks zu zerlegen
Es wird sein.
Die Abfrageergebnisse werden asynchron in mehreren Dateien erstellt, was schwierig zu handhaben ist. Aus praktischen Gründen habe ich ein Beispiel erstellt, das diese mehreren Ergebnisse in einer Datei zusammenfasst und komprimiert. https://github.com/JunNakamura/sfsample/blob/master/src/main/java/BulkChunkSaveSample.java
In solchen Fällen ist die Verwendung von Pipes eine Standardmethode, die nicht auf Java beschränkt ist. Wenn es sich um eine Shell handelt, wird sie als Pipe, Mkfifo usw. bezeichnet.
Für Java
Es wird sein.
try (PipedOutputStream pipedOut = new PipedOutputStream();
PipedInputStream pipedIn = new PipedInputStream(pipedOut);
....
ExecutorService executor = Executors.newFixedThreadPool(batchList.size() + 1);
//Schreiben Sie den Inhalt der Lesepipe in eine Datei in einem separaten Thread
executor.submit(() -> {
try {
String line;
while ((line = pipedReader.readLine()) != null) {
bw.write(line);
bw.newLine();
}
} catch (Exception e) {
logger.error("Failed.", e);
}
});
//Statusprüfung für jede Charge+Schreiben Sie das Ergebnis in die Pipe
for (BatchInfo chunkBatch: batchList) {
//Führen Sie eine asynchrone Ausführung durch, wenn der Netzwerkverkehr nicht eingeschränkt ist.
// executor.submit(() -> BulkChunkSaveSample.retrieveResult(job, connection, chunkBatch, pipedWriter));
BulkChunkSaveSample.retrieveResult(job, connection, chunkBatch, pipedWriter);
}
Wenn die Anzahl der zu kombinierenden Dateien gering ist, z. B. 2-3, und das Lesen in Reihe erfolgen kann, SequenceInputStream ). Durch das Umschließen dieser Klasse können Sie mehrere Dateien logisch in eine lesen. Intern werden sie nacheinander gelesen. Die Verwendung ist etwas schwierig, da es nur zwei Muster von Konstruktorargumenten gibt, die Aufzählung oder zwei Variablen.
Für Java wickeln Sie es einfach in einen GZIPOutputStream ein. Wenn Sie den Zeichencode angeben möchten, schließen Sie ihn mit OutputStreamWriter weiter ein. Wenn Sie eine CSV-Lese- / Schreibbibliothek verwenden möchten, haben Sie normalerweise einen Konstruktor, der einen Writer als Argument verwendet. Übergeben Sie also einfach einen OutputStreamWriter oder BufferWriter. (Die Menge an Code ist jedoch groß und ich fühle mich ein wenig müde)
OutputStream os = Files.newOutputStream(resultFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
GZIPOutputStream gzip = new GZIPOutputStream(os);
OutputStreamWriter ow = new OutputStreamWriter(gzip, StandardCharsets.UTF_8);
BufferedWriter bw = new BufferedWriter(ow)
Wenn Sie PK-Chunking machen,
Es wird sein. Im ersten Beispiel werden also tatsächlich 5 Stapel erstellt, und die Ergebnisdatei wird aus dem 2. bis 5. Stapel abgerufen.
In Code
Es wird sein.
BatchInfoList batchInfoList = connection.getBatchInfoList(job.getId());
List<BatchInfo> infoList = new ArrayList<>(Arrays.asList(batchInfoList.getBatchInfo()));
BatchInfo batchInfo = infoList.get(0);
switch (batchInfo.getState()) {
case NotProcessed:
//Der Stapel nach dem Beginn bezieht sich auf das Abfrageergebnis
infoList.remove(0);
result.complete(infoList);
break;
case Failed:
logger.warn("batch:" + job.getId() + " failed.");
result.complete(Collections.emptyList());
break;
default:
logger.info("-- waiting --");
logger.info("state: " + batchInfo.getState());
}
Für eine Chargen-ID
Entspricht der normalen Bulk-API, und beim PK-Chunking gibt es nur mehrere. Es hängt von Ihren Anforderungen ab, ob Sie die Ergebnisdateien asynchron oder seriell in der Reihenfolge ihrer Erstellung abrufen möchten. Wenn die Geschwindigkeit priorisiert ist, ist sie asynchron.
Der schwierige Teil ist, dass logischerweise jeder Stapel auch mehrere Ergebnisdateien haben kann. Im ersten Beispiel überschreitet die Größe der Ergebnisdatei 1 GB auch nach 250.000 Teilungen. (In diesem Fall beträgt die Gesamtgröße mehr als 4 GB. Selbst wenn Sie sie teilen, scheint es, dass Sie an der Grenze der Massenabfrage hängen bleiben ...)
Da in diesem Beispiel jedoch eine Pipe verwendet wird, können Sie diesen Fall behandeln, indem Sie einfach das Ergebnis jeder Datei in die Pipe schreiben.
//Schreiben Sie an Pipe
for (String resultId: resultIds) {
try (InputStream is = connection.getQueryResultStream(job.getId(), chunkBatch.getId(), resultId);
BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));) {
String line;
while ((line = br.readLine()) != null) {
pipedWriter.write(line);
pipedWriter.newLine();
}
}
Die Quelle ist Salesforce, aber die technischen Elemente sind fast Java-Artikel ... Wenn Sie viele Entscheidungen treffen, wäre es notwendig, eine kürzere Zeit mit einer angemesseneren Implementierung zu verbringen, aber ich habe beschlossen, es zu versuchen. Es war gut, weil ich einige neue Techniken erlernen konnte.
Salesforce verfügt über ein GUI-Tool namens dataloader, mit dem Sie Salesforce-Daten bearbeiten können. Natürlich können Sie die Bulk-API verwenden, aber PK-Chunking unterstützt sie zum Zeitpunkt dieses Schreibens nicht. (PR scheint aktiv zu sein: https://github.com/forcedotcom/dataloader/pull/138)
Ich habe das Gefühl, irgendwie verstanden zu haben, dass ich es nicht unterstützt habe, weil es problematisch war.
p.s In der Readme-Datei von dataloader gibt es eine Möglichkeit, es mit cli zu verwenden. Ich wusste, dass es ein ausführbares Glas war, also dachte ich, ich könnte es tun, aber ich bin dankbar, dass es offiziell war. Es scheint, dass Beispielkonfigurationsdatei ebenfalls verfügbar ist.
Recommended Posts