Last year I wrote a sample using the SOAP and Bulk APIs, so this is the sequel. I wrote the code, but I didn't have time to verify it fully, so I'm not completely sure it works properly.
The normal Bulk API splits query results into files of up to 1GB (up to 15 files) for download. PK-chunking is a mechanism that splits the query itself using Salesforce IDs.
If the normal query is:
SELECT Name FROM Account
then with PK-chunking it is split into queries roughly like the following:
SELECT Name FROM Account WHERE Id >= 001300000000000 AND Id < 00130000000132G
SELECT Name FROM Account WHERE Id >= 00130000000132G AND Id < 00130000000264W
SELECT Name FROM Account WHERE Id >= 00130000000264W AND Id < 00130000000396m
...
SELECT Name FROM Account WHERE Id >= 00130000000euQ4 AND Id < 00130000000fxSK
Even if a query with no conditions takes a long time to complete, adding these PK-based split conditions shortens the time each individual query takes. (The image is that one job is divided across multiple processes to shorten the overall processing time.)
Also, since each chunk covers at most 250,000 records, the size of each result file can be expected to shrink.
The official site describes the procedure for running the whole flow with curl commands.
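For reference, here is a minimal sketch of enabling PK-chunking from Java with the WSC Bulk API client. This is my own illustration rather than code from the official docs; it assumes BulkConnection.addHeader is available in your client version, and the chunk size of 250,000 is simply the maximum.

// Minimal sketch (assumption: WSC's BulkConnection.addHeader is available in your client version).
// The header must be set before the job is created; chunkSize defaults to 100,000 and maxes out at 250,000.
BulkConnection connection = new BulkConnection(config);
connection.addHeader("Sforce-Enable-PKChunking", "chunkSize=250000");

JobInfo job = new JobInfo();
job.setObject("Account");
job.setOperation(OperationEnum.query);
job.setContentType(ContentType.CSV);
job = connection.createJob(job);

// The batch holding the original query; PK-chunking will spawn the chunked batches from it.
BatchInfo originalBatch = connection.createBatchFromStream(job,
        new ByteArrayInputStream("SELECT Name FROM Account".getBytes(StandardCharsets.UTF_8)));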
For example, if you run PK-chunking against 1 million records with a chunk size of 250,000, the query is split into four chunked queries.
The query results are created asynchronously as multiple files, which is awkward to handle. For practicality, I created a sample that aggregates these multiple results into a single gzip-compressed file: https://github.com/JunNakamura/sfsample/blob/master/src/main/java/BulkChunkSaveSample.java
Using pipes in a case like this is a standard technique, and not just in Java. In a shell you would use a named pipe (mkfifo).
In Java it looks like this:
try (PipedOutputStream pipedOut = new PipedOutputStream();
     PipedInputStream pipedIn = new PipedInputStream(pipedOut);
     // .... omitted: the remaining resources, e.g. the pipedReader/pipedWriter that wrap the
     // pipe ends and the gzip-wrapping BufferedWriter bw used below
     ....

    ExecutorService executor = Executors.newFixedThreadPool(batchList.size() + 1);
    // Start a separate thread that reads from the pipe and writes its contents to the file
    executor.submit(() -> {
        try {
            String line;
            while ((line = pipedReader.readLine()) != null) {
                bw.write(line);
                bw.newLine();
            }
        } catch (Exception e) {
            logger.error("Failed.", e);
        }
    });

    // Check the status of each batch and write its result to the pipe
    for (BatchInfo chunkBatch : batchList) {
        // Could also be asynchronous if network traffic is not a concern.
        // executor.submit(() -> BulkChunkSaveSample.retrieveResult(job, connection, chunkBatch, pipedWriter));
        BulkChunkSaveSample.retrieveResult(job, connection, chunkBatch, pipedWriter);
    }
If the number of files to combine is small, say two or three, and they can be read serially, SequenceInputStream works. Wrapping with this class lets you logically combine reads from multiple files into one; internally they are read one by one in order. It is a little awkward to use because the constructor only comes in two forms: one taking an Enumeration and one taking exactly two streams.
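As a minimal sketch (the file names here are placeholders of my own, not from the sample), combining a few files with the Enumeration-based constructor looks like this:

// Minimal sketch: logically concatenating a few files with SequenceInputStream.
// The file names are placeholders for illustration.
List<InputStream> streams = new ArrayList<>();
for (Path p : Arrays.asList(Paths.get("result1.csv"), Paths.get("result2.csv"))) {
    streams.add(Files.newInputStream(p));
}
// SequenceInputStream wants an Enumeration, so convert the list with Collections.enumeration.
try (InputStream combined = new SequenceInputStream(Collections.enumeration(streams));
     BufferedReader br = new BufferedReader(new InputStreamReader(combined, StandardCharsets.UTF_8))) {
    String line;
    while ((line = br.readLine()) != null) {
        // process each line as if everything came from a single file
    }
}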
For gzip compression in Java, just wrap the stream in a GZIPOutputStream. If you need to specify the character encoding, wrap that further in an OutputStreamWriter. If you want to use a CSV read/write library, it usually has a constructor that takes a Writer, so just pass it the OutputStreamWriter or BufferedWriter. (The amount of wrapping code does get a bit tiring, though.)
OutputStream os = Files.newOutputStream(resultFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
GZIPOutputStream gzip = new GZIPOutputStream(os);
OutputStreamWriter ow = new OutputStreamWriter(gzip, StandardCharsets.UTF_8);
BufferedWriter bw = new BufferedWriter(ow);
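If you then hand that writer to a CSV library, for example opencsv (just one illustration; any library whose writer accepts a java.io.Writer works the same way), it looks roughly like this:

// Minimal sketch (assumption: opencsv is on the classpath); CSVWriter takes any java.io.Writer.
try (CSVWriter csvWriter = new CSVWriter(bw)) {
    csvWriter.writeNext(new String[] {"Id", "Name"});
    csvWriter.writeNext(new String[] {"001xxxxxxxxxxxxxxx", "Sample Account"});
}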
When you do PK-chunking, the batch that receives the original query is put into the NotProcessed state and the actual work is done by the chunked batches created from it. So in the first example, 5 batches are actually created, and the result files are obtained from the 2nd through 5th batches.
In code it looks like this:
BatchInfoList batchInfoList = connection.getBatchInfoList(job.getId());
List<BatchInfo> infoList = new ArrayList<>(Arrays.asList(batchInfoList.getBatchInfo()));
// The first batch is the original query batch; with PK-chunking it ends up in the NotProcessed state
BatchInfo batchInfo = infoList.get(0);
switch (batchInfo.getState()) {
    case NotProcessed:
        // The second and subsequent batches hold the query results
        infoList.remove(0);
        result.complete(infoList);
        break;
    case Failed:
        logger.warn("batch:" + job.getId() + " failed.");
        result.complete(Collections.emptyList());
        break;
    default:
        logger.info("-- waiting --");
        logger.info("state: " + batchInfo.getState());
}
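In the sample this check runs repeatedly until the first batch leaves the queue (result is presumably a CompletableFuture, judging from result.complete). As a simplified, synchronous sketch of the same idea (my own rewrite, not the sample's code), the polling could look like this:

// Simplified synchronous sketch (not the sample's exact code); uses com.sforce.async
// classes: BulkConnection, JobInfo, BatchInfo, BatchStateEnum, AsyncApiException.
static List<BatchInfo> waitForChunkBatches(BulkConnection connection, JobInfo job)
        throws AsyncApiException, InterruptedException {
    while (true) {
        List<BatchInfo> infoList =
                new ArrayList<>(Arrays.asList(connection.getBatchInfoList(job.getId()).getBatchInfo()));
        BatchInfo first = infoList.get(0);
        if (first.getState() == BatchStateEnum.NotProcessed) {
            infoList.remove(0); // drop the original batch; the rest hold the query results
            return infoList;
        }
        if (first.getState() == BatchStateEnum.Failed) {
            return Collections.emptyList();
        }
        Thread.sleep(10_000); // wait a bit before checking again
    }
}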
Getting the result files for a single batch ID works the same as with the normal Bulk API; with PK-chunking there are simply several batches. Whether you fetch the result files asynchronously or serially in creation order depends on your requirements. If speed is the priority, asynchronous is the way to go.
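For reference, listing the result file IDs for one chunk batch looks roughly like this (a sketch using the standard Bulk API client calls; the actual sample may differ in detail):

// Sketch: list the result files created for one chunk batch.
QueryResultList resultList = connection.getQueryResultList(job.getId(), chunkBatch.getId());
String[] resultIds = resultList.getResult();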
The tricky part is that, logically, a single batch can have multiple result files. In the first example that would mean the result file size exceeds 1GB even when split into 250,000-record chunks. (In that case the total size is over 4GB, so even with the splitting it seems you would hit the Bulk query limit...) However, since this sample uses a pipe, that case is handled simply by writing each result file to the pipe in turn.
// Write to the pipe
for (String resultId : resultIds) {
    try (InputStream is = connection.getQueryResultStream(job.getId(), chunkBatch.getId(), resultId);
         BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
        String line;
        while ((line = br.readLine()) != null) {
            pipedWriter.write(line);
            pipedWriter.newLine();
        }
    }
}
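One caveat of my own, not spelled out above: the reading thread only sees end-of-stream when the writing side of the pipe is closed, so after the loop over all batches the pipe writer should be closed, roughly like this:

// After all batches have been written: closing the writer makes the reader's readLine() return null.
pipedWriter.close();
executor.shutdown();
executor.awaitTermination(10, TimeUnit.MINUTES); // give the file-writing thread time to finish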
The data source is Salesforce, but the technical content ended up being mostly a Java article... If you make various simplifying decisions, a more tailored implementation could probably run in less time, but I decided to give this approach a try. It was worthwhile because I picked up a few new techniques.
Salesforce has a GUI tool called dataloader that lets you manipulate Salesforce data. It can of course use the Bulk API, but it doesn't support PK-chunking at the time of this writing. (A PR appears to be open: https://github.com/forcedotcom/dataloader/pull/138)
Having tried this, I feel like I can somehow understand why it isn't supported: it's a hassle.
P.S. The dataloader README explains how to use it from the CLI. I knew it was an executable jar, so I figured it was possible, but it's nice that it's official. A sample config file also seems to be available.