J'ai écrit un échantillon d'API SOAP et Bulk dans l'année dernière, donc ce sera la suite. J'ai écrit le code, mais je n'ai pas eu le temps de vérifier l'opération, et je ne suis pas sûr que cela fonctionne correctement.
L'API Bulk normale divise les résultats de la requête en fichiers de 1 Go (jusqu'à 15 fichiers) et les télécharge. PK-chunking est un mécanisme pour fractionner la requête à l'aide de l'ID Salesforce.
Si la requête normale est:
SELECT Name FROM Account
Dans PK-chunking, c'est une image à diviser comme suit.
SELECT Name FROM Account WHERE Id >= 001300000000000 AND Id < 00130000000132G
SELECT Name FROM Account WHERE Id >= 00130000000132G AND Id < 00130000000264W
SELECT Name FROM Account WHERE Id >= 00130000000264W AND Id < 00130000000396m
...
SELECT Name FROM Account WHERE Id >= 00130000000euQ4 AND Id < 00130000000fxSK
S'il n'y a pas de conditions, même s'il faut du temps pour terminer la requête, vous pouvez raccourcir le temps requis pour chaque requête en ajoutant les conditions de fractionnement à l'aide de PK. (C'est une image qui raccourcit le temps de traitement en divisant un travail en plusieurs processus)
De plus, étant donné que le nombre maximal de divisions est de 250 000, on s'attend à ce que la taille de fichier du fichier de résultat puisse être réduite.
Site officiel Décrit la procédure d'exécution d'une série de flux avec la commande curl.
Par exemple, si vous essayez de segmenter PK 1 million de données
Ce sera.
Les résultats de la requête sont créés de manière asynchrone dans plusieurs fichiers, ce qui est difficile à gérer. Pour des raisons pratiques, j'ai créé un échantillon qui regroupe ces multiples résultats dans un seul fichier et le gzip. https://github.com/JunNakamura/sfsample/blob/master/src/main/java/BulkChunkSaveSample.java
En fait, l'utilisation de tubes dans de tels cas est une méthode standard, non limitée à Java. S'il s'agit d'un shell, il s'appellera pipe, mkfifo, etc.
Pour Java
Ce sera.
try (PipedOutputStream pipedOut = new PipedOutputStream();
PipedInputStream pipedIn = new PipedInputStream(pipedOut);
....
ExecutorService executor = Executors.newFixedThreadPool(batchList.size() + 1);
//Commencez à écrire le contenu du tube de lecture dans un fichier dans un thread séparé
executor.submit(() -> {
try {
String line;
while ((line = pipedReader.readLine()) != null) {
bw.write(line);
bw.newLine();
}
} catch (Exception e) {
logger.error("Failed.", e);
}
});
//Contrôle de l'état de chaque lot+Ecrire le résultat dans le tube
for (BatchInfo chunkBatch: batchList) {
//Effectuer de manière asynchrone s'il n'y a aucune restriction sur le trafic réseau.
// executor.submit(() -> BulkChunkSaveSample.retrieveResult(job, connection, chunkBatch, pipedWriter));
BulkChunkSaveSample.retrieveResult(job, connection, chunkBatch, pipedWriter);
}
Si le nombre de fichiers à combiner est petit, par exemple 2-3, et que la lecture peut être effectuée en série, SequenceInputStream. En intégrant cette classe, vous pouvez lire logiquement plusieurs fichiers en un seul. En interne, ils sont lus un par un dans l'ordre. C'est un peu difficile à utiliser car il n'y a que deux modèles d'arguments de constructeur, énumération ou deux variables.
Pour Java, enveloppez-le simplement dans un GZIPOutputStream. Si vous souhaitez spécifier le code de caractère, encapsulez-le davantage avec OutputStreamWriter. Si vous souhaitez utiliser une bibliothèque CSV en lecture / écriture, vous avez généralement un constructeur qui prend un Writer comme argument, il vous suffit donc de passer un OutputStreamWriter ou BufferWriter. (Cependant, la quantité de code est importante et je me sens un peu fatigué)
OutputStream os = Files.newOutputStream(resultFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
GZIPOutputStream gzip = new GZIPOutputStream(os);
OutputStreamWriter ow = new OutputStreamWriter(gzip, StandardCharsets.UTF_8);
BufferedWriter bw = new BufferedWriter(ow)
Lorsque vous effectuez une segmentation PK,
Ce sera. Ainsi, dans le premier exemple, 5 lots sont réellement créés, et le fichier résultat est obtenu du 2ème au 5ème lots.
Dans du code
Ce sera.
BatchInfoList batchInfoList = connection.getBatchInfoList(job.getId());
List<BatchInfo> infoList = new ArrayList<>(Arrays.asList(batchInfoList.getBatchInfo()));
BatchInfo batchInfo = infoList.get(0);
switch (batchInfo.getState()) {
case NotProcessed:
//Le lot après le début est lié au résultat de la requête
infoList.remove(0);
result.complete(infoList);
break;
case Failed:
logger.warn("batch:" + job.getId() + " failed.");
result.complete(Collections.emptyList());
break;
default:
logger.info("-- waiting --");
logger.info("state: " + batchInfo.getState());
}
Pour un ID de lot
Est la même que l'API Bulk normale, et avec la segmentation PK, il n'y en a que plusieurs. Cela dépend de vos besoins si vous souhaitez obtenir les fichiers de résultats de manière asynchrone ou en série dans l'ordre de création. Si la vitesse est prioritaire, elle sera asynchrone.
La partie délicate est que, logiquement, chaque lot peut également avoir plusieurs fichiers de résultats. Dans le premier exemple, la taille du fichier de résultats dépasse 1 Go même après 250 000 fractionnements. (Dans ce cas, la taille globale est supérieure à 4 Go, donc même si vous la divisez, il semble que vous serez pris dans la limite de la requête en masse ...)
Toutefois, étant donné que cet exemple utilise un canal, vous pouvez gérer ce cas en écrivant simplement le résultat de chaque fichier dans le canal.
//Écrire dans le tuyau
for (String resultId: resultIds) {
try (InputStream is = connection.getQueryResultStream(job.getId(), chunkBatch.getId(), resultId);
BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));) {
String line;
while ((line = br.readLine()) != null) {
pipedWriter.write(line);
pipedWriter.newLine();
}
}
La source est Salesforce, mais les éléments techniques sont presque des articles Java ... Si vous prenez beaucoup de décisions, il serait nécessaire de prendre un temps plus court avec une implémentation plus appropriée, mais j'ai décidé de faire un essai. C'était bien car j'ai pu acquérir de nouvelles techniques.
Salesforce dispose d'un outil GUI appelé dataloader qui vous permet de manipuler les données Salesforce. Bien sûr, vous pouvez utiliser l'API Bulk, mais PK-chunking ne la prend pas en charge au moment de la rédaction de cet article. (Le PR semble être en place: https://github.com/forcedotcom/dataloader/pull/138)
J'ai l'impression d'avoir compris d'une manière ou d'une autre que je ne l'ai pas soutenu parce que c'était gênant.
p.s Le fichier readme du chargeur de données a un moyen de l'utiliser avec cli. Je savais que c'était un fichier exécutable, alors j'ai pensé que je pouvais le faire, mais je suis reconnaissant que ce soit officiel. Il semble que Exemple de fichier de configuration soit également disponible.
Recommended Posts