Sample of using Salesforce's Bulk API from a Java client with PK-chunking

Introduction

Last year I wrote a sample using the SOAP and Bulk APIs, so this is the sequel. I wrote the code, but I didn't have time to verify it thoroughly, so I'm not sure it works perfectly.

What is PK-chunking?

The normal Bulk API splits query results into files of up to 1 GB each (a maximum of 15 files) for download. PK-chunking is a mechanism that splits the query itself using Salesforce IDs.

If the normal query is:

SELECT Name FROM Account

then with PK-chunking it is split up roughly like this:

SELECT Name FROM Account WHERE Id >= 001300000000000 AND Id < 00130000000132G
SELECT Name FROM Account WHERE Id >= 00130000000132G AND Id < 00130000000264W
SELECT Name FROM Account WHERE Id >= 00130000000264W AND Id < 00130000000396m
...
SELECT Name FROM Account WHERE Id >= 00130000000euQ4 AND Id < 00130000000fxSK

Even when a query with no conditions takes a long time to complete, adding PK-based conditions to split it shortens the time each individual query takes. (The idea is the same as shortening processing time by dividing one job across multiple processes.)

Also, since the maximum chunk size is 250,000 records, you can expect each result file to be smaller.

PK-chunking processing overview

The official site describes the procedure for running the whole flow with curl commands.

For example, if you PK-chunk 1 million records with a chunk size of 250,000, the flow is:

  1. Add the PK-chunking and chunk size settings to the request header
  2. A PK-chunking job is created
  3. Four batches are registered for the job
  4. Each batch runs a query that retrieves 250,000 records (in parallel)
  5. When a batch completes, you can get the URL for downloading its query result file
  6. Download the file from each URL
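
For reference, here is a minimal Java sketch of steps 1 to 3 using the WSC library. The header name and value follow the official documentation; the way the header is attached (ConnectorConfig.setRequestHeader here) and the sessionId/restEndpoint variables are assumptions about your setup, not code taken from the sample repository.

// Minimal sketch of creating a PK-chunking query job (steps 1-3).
// sessionId and restEndpoint are assumed to come from a prior login;
// setRequestHeader is assumed to be available in the WSC version in use.
ConnectorConfig config = new ConnectorConfig();
config.setSessionId(sessionId);
config.setRestEndpoint(restEndpoint); // e.g. https://yourInstance.salesforce.com/services/async/42.0
config.setRequestHeader("Sforce-Enable-PKChunking", "chunkSize=250000");

BulkConnection connection = new BulkConnection(config);

// Create the query job for Account
JobInfo job = new JobInfo();
job.setObject("Account");
job.setOperation(OperationEnum.query);
job.setContentType(ContentType.CSV);
job = connection.createJob(job);

// Register the query batch; Salesforce then creates the split batches on its side
String query = "SELECT Name FROM Account";
try (InputStream queryStream = new ByteArrayInputStream(query.getBytes(StandardCharsets.UTF_8))) {
    connection.createBatchFromStream(job, queryStream);
}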

Code implementation policy

The query results are created asynchronously as multiple files, which is awkward to handle. With practicality in mind, I wrote a sample that aggregates the multiple results into a single gzip-compressed file. https://github.com/JunNakamura/sfsample/blob/master/src/main/java/BulkChunkSaveSample.java

Combine multiple files into one

Using a pipe in a case like this is a standard technique, and not just in Java. In a shell you would use a named pipe (mkfifo, etc.).

In Java:

  1. Write the contents of each file to a PipedOutputStream
  2. Read those contents back from the connected PipedInputStream in another thread

try (PipedOutputStream pipedOut = new PipedOutputStream();
     PipedInputStream pipedIn = new PipedInputStream(pipedOut);
     // reader/writer wrappers used by the code below (construction details assumed)
     BufferedWriter pipedWriter = new BufferedWriter(new OutputStreamWriter(pipedOut, StandardCharsets.UTF_8));
     BufferedReader pipedReader = new BufferedReader(new InputStreamReader(pipedIn, StandardCharsets.UTF_8));
....


ExecutorService executor = Executors.newFixedThreadPool(batchList.size() + 1);
// In a separate thread, read from the pipe and write the contents to the result file
executor.submit(() -> {
    try {
        String line;
        while ((line = pipedReader.readLine()) != null) {
            bw.write(line);
            bw.newLine();
        }
    } catch (Exception e) {
        logger.error("Failed.", e);
    }
});

// Check the status of each batch and write its results to the pipe
for (BatchInfo chunkBatch : batchList) {
    // Could be submitted asynchronously if network traffic is not a constraint:
    // executor.submit(() -> BulkChunkSaveSample.retrieveResult(job, connection, chunkBatch, pipedWriter));
    BulkChunkSaveSample.retrieveResult(job, connection, chunkBatch, pipedWriter);
}

Another way

If the number of files to combine is small, say two or three, and they can be read serially, SequenceInputStream is an option. Wrapping the streams with this class lets you logically combine reading multiple files into one; internally they are read one at a time, in order. It is a little awkward to use because the constructor only takes either an Enumeration or exactly two streams.
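
For example, combining two result files could look roughly like this (the file names are placeholders for illustration):

// SequenceInputStream reads the two streams one after the other,
// so the caller sees them as a single logical stream.
try (InputStream first = Files.newInputStream(Paths.get("result1.csv"));
     InputStream second = Files.newInputStream(Paths.get("result2.csv"));
     InputStream combined = new SequenceInputStream(first, second);
     BufferedReader reader = new BufferedReader(new InputStreamReader(combined, StandardCharsets.UTF_8))) {
    String line;
    while ((line = reader.readLine()) != null) {
        // handle each line of the concatenated content
    }
}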

gzip compression

For Java, just wrap the stream in a GZIPOutputStream. If you need to specify a character encoding, wrap it further in an OutputStreamWriter. If you use a CSV read/write library, it will usually have a constructor that takes a Writer, so you can just pass it the OutputStreamWriter or BufferedWriter. (The amount of wrapping code does get a bit tedious, though.)

OutputStream os = Files.newOutputStream(resultFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
GZIPOutputStream gzip = new GZIPOutputStream(os);
OutputStreamWriter ow = new OutputStreamWriter(gzip, StandardCharsets.UTF_8);
BufferedWriter bw = new BufferedWriter(ow);
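
For example, assuming opencsv as the CSV library (any library whose writer takes a Writer works the same way), the gzip-wrapped BufferedWriter above can be passed straight in:

// Sketch assuming opencsv; CSVWriter accepts any Writer, so rows written here
// end up gzip-compressed in resultFile via the wrappers above.
try (CSVWriter csvWriter = new CSVWriter(bw)) {
    csvWriter.writeNext(new String[] {"Id", "Name"});
}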

Handling of divided batches

When you run a PK-chunking query:

  1. The original batch used for splitting is registered first
  2. That batch's status becomes Not Processed
  3. After that, the batches that execute the split queries are registered

So in the first example, 5 batches are actually created, and the result files are obtained from the 2nd through 5th batches.

In terms of code:

  1. Get all registered batch IDs from the job
  2. If the status of the first batch is NotProcessed, return the list of batch IDs excluding the first
  3. If its status is Failed, abort processing
  4. Otherwise, wait for a while and check again

BatchInfoList batchInfoList = connection.getBatchInfoList(job.getId());
List<BatchInfo> infoList = new ArrayList<>(Arrays.asList(batchInfoList.getBatchInfo()));
BatchInfo batchInfo = infoList.get(0);
switch (batchInfo.getState()) {
    case NotProcessed:
        // The second and subsequent batches hold the query results
        infoList.remove(0);
        result.complete(infoList);
        break;
    case Failed:
        logger.warn("batch:" + job.getId() + " failed.");
        result.complete(Collections.emptyList());
        break;
    default:
        logger.info("-- waiting --");
        logger.info("state: " + batchInfo.getState());
}

For a single batch ID:

  1. Poll periodically until the status is Completed
  2. When it completes, get the URLs of the result files

This is the same as with the normal Bulk API; with PK-chunking there are simply more batches. Whether to fetch the result files asynchronously or serially in creation order depends on your requirements. If speed is the priority, fetch them asynchronously. A rough sketch of the polling is shown below.
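
As a sketch (not the exact code from the sample; the helper name and the 10-second polling interval are just illustrative), getting the result file IDs for one batch could look like this:

// Poll one chunk batch until it completes, then return the IDs of its result files.
static List<String> waitForResultIds(BulkConnection connection, JobInfo job, BatchInfo chunkBatch) throws Exception {
    while (true) {
        BatchInfo info = connection.getBatchInfo(job.getId(), chunkBatch.getId());
        if (info.getState() == BatchStateEnum.Completed) {
            QueryResultList resultList = connection.getQueryResultList(job.getId(), chunkBatch.getId());
            return Arrays.asList(resultList.getResult());
        }
        if (info.getState() == BatchStateEnum.Failed) {
            throw new IllegalStateException("batch " + chunkBatch.getId() + " failed: " + info.getStateMessage());
        }
        Thread.sleep(10_000L); // illustrative polling interval
    }
}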

The tricky part is that, logically, each batch can have multiple result files. In the first example this happens if a result file exceeds 1 GB even after splitting into 250,000-record chunks. (In that case the total size is over 4 GB, so even split up it seems you would hit the Bulk query limits anyway...)

However, since this sample uses a pipe, that case is handled simply by writing each result file to the pipe in turn.

// Write each result file to the pipe
for (String resultId : resultIds) {
    try (InputStream is = connection.getQueryResultStream(job.getId(), chunkBatch.getId(), resultId);
         BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
        String line;
        while ((line = br.readLine()) != null) {
            pipedWriter.write(line);
            pipedWriter.newLine();
        }
    }
}

Summary

The subject is Salesforce, but the technical content is almost entirely a Java article... With a more tailored implementation it would probably have taken less time, but I decided to give this approach a try. It was worth it, since I picked up some new techniques.

Salesforce has a GUI tool called dataloader for manipulating Salesforce data. It can of course use the Bulk API, but it does not support PK-chunking at the time of this writing. (A PR for it seems to be open: https://github.com/forcedotcom/dataloader/pull/138)

Having tried it myself, I feel like I now somewhat understand why it isn't supported: it's a hassle.

P.S. The dataloader README explains how to use it from the CLI. Since it's an executable jar I figured it should be possible, but it's nice that it's officially documented. Sample config files also seem to be available.
