Using the Java client library (Google Cloud Client Library for Java) to use the datastore There is. We investigated how this transaction works for multithreaded processing. The official document had the following specification explanation, so I checked it while running the code.
For one or more common entity groups, if multiple transactions try to change an entity at the same time, only the first transaction that commits the change will succeed and all other transactions will fail to commit. Become.
We also confirmed what happens when processing via Transaction object and processing via Datastore object are mixed.
Documentation Transaction docs
Check what happens when you try to update an entity from multiple threads in the following three cases
Get entity from Thread1 via Transaction and sleep -> Get entity from Thread2 via Transaction & update -> Update via Transaction with Thread1
Get entity from Thread1 via Transaction and sleep -> Get entity from Thread2 via Datastore & update -> Update via Transaction with Thread1
Get entity from Thread1 via Datastore and sleep -> Get entity from Thread2 via Transaction & update -> Update via Datastore with Thread1
In case of transaction.update, update after confirming whether the entity has changed from the time of acquisition. The exception is when the state of the entity has changed. It doesn't matter if the update process is via a transaction object or a datastore object.
code
package jp.ne.opt.spinapp.runner;
import com.google.cloud.datastore.*;
public final class DatastoreTest {
final static String NAME_SPACE = "dataflow";
final static String LOCK_KIND = "lock";
final static String LOCK_PROP_VALUE = "value";
final static Datastore DATASTORE = DatastoreOptions.getDefaultInstance().getService();
final static KeyFactory KEY_FACTORY = DATASTORE.newKeyFactory().setNamespace(NAME_SPACE).setKind(LOCK_KIND);
final static Transaction TRANSACTION = DATASTORE.newTransaction();
public static void main(final String[] args) throws InterruptedException {
MultiThread1 mt1 = new MultiThread1();
MultiThread2 mt2 = new MultiThread2();
mt1.start();
Thread.sleep(1000);
mt2.start();
}
}
class MultiThread1 extends Thread {
public void run() {
Entity lock = DatastoreTest.TRANSACTION.get(DatastoreTest.KEY_FACTORY.newKey("test"));
System.out.println("got lock from 1: " + lock.getLong((DatastoreTest.LOCK_PROP_VALUE)));
try {
Thread.sleep(3000);
System.out.println("sleep 1 ended");
Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 1).build();
DatastoreTest.TRANSACTION.update(entity);
DatastoreTest.TRANSACTION.commit();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (DatastoreTest.TRANSACTION.isActive()) {
DatastoreTest.TRANSACTION.rollback();
}
}
System.out.println("thread1 done.");
}
}
class MultiThread2 extends Thread {
public void run() {
Entity lock = DatastoreTest.TRANSACTION.get(DatastoreTest.KEY_FACTORY.newKey("test"));
System.out.println("got lock from 2: " + lock.getLong((DatastoreTest.LOCK_PROP_VALUE)));
try {
Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 2).build();
DatastoreTest.TRANSACTION.update(entity);
DatastoreTest.TRANSACTION.commit();
} finally {
if (DatastoreTest.TRANSACTION.isActive()) {
DatastoreTest.TRANSACTION.rollback();
}
}
System.out.println("thread2 done.");
}
}
got lock from 2: 0
got lock from 1: 0
thread2 done.
sleep 1 ended
[WARNING]
com.google.cloud.datastore.DatastoreException: transaction is no longer active
After thread2 commits, I get a transaction is no longer active error when trying to update on thread1. Datastore is updated with Thread2.
class MultiThread1 extends Thread {
public void run() {
Entity lock = DatastoreTest.TRANSACTION.get(DatastoreTest.KEY_FACTORY.newKey("test"));
System.out.println("got lock from 1: " + lock.getLong((DatastoreTest.LOCK_PROP_VALUE)));
try {
Thread.sleep(10000);
System.out.println("sleep 1 ended");
Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 1).build();
DatastoreTest.TRANSACTION.update(entity);
DatastoreTest.TRANSACTION.commit();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (DatastoreTest.TRANSACTION.isActive()) {
DatastoreTest.TRANSACTION.rollback();
}
}
System.out.println("thread1 done.");
}
}
class MultiThread2 extends Thread {
public void run() {
Entity lock = DatastoreTest.DATASTORE.get(DatastoreTest.KEY_FACTORY.newKey("test"));
System.out.println("got lock from 2: " + lock.getLong((DatastoreTest.LOCK_PROP_VALUE)));
Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 2).build();
DatastoreTest.DATASTORE.update(entity);
System.out.println("thread2 done.");
}
}
got lock from 1: 0
got lock from 2: 0
thread2 done.
sleep 1 ended
[WARNING]
com.google.cloud.datastore.DatastoreException: too much contention on these datastore entities. please try again. entity groups: [(app=b~spinapptest-151310!dataflow, lock, "test")]
After updating with thread2, an error trying to update with thread1. Datastore is updated with Thread2.
class MultiThread1 extends Thread {
public void run() {
try {
Entity lock = DatastoreTest.DATASTORE.get(DatastoreTest.KEY_FACTORY.newKey("test"));
Thread.sleep(10000);
System.out.println("sleep 1 ended");
System.out.println("got lock from 1: " + lock.getLong((DatastoreTest.LOCK_PROP_VALUE)));
Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 1).build();
DatastoreTest.DATASTORE.update(entity);
System.out.println("thread1 done.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class MultiThread2 extends Thread {
public void run() {
Entity lock = DatastoreTest.TRANSACTION.get(DatastoreTest.KEY_FACTORY.newKey("test"));
System.out.println("got lock from 2: " + lock.getLong((DatastoreTest.LOCK_PROP_VALUE)));
try {
Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 2).build();
DatastoreTest.TRANSACTION.update(entity);
DatastoreTest.TRANSACTION.commit();
} finally {
if (DatastoreTest.TRANSACTION.isActive()) {
DatastoreTest.TRANSACTION.rollback();
}
}
System.out.println("thread2 done.");
}
}
got lock from 2: 0
thread2 done.
sleep 1 ended
got lock from 1: 0
thread1 done.
Both Thread processes succeed.
Recommended Posts