In Fortsetzung von Artikel, den ich zuvor geschrieben habe habe ich diesmal versucht, Reladomos MT Loader zu verwenden.
Der Multi-Threaded Matcher Loader (MT Loader) ist eine Funktion zum Zusammenführen von Änderungen von einer anderen Quelle (Datei, Feed, anderer Datenbank usw.) zur Ausgabe-Datenbank. Nach dem Lesen der Dokumentation wird MT Loader häufig empfohlen, wenn die von Reladomo verarbeitete Datenmenge groß ist.
Das E / A kann mithilfe mehrerer Threads zum Lesen, Vergleichen und Schreiben verteilt werden.
Ich schreibe oft APIs in Kombination mit Spring Boot, also schreibe ich in dieser Kombination. Den aktuellen Code finden Sie unter https://github.com/amtkxa/spring-boot-reladomo-mt-loader.
Vor dem Ausführen des Testcodes möchte ich einen Zustand erstellen, in dem die Testdaten vorab in die Testdatenbank eingelesen wurden. Da davon ausgegangen wird, dass "MithraTestResource.addTestDataToDatabase" zum Lesen der Testdaten verwendet wird, bereiten Sie die folgende Datei vor.
customer_data.txt
class com.amtkxa.domain.entity.Customer
customerId, name, country, businessDateFrom, businessDateTo, processingDateFrom, processingDateTo
1,"Liam","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
2,"Emma","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
3,"Noah","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
4,"Olivia","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
5,"William","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
6,"Ava","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
7,"James","USA","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000","2019-12-01 00:00:00.000","9999-12-01 23:59:00.000"
Wir haben auch eine abstrakte Klasse vorbereitet, um die H2-Datenbank mit Testdaten vorab zu füllen.
public abstract class AbstractReladomoTest {
private static Logger log = LoggerFactory.getLogger(AbstractReladomoTest.class);
private MithraTestResource mithraTestResource;
protected abstract String[] getTestDataFilenames();
protected String getMithraConfigXmlFilename() {
return "reladomo/config/TestReladomoRuntimeConfiguration.xml";
}
@BeforeEach
public void setUp() {
log.info("Setting up reladomo on h2");
this.mithraTestResource = new MithraTestResource(this.getMithraConfigXmlFilename());
ConnectionManagerForTests connectionManager = ConnectionManagerForTests.getInstanceForDbName("testdb");
this.mithraTestResource.createSingleDatabase(connectionManager);
for (String filename : this.getTestDataFilenames()) {
this.mithraTestResource.addTestDataToDatabase(filename, connectionManager);
}
this.mithraTestResource.setUp();
}
@AfterEach
public void tearDown() {
this.mithraTestResource.tearDown();
}
}
Geben Sie MT Loader die folgenden Eingabedaten und versuchen Sie, die Datenbank zu aktualisieren.
--customerId: Land für 6 Benutzer aktualisiert
Der Testcode, den ich tatsächlich geschrieben habe, lautet wie folgt.
public class SingleQueueExecutorParallelLoadTest extends AbstractReladomoTest {
private static int NUMBER_OF_THREADS = 2;
private static int BATCH_SIZE = 5;
private static int INSERT_THREADS = 3;
@Override
public String[] getTestDataFilenames() {
return new String[] { "testdata/customer_data.txt" };
}
private List<Customer> getInputData() {
Timestamp businessDate = DateUtil.parse("2019-12-05 00:00:00");
CustomerList customerList = new CustomerList();
customerList.add(new Customer(6, "Ava", "JPN", businessDate));
customerList.add(new Customer(8, "Arthur", "USA", businessDate));
return customerList;
}
private CustomerList getDbRecords() {
return CustomerFinder.findMany(
CustomerFinder.all()
.and(CustomerFinder.businessDate().equalsEdgePoint())
.and(CustomerFinder.processingDate().equalsInfinity())
);
}
@Test
public void testLoadDataParallel() {
try {
QueueExecutor queueExecutor = new SingleQueueExecutor(
NUMBER_OF_THREADS,
CustomerFinder.customerId().ascendingOrderBy(),
BATCH_SIZE,
CustomerFinder.getFinderInstance(),
INSERT_THREADS
);
MatcherThread<Customer> matcherThread = new MatcherThread<>(
queueExecutor,
new Extractor[] { CustomerFinder.customerId() }
);
matcherThread.start();
// Database data load: Parallel
DbLoadThread dbLoadThread = new DbLoadThread(getDbRecords(), null, matcherThread);
dbLoadThread.start();
// Input data load: Parallel
PlainInputThread inputThread = new PlainInputThread(new InputDataLoader(), matcherThread);
inputThread.run();
matcherThread.waitTillDone();
// Assert
checkResult(queueExecutor);
} catch (Exception e) {
throw new ReladomoMTLoaderException("Failed to load data. " + e.getMessage(), e.getCause());
}
}
private void checkResult(QueueExecutor queueExecutor) {
// Whatever is in Output Set but not in Input Set will be closed out (terminated).
CustomerList customerList = getDbRecords();
assertEquals(2, customerList.count());
// Whatever is in the intersection, will be updated (but only if something changed)
Customer customer = CustomerFinder.findOne(
CustomerFinder.customerId().eq(6)
.and(CustomerFinder.businessDate().equalsEdgePoint())
.and(CustomerFinder.processingDate().equalsInfinity())
);
assertAll("Check updated customer data",
() -> assertEquals("Ava", customer.getName()),
() -> assertEquals("JPN", customer.getCountry()) // Updated from USD to JPN
);
// Whatever in in Input Set but not in Output Set will be inserted
Customer customer8 = CustomerFinder.findOne(
CustomerFinder.customerId().eq(8)
.and(CustomerFinder.businessDate().equalsEdgePoint())
.and(CustomerFinder.processingDate().equalsInfinity())
);
assertAll("Check inserted customer data",
() -> assertEquals("Arthur", customer8.getName()),
() -> assertEquals("USA", customer8.getCountry())
);
assertAll("Check the count of inserts, updates, terminates",
() -> assertEquals(1, queueExecutor.getTotalInserts()),
() -> assertEquals(1, queueExecutor.getTotalUpdates()),
() -> assertEquals(6, queueExecutor.getTotalTerminates())
);
}
private class InputDataLoader implements InputLoader {
private boolean firstTime = true;
@Override
public List<? extends MithraTransactionalObject> getNextParsedObjectList() {
return getInputData();
}
@Override
public boolean isFileParsingComplete() {
if (firstTime) {
firstTime = false;
return false;
} else {
return true;
}
}
}
}
Bevor der Aktualisierungsprozess von MT Loader funktioniert, befindet sich die Datenbank im folgenden Status.
-- select * from customer where business_date_to = '9999-12-01 23:59:00.000' and processing_date_to = '9999-12-01 23:59:00.000';
+-------------+---------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
| customer_id | name | country | business_date_from | business_date_to | processing_date_from | processing_date_to |
+-------------+---------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
| 1 | Liam | USA | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
| 2 | Emma | USA | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
| 3 | Noah | USA | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
| 4 | Olivia | USA | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
| 5 | William | USA | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
| 6 | Ava | USA | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
| 7 | James | USA | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 | 2019-12-01 00:00:00.000 | 9999-12-01 23:59:00.000 |
+-------------+---------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
Nachdem der Aktualisierungsprozess von MT Loader funktioniert hatte, befand sich die Datenbank im folgenden Zustand und das Ergebnis war wie erwartet.
-- select * from customer where business_date_to = '9999-12-01 23:59:00.000' and processing_date_to = '9999-12-01 23:59:00.000';
+-------------+--------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
| customer_id | name | country | business_date_from | business_date_to | processing_date_from | processing_date_to |
+-------------+--------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
| 6 | Ava | JPN | 2020-10-10 14:24:52.387 | 9999-12-01 23:59:00.000 | 2020-10-10 14:24:53.250 | 9999-12-01 23:59:00.000 |
| 8 | Arthur | USA | 2020-10-10 14:24:52.387 | 9999-12-01 23:59:00.000 | 2020-10-10 14:24:53.250 | 9999-12-01 23:59:00.000 |
+-------------+--------+---------+-------------------------+-------------------------+-------------------------+-------------------------+
Reladomo ist eine meiner Lieblingstechnologien, die ich häufig bei der Arbeit verwende, aber ich habe den Eindruck, dass die Hürden, um einen Punkt zu erreichen, an dem es verstanden und verwendet werden kann, etwas hoch sind.
Zum Beispiel ..... Es gibt kein Beispiel in Kata, das in Spring Boot integriert werden kann, das relativ häufig verwendet wird. Wenn Sie versuchen, es zu verwenden, müssen Sie überlegen, wie Sie DBConnectionManager integrieren. Ich schäme mich zu sagen, dass ich große Probleme habe, etwas zu machen, das überhaupt funktioniert.
Reladomo selbst ist eine sehr gute Technologie, aber ich konnte nicht viele Artikel darüber finden ... Ich fand es etwas verschwenderisch, also habe ich es diesmal selbst überprüft. Ich beschloss, alles zusammen zu teilen.
Recommended Posts