Simplify put code
This commit is contained in:
parent
36cd976ac9
commit
790883889a
@ -55,7 +55,6 @@ import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -65,6 +64,7 @@ import java.util.stream.Stream;
|
||||
|
||||
import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
|
||||
import org.github.gestalt.config.exceptions.GestaltException;
|
||||
import org.jetbrains.annotations.Contract;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.jetbrains.annotations.VisibleForTesting;
|
||||
@ -474,6 +474,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
||||
}
|
||||
|
||||
@Override
|
||||
@Contract("_, false -> true; _, true -> _")
|
||||
public boolean closeTransaction(long transactionId, boolean commit) {
|
||||
var start = System.nanoTime();
|
||||
try {
|
||||
@ -484,15 +485,20 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return false if failed optimistic commit
|
||||
*/
|
||||
@Contract("_, false -> true; _, true -> _")
|
||||
private boolean closeTransactionInternal(long transactionId, boolean commit) {
|
||||
var tx = txs.get(transactionId);
|
||||
if (tx != null) {
|
||||
try {
|
||||
var committed = closeTransactionInternal(tx, commit);
|
||||
if (committed) {
|
||||
txs.remove(transactionId, tx);
|
||||
var succeeded = closeTransactionInternal(tx, commit);
|
||||
if (!succeeded) {
|
||||
return false;
|
||||
}
|
||||
return committed;
|
||||
txs.remove(transactionId, tx);
|
||||
return true;
|
||||
} catch (Throwable ex) {
|
||||
txs.remove(transactionId, tx);
|
||||
throw ex;
|
||||
@ -507,13 +513,18 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return false if failed optimistic commit
|
||||
*/
|
||||
@Contract("_, false -> true; _, true -> _")
|
||||
private boolean closeTransactionInternal(@NotNull Tx tx, boolean commit) {
|
||||
ops.beginOp();
|
||||
try {
|
||||
// Transaction found
|
||||
try {
|
||||
if (commit) {
|
||||
if (!commitTxOptimistically(tx)) {
|
||||
boolean succeeded = commitTxOptimistically(tx);
|
||||
if (!succeeded) {
|
||||
// Do not call endOp here, since the transaction is still open
|
||||
return false;
|
||||
}
|
||||
@ -547,6 +558,9 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return false if failed optimistic commit
|
||||
*/
|
||||
private boolean commitTxOptimistically(@NotNull Tx tx) throws org.rocksdb.RocksDBException {
|
||||
try {
|
||||
tx.val().commit();
|
||||
@ -873,47 +887,6 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param txConsumer this can be called multiple times, if the optimistic transaction failed
|
||||
*/
|
||||
public <T extends DBWriter, R> R wrapWithTransactionIfNeeded(@Nullable T tx, boolean needTransaction,
|
||||
ExFunction<@Nullable T, R> txConsumer) throws Exception {
|
||||
if (needTransaction) {
|
||||
return ensureWrapWithTransaction(tx, txConsumer);
|
||||
} else {
|
||||
return txConsumer.apply(tx);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param txConsumer this can be called multiple times, if the optimistic transaction failed
|
||||
*/
|
||||
public <T extends DBWriter, R> R ensureWrapWithTransaction(@Nullable T tx,
|
||||
ExFunction<@NotNull T, R> txConsumer) throws Exception {
|
||||
R result;
|
||||
if (tx == null) {
|
||||
// Retry using a transaction: transactions are required to handle this kind of data
|
||||
var newTx = this.openTransactionInternal(Long.MAX_VALUE, false);
|
||||
try {
|
||||
boolean committed;
|
||||
do {
|
||||
//noinspection unchecked
|
||||
result = txConsumer.apply((T) newTx);
|
||||
committed = this.closeTransactionInternal(newTx, true);
|
||||
if (!committed) {
|
||||
Thread.yield();
|
||||
}
|
||||
} while (!committed);
|
||||
} finally {
|
||||
this.closeTransactionInternal(newTx, false);
|
||||
}
|
||||
} else {
|
||||
result = txConsumer.apply(tx);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private <U> U put(Arena arena,
|
||||
@Nullable DBWriter optionalDbWriter,
|
||||
ColumnInstance col,
|
||||
@ -941,85 +914,107 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST,
|
||||
"Column with buckets don't support write batches");
|
||||
}
|
||||
return wrapWithTransactionIfNeeded(optionalDbWriter, needsTx, dbWriter -> {
|
||||
MemorySegment previousValue;
|
||||
MemorySegment calculatedKey = col.calculateKey(arena, keys.keys());
|
||||
if (updateId != 0L) {
|
||||
assert dbWriter instanceof Tx;
|
||||
((Tx) dbWriter).val().setSavePoint();
|
||||
}
|
||||
if (col.hasBuckets()) {
|
||||
assert dbWriter instanceof Tx;
|
||||
var bucketElementKeys = col.getBucketElementKeys(keys.keys());
|
||||
try (var readOptions = newReadOptions()) {
|
||||
var previousRawBucketByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
|
||||
MemorySegment previousRawBucket = toMemorySegment(arena, previousRawBucketByteArray);
|
||||
var bucket = previousRawBucket != null ? new Bucket(col, previousRawBucket) : new Bucket(col);
|
||||
previousValue = transformResultValue(col, bucket.addElement(bucketElementKeys, value));
|
||||
var k = Utils.toByteArray(calculatedKey);
|
||||
var v = Utils.toByteArray(bucket.toSegment(arena));
|
||||
((Tx) dbWriter).val().put(col.cfh(), k, v);
|
||||
} catch (org.rocksdb.RocksDBException e) {
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_1, e);
|
||||
|
||||
|
||||
|
||||
U result;
|
||||
DBWriter newTx;
|
||||
boolean owningNewTx = needsTx && optionalDbWriter == null;
|
||||
// Retry using a transaction: transactions are required to handle this kind of data
|
||||
newTx = owningNewTx ? this.openTransactionInternal(120_000, false) : optionalDbWriter;
|
||||
try {
|
||||
boolean committedOwnedTx;
|
||||
do {
|
||||
MemorySegment previousValue;
|
||||
MemorySegment calculatedKey = col.calculateKey(arena, keys.keys());
|
||||
if (updateId != 0L) {
|
||||
((Tx) newTx).val().setSavePoint();
|
||||
}
|
||||
} else {
|
||||
if (RequestType.requiresGettingPreviousValue(callback)) {
|
||||
assert dbWriter instanceof Tx;
|
||||
if (col.hasBuckets()) {
|
||||
assert newTx instanceof Tx;
|
||||
var bucketElementKeys = col.getBucketElementKeys(keys.keys());
|
||||
try (var readOptions = newReadOptions()) {
|
||||
byte[] previousValueByteArray;
|
||||
previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
|
||||
previousValue = transformResultValue(col, toMemorySegment(arena, previousValueByteArray));
|
||||
var previousRawBucketByteArray = ((Tx) newTx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
|
||||
MemorySegment previousRawBucket = toMemorySegment(arena, previousRawBucketByteArray);
|
||||
var bucket = previousRawBucket != null ? new Bucket(col, previousRawBucket) : new Bucket(col);
|
||||
previousValue = transformResultValue(col, bucket.addElement(bucketElementKeys, value));
|
||||
var k = Utils.toByteArray(calculatedKey);
|
||||
var v = Utils.toByteArray(bucket.toSegment(arena));
|
||||
((Tx) newTx).val().put(col.cfh(), k, v);
|
||||
} catch (org.rocksdb.RocksDBException e) {
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e);
|
||||
}
|
||||
} else if (RequestType.requiresGettingPreviousPresence(callback)) {
|
||||
// todo: in the future this should be replaced with just keyExists
|
||||
assert dbWriter instanceof Tx;
|
||||
try (var readOptions = newReadOptions()) {
|
||||
byte[] previousValueByteArray;
|
||||
previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
|
||||
previousValue = previousValueByteArray != null ? MemorySegment.NULL : null;
|
||||
} catch (org.rocksdb.RocksDBException e) {
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_1, e);
|
||||
}
|
||||
} else {
|
||||
previousValue = null;
|
||||
}
|
||||
switch (dbWriter) {
|
||||
case WB wb -> wb.wb().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value));
|
||||
case SSTWriter sstWriter -> {
|
||||
var keyBB = calculatedKey.asByteBuffer();
|
||||
ByteBuffer valueBB = (col.schema().hasValue() ? value : Utils.dummyEmptyValue()).asByteBuffer();
|
||||
sstWriter.put(keyBB, valueBB);
|
||||
if (RequestType.requiresGettingPreviousValue(callback)) {
|
||||
assert newTx instanceof Tx;
|
||||
try (var readOptions = newReadOptions()) {
|
||||
byte[] previousValueByteArray;
|
||||
previousValueByteArray = ((Tx) newTx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
|
||||
previousValue = transformResultValue(col, toMemorySegment(arena, previousValueByteArray));
|
||||
} catch (org.rocksdb.RocksDBException e) {
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e);
|
||||
}
|
||||
} else if (RequestType.requiresGettingPreviousPresence(callback)) {
|
||||
// todo: in the future this should be replaced with just keyExists
|
||||
assert newTx instanceof Tx;
|
||||
try (var readOptions = newReadOptions()) {
|
||||
byte[] previousValueByteArray;
|
||||
previousValueByteArray = ((Tx) newTx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
|
||||
previousValue = previousValueByteArray != null ? MemorySegment.NULL : null;
|
||||
} catch (org.rocksdb.RocksDBException e) {
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e);
|
||||
}
|
||||
} else {
|
||||
previousValue = null;
|
||||
}
|
||||
case Tx t -> t.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value));
|
||||
case null -> {
|
||||
try (var w = new WriteOptions()) {
|
||||
switch (newTx) {
|
||||
case WB wb -> wb.wb().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value));
|
||||
case SSTWriter sstWriter -> {
|
||||
var keyBB = calculatedKey.asByteBuffer();
|
||||
ByteBuffer valueBB = (col.schema().hasValue() ? value : Utils.dummyEmptyValue()).asByteBuffer();
|
||||
db.get().put(col.cfh(), w, keyBB, valueBB);
|
||||
sstWriter.put(keyBB, valueBB);
|
||||
}
|
||||
case Tx t -> t.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value));
|
||||
case null -> {
|
||||
try (var w = new WriteOptions()) {
|
||||
var keyBB = calculatedKey.asByteBuffer();
|
||||
ByteBuffer valueBB = (col.schema().hasValue() ? value : Utils.dummyEmptyValue()).asByteBuffer();
|
||||
db.get().put(col.cfh(), w, keyBB, valueBB);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
U result = RequestType.safeCast(switch (callback) {
|
||||
case RequestType.RequestNothing<?> ignored -> null;
|
||||
case RequestType.RequestPrevious<?> ignored -> previousValue;
|
||||
case RequestType.RequestPreviousPresence<?> ignored -> previousValue != null;
|
||||
case RequestType.RequestChanged<?> ignored -> !Utils.valueEquals(previousValue, value);
|
||||
case RequestType.RequestDelta<?> ignored -> new Delta<>(previousValue, value);
|
||||
});
|
||||
result = RequestType.safeCast(switch (callback) {
|
||||
case RequestType.RequestNothing<?> ignored -> null;
|
||||
case RequestType.RequestPrevious<?> ignored -> previousValue;
|
||||
case RequestType.RequestPreviousPresence<?> ignored -> previousValue != null;
|
||||
case RequestType.RequestChanged<?> ignored -> !Utils.valueEquals(previousValue, value);
|
||||
case RequestType.RequestDelta<?> ignored -> new Delta<>(previousValue, value);
|
||||
});
|
||||
|
||||
if (updateId != 0L) {
|
||||
if (!closeTransaction(updateId, true)) {
|
||||
((Tx) dbWriter).val().rollbackToSavePoint();
|
||||
((Tx) dbWriter).val().undoGetForUpdate(col.cfh(), Utils.toByteArray(calculatedKey));
|
||||
throw new RocksDBRetryException();
|
||||
if (updateId != 0L) {
|
||||
if (!closeTransaction(updateId, true)) {
|
||||
((Tx) newTx).val().rollbackToSavePoint();
|
||||
((Tx) newTx).val().undoGetForUpdate(col.cfh(), Utils.toByteArray(calculatedKey));
|
||||
throw new RocksDBRetryException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
});
|
||||
if (owningNewTx) {
|
||||
committedOwnedTx = this.closeTransactionInternal((Tx) newTx, true);
|
||||
if (!committedOwnedTx) {
|
||||
Thread.yield();
|
||||
}
|
||||
} else {
|
||||
committedOwnedTx = true;
|
||||
}
|
||||
} while (!committedOwnedTx);
|
||||
} finally {
|
||||
if (owningNewTx) {
|
||||
this.closeTransactionInternal((Tx) newTx, false);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
} catch (Exception ex) {
|
||||
if (updateId != 0L && !(ex instanceof RocksDBRetryException)) {
|
||||
closeTransaction(updateId, false);
|
||||
@ -1046,23 +1041,26 @@ public class EmbeddedDB implements RocksDBSyncAPI, InternalConnection, Closeable
|
||||
try {
|
||||
// Column id
|
||||
var col = getColumn(columnId);
|
||||
Tx tx = transactionOrUpdateId != 0 ? getTransaction(transactionOrUpdateId, true) : null;
|
||||
Tx prevTx = transactionOrUpdateId != 0 ? getTransaction(transactionOrUpdateId, true) : null;
|
||||
Tx tx;
|
||||
long updateId;
|
||||
if (requestType instanceof RequestType.RequestForUpdate<?>) {
|
||||
if (tx == null) {
|
||||
if (prevTx == null) {
|
||||
tx = openTransactionInternal(MAX_TRANSACTION_DURATION_MS, true);
|
||||
updateId = allocateTransactionInternal(tx);
|
||||
} else {
|
||||
tx = prevTx;
|
||||
updateId = transactionOrUpdateId;
|
||||
}
|
||||
} else {
|
||||
tx = prevTx;
|
||||
updateId = 0;
|
||||
}
|
||||
|
||||
try {
|
||||
return get(arena, tx, updateId, col, keys, requestType);
|
||||
} catch (Throwable ex) {
|
||||
if (updateId != 0 && tx.isFromGetForUpdate()) {
|
||||
if (tx != prevTx) {
|
||||
closeTransaction(updateId, false);
|
||||
}
|
||||
throw ex;
|
||||
|
Loading…
x
Reference in New Issue
Block a user