Implement update transactions

This commit is contained in:
Andrea Cavalli 2023-12-09 17:45:47 +01:00
parent dc43a82e71
commit c1d0b78b1e
10 changed files with 236 additions and 54 deletions

View File

@ -47,6 +47,11 @@ public class EmbeddedConnection extends BaseConnection {
return db.closeTransaction(transactionId, commit); return db.closeTransaction(transactionId, commit);
} }
@Override
public void closeFailedUpdate(long updateId) throws RocksDBException {
db.closeFailedUpdate(updateId);
}
@Override @Override
public long createColumn(String name, @NotNull ColumnSchema schema) { public long createColumn(String name, @NotNull ColumnSchema schema) {
return db.createColumn(name, schema); return db.createColumn(name, schema);
@ -64,12 +69,12 @@ public class EmbeddedConnection extends BaseConnection {
@Override @Override
public <T> T put(Arena arena, public <T> T put(Arena arena,
long transactionId, long transactionOrUpdateId,
long columnId, long columnId,
@NotNull MemorySegment @NotNull [] keys, @NotNull MemorySegment @NotNull [] keys,
@NotNull MemorySegment value, @NotNull MemorySegment value,
PutCallback<? super MemorySegment, T> callback) throws RocksDBException { PutCallback<? super MemorySegment, T> callback) throws RocksDBException {
return db.put(arena, transactionId, columnId, keys, value, callback); return db.put(arena, transactionOrUpdateId, columnId, keys, value, callback);
} }
@Override @Override

View File

@ -39,6 +39,10 @@ public abstract class SocketConnection extends BaseConnection {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public void closeFailedUpdate(long updateId) throws RocksDBException {
throw new UnsupportedOperationException();
}
@Override @Override
public long createColumn(String name, @NotNull ColumnSchema schema) { public long createColumn(String name, @NotNull ColumnSchema schema) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
@ -56,7 +60,7 @@ public abstract class SocketConnection extends BaseConnection {
@Override @Override
public <T> T put(Arena arena, public <T> T put(Arena arena,
long transactionId, long transactionOrUpdateId,
long columnId, long columnId,
MemorySegment @NotNull [] keys, MemorySegment @NotNull [] keys,
@NotNull MemorySegment value, @NotNull MemorySegment value,

View File

@ -1,7 +1,9 @@
package it.cavallium.rockserver.core.common; package it.cavallium.rockserver.core.common;
import it.cavallium.rockserver.core.common.Callback.CallbackForUpdate;
import it.cavallium.rockserver.core.common.Callback.CallbackPreviousPresence; import it.cavallium.rockserver.core.common.Callback.CallbackPreviousPresence;
import java.util.List; import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
public sealed interface Callback<METHOD_DATA_TYPE, RESULT_TYPE> { public sealed interface Callback<METHOD_DATA_TYPE, RESULT_TYPE> {
@ -17,7 +19,8 @@ public sealed interface Callback<METHOD_DATA_TYPE, RESULT_TYPE> {
} }
static boolean requiresGettingCurrentValue(GetCallback<?, ?> callback) { static boolean requiresGettingCurrentValue(GetCallback<?, ?> callback) {
return callback instanceof CallbackCurrent<?>; return callback instanceof CallbackCurrent<?>
|| callback instanceof Callback.CallbackForUpdate<?>;
} }
static <U> U safeCast(Object previousValue) { static <U> U safeCast(Object previousValue) {
@ -35,6 +38,11 @@ public sealed interface Callback<METHOD_DATA_TYPE, RESULT_TYPE> {
return (CallbackCurrent<T>) CallbackCurrent.INSTANCE; return (CallbackCurrent<T>) CallbackCurrent.INSTANCE;
} }
@SuppressWarnings("unchecked")
static <T> CallbackForUpdate<T> forUpdate() {
return (CallbackForUpdate<T>) CallbackForUpdate.INSTANCE;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
static <T> CallbackDelta<T> delta() { static <T> CallbackDelta<T> delta() {
return (CallbackDelta<T>) CallbackDelta.INSTANCE; return (CallbackDelta<T>) CallbackDelta.INSTANCE;
@ -88,6 +96,11 @@ public sealed interface Callback<METHOD_DATA_TYPE, RESULT_TYPE> {
private static final CallbackCurrent<Object> INSTANCE = new CallbackCurrent<>(); private static final CallbackCurrent<Object> INSTANCE = new CallbackCurrent<>();
} }
record CallbackForUpdate<T>() implements GetCallback<T, @NotNull UpdateContext<@Nullable T>> {
private static final CallbackForUpdate<Object> INSTANCE = new CallbackForUpdate<>();
}
record CallbackExists<T>() implements GetCallback<T, Boolean>, IteratorCallback<T, Boolean> { record CallbackExists<T>() implements GetCallback<T, Boolean>, IteratorCallback<T, Boolean> {
private static final CallbackExists<Object> INSTANCE = new CallbackExists<>(); private static final CallbackExists<Object> INSTANCE = new CallbackExists<>();

View File

@ -26,6 +26,13 @@ public interface RocksDBAPI {
*/ */
boolean closeTransaction(long transactionId, boolean commit) throws RocksDBException; boolean closeTransaction(long transactionId, boolean commit) throws RocksDBException;
/**
* Close a failed update, discarding all changes
*
* @param updateId update id to close
*/
void closeFailedUpdate(long updateId) throws RocksDBException;
/** /**
* Create a column * Create a column
* @param name column name * @param name column name
@ -50,14 +57,14 @@ public interface RocksDBAPI {
/** /**
* Put an element into the specified position * Put an element into the specified position
* @param arena arena * @param arena arena
* @param transactionId transaction id, or 0 * @param transactionOrUpdateId transaction id, update id, or 0
* @param columnId column id * @param columnId column id
* @param keys column keys, or empty array if not needed * @param keys column keys, or empty array if not needed
* @param value value, or null if not needed * @param value value, or null if not needed
* @param callback the callback will be executed on the same thread, exactly once. * @param callback the callback will be executed on the same thread, exactly once.
*/ */
<T> T put(Arena arena, <T> T put(Arena arena,
long transactionId, long transactionOrUpdateId,
long columnId, long columnId,
@NotNull MemorySegment @NotNull[] keys, @NotNull MemorySegment @NotNull[] keys,
@NotNull MemorySegment value, @NotNull MemorySegment value,

View File

@ -25,8 +25,7 @@ public class RocksDBException extends RuntimeException {
COMMIT_FAILED_TRY_AGAIN, COMMIT_FAILED_TRY_AGAIN,
COMMIT_FAILED, COMMIT_FAILED,
TX_NOT_FOUND, TX_NOT_FOUND,
KEY_HASH_SIZE_MISMATCH, KEY_HASH_SIZE_MISMATCH, RESTRICTED_TRANSACTION, PUT_INVALID_REQUEST, UPDATE_RETRY, ROCKSDB_LOAD_ERROR
ROCKSDB_LOAD_ERROR
} }
public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) { public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) {
@ -49,26 +48,26 @@ public class RocksDBException extends RuntimeException {
} }
} }
private RocksDBException(RocksDBErrorType errorUniqueId, String message) { protected RocksDBException(RocksDBErrorType errorUniqueId, String message) {
super(message); super(message);
this.errorUniqueId = errorUniqueId; this.errorUniqueId = errorUniqueId;
} }
private RocksDBException(RocksDBErrorType errorUniqueId, String message, Throwable ex) { protected RocksDBException(RocksDBErrorType errorUniqueId, String message, Throwable ex) {
super(message, ex); super(message, ex);
this.errorUniqueId = errorUniqueId; this.errorUniqueId = errorUniqueId;
} }
private RocksDBException(RocksDBErrorType errorUniqueId, Throwable ex) { protected RocksDBException(RocksDBErrorType errorUniqueId, Throwable ex) {
super(ex.toString(), ex); super(ex.toString(), ex);
this.errorUniqueId = errorUniqueId; this.errorUniqueId = errorUniqueId;
} }
private RocksDBException(RocksDBErrorType errorUniqueId, org.rocksdb.RocksDBException ex) { protected RocksDBException(RocksDBErrorType errorUniqueId, org.rocksdb.RocksDBException ex) {
this(errorUniqueId, ex.getMessage()); this(errorUniqueId, ex.getMessage());
} }
private RocksDBException(RocksDBErrorType errorUniqueId, String message, org.rocksdb.RocksDBException ex) { protected RocksDBException(RocksDBErrorType errorUniqueId, String message, org.rocksdb.RocksDBException ex) {
this(errorUniqueId, message + ": " + ex.getMessage()); this(errorUniqueId, message + ": " + ex.getMessage());
} }

View File

@ -0,0 +1,8 @@
package it.cavallium.rockserver.core.common;
public class RocksDBRetryException extends RocksDBException {
public RocksDBRetryException() {
super(RocksDBErrorType.UPDATE_RETRY, "Please, retry the transaction");
}
}

View File

@ -0,0 +1,6 @@
package it.cavallium.rockserver.core.common;
import java.lang.foreign.MemorySegment;
import org.jetbrains.annotations.Nullable;
public record UpdateContext<T>(@Nullable T previous, long updateId) {}

View File

@ -7,6 +7,7 @@ import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue;
import it.cavallium.rockserver.core.common.Callback; import it.cavallium.rockserver.core.common.Callback;
import it.cavallium.rockserver.core.common.Callback.CallbackExists; import it.cavallium.rockserver.core.common.Callback.CallbackExists;
import it.cavallium.rockserver.core.common.Callback.CallbackForUpdate;
import it.cavallium.rockserver.core.common.Callback.GetCallback; import it.cavallium.rockserver.core.common.Callback.GetCallback;
import it.cavallium.rockserver.core.common.Callback.IteratorCallback; import it.cavallium.rockserver.core.common.Callback.IteratorCallback;
import it.cavallium.rockserver.core.common.Callback.PutCallback; import it.cavallium.rockserver.core.common.Callback.PutCallback;
@ -14,6 +15,8 @@ import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.Delta; import it.cavallium.rockserver.core.common.Delta;
import it.cavallium.rockserver.core.common.RocksDBAPI; import it.cavallium.rockserver.core.common.RocksDBAPI;
import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType;
import it.cavallium.rockserver.core.common.RocksDBRetryException;
import it.cavallium.rockserver.core.common.UpdateContext;
import it.cavallium.rockserver.core.common.Utils; import it.cavallium.rockserver.core.common.Utils;
import it.cavallium.rockserver.core.config.ConfigParser; import it.cavallium.rockserver.core.config.ConfigParser;
import it.cavallium.rockserver.core.config.ConfigPrinter; import it.cavallium.rockserver.core.config.ConfigPrinter;
@ -23,6 +26,7 @@ import it.cavallium.rockserver.core.impl.rocksdb.RocksDBLoader;
import it.cavallium.rockserver.core.impl.rocksdb.RocksDBObjects; import it.cavallium.rockserver.core.impl.rocksdb.RocksDBObjects;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB; import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions; import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions;
import it.cavallium.rockserver.core.impl.rocksdb.Tx;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
@ -35,8 +39,6 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.cliffc.high_scale_lib.NonBlockingHashMapLong; import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
@ -55,13 +57,14 @@ import org.rocksdb.WriteOptions;
public class EmbeddedDB implements RocksDBAPI, Closeable { public class EmbeddedDB implements RocksDBAPI, Closeable {
private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096; private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
public static final long MAX_TRANSACTION_DURATION_MS = 10_000L;
private static final boolean USE_FAST_GET = true; private static final boolean USE_FAST_GET = true;
private final Logger logger; private final Logger logger;
private final @Nullable Path path; private final @Nullable Path path;
private final TransactionalDB db; private final TransactionalDB db;
private final NonBlockingHashMapLong<ColumnInstance> columns; private final NonBlockingHashMapLong<ColumnInstance> columns;
private final ConcurrentMap<String, Long> columnNamesIndex; private final ConcurrentMap<String, Long> columnNamesIndex;
private final NonBlockingHashMapLong<REntry<Transaction>> txs; private final NonBlockingHashMapLong<Tx> txs;
private final NonBlockingHashMapLong<REntry<RocksIterator>> its; private final NonBlockingHashMapLong<REntry<RocksIterator>> its;
private final SafeShutdown ops; private final SafeShutdown ops;
private final Object columnEditLock = new Object(); private final Object columnEditLock = new Object();
@ -129,7 +132,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
public void close() throws IOException { public void close() throws IOException {
// Wait for 10 seconds // Wait for 10 seconds
try { try {
ops.closeAndWait(10_000); ops.closeAndWait(MAX_TRANSACTION_DURATION_MS);
if (path == null) { if (path == null) {
Utils.deleteDirectory(db.getPath()); Utils.deleteDirectory(db.getPath());
} }
@ -140,16 +143,20 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
@Override @Override
public long openTransaction(long timeoutMs) { public long openTransaction(long timeoutMs) {
return FastRandomUtils.allocateNewValue(txs, openTransactionInternal(timeoutMs), Long.MIN_VALUE, -2); return allocateTransactionInternal(openTransactionInternal(timeoutMs, false));
} }
private REntry<Transaction> openTransactionInternal(long timeoutMs) { private long allocateTransactionInternal(Tx tx) {
return FastRandomUtils.allocateNewValue(txs, tx, Long.MIN_VALUE, -2);
}
private Tx openTransactionInternal(long timeoutMs, boolean isFromGetForUpdate) {
// Open the transaction operation, do not close until the transaction has been closed // Open the transaction operation, do not close until the transaction has been closed
ops.beginOp(); ops.beginOp();
try { try {
TransactionalOptions txOpts = db.createTransactionalOptions(timeoutMs); TransactionalOptions txOpts = db.createTransactionalOptions(timeoutMs);
var writeOpts = new WriteOptions(); var writeOpts = new WriteOptions();
return new REntry<>(db.beginTransaction(writeOpts, txOpts), new RocksDBObjects(writeOpts, txOpts)); return new Tx(db.beginTransaction(writeOpts, txOpts), isFromGetForUpdate, new RocksDBObjects(writeOpts, txOpts));
} catch (Throwable ex) { } catch (Throwable ex) {
ops.endOp(); ops.endOp();
throw ex; throw ex;
@ -180,7 +187,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
} }
} }
private boolean closeTransaction(@NotNull REntry<Transaction> tx, boolean commit) { private boolean closeTransaction(@NotNull Tx tx, boolean commit) {
ops.beginOp(); ops.beginOp();
try { try {
// Transaction found // Transaction found
@ -209,7 +216,12 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
} }
} }
private boolean commitTxOptimistically(@NotNull REntry<Transaction> tx) throws RocksDBException { @Override
public void closeFailedUpdate(long updateId) throws it.cavallium.rockserver.core.common.RocksDBException {
this.closeTransaction(updateId, false);
}
private boolean commitTxOptimistically(@NotNull Tx tx) throws RocksDBException {
try { try {
tx.val().commit(); tx.val().commit();
return true; return true;
@ -292,7 +304,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
@Override @Override
public <T> T put(Arena arena, public <T> T put(Arena arena,
long transactionId, long transactionOrUpdateId,
long columnId, long columnId,
@NotNull MemorySegment @NotNull [] keys, @NotNull MemorySegment @NotNull [] keys,
@NotNull MemorySegment value, @NotNull MemorySegment value,
@ -301,13 +313,14 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
try { try {
// Column id // Column id
var col = getColumn(columnId); var col = getColumn(columnId);
REntry<Transaction> tx; Tx tx;
if (transactionId != 0) { if (transactionOrUpdateId != 0) {
tx = getTransaction(transactionId); tx = getTransaction(transactionOrUpdateId, true);
} else { } else {
tx = null; tx = null;
} }
return put(arena, tx, col, keys, value, callback); long updateId = tx != null && tx.isFromGetForUpdate() ? transactionOrUpdateId : 0L;
return put(arena, tx, col, updateId, keys, value, callback);
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) { } catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
throw ex; throw ex;
} catch (Exception ex) { } catch (Exception ex) {
@ -320,8 +333,8 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
/** /**
* @param txConsumer this can be called multiple times, if the optimistic transaction failed * @param txConsumer this can be called multiple times, if the optimistic transaction failed
*/ */
public <R> R wrapWithTransactionIfNeeded(@Nullable REntry<Transaction> tx, boolean needTransaction, public <R> R wrapWithTransactionIfNeeded(@Nullable Tx tx, boolean needTransaction,
ExFunction<@Nullable REntry<Transaction>, R> txConsumer) throws Exception { ExFunction<@Nullable Tx, R> txConsumer) throws Exception {
if (needTransaction) { if (needTransaction) {
return ensureWrapWithTransaction(tx, txConsumer); return ensureWrapWithTransaction(tx, txConsumer);
} else { } else {
@ -333,17 +346,20 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
/** /**
* @param txConsumer this can be called multiple times, if the optimistic transaction failed * @param txConsumer this can be called multiple times, if the optimistic transaction failed
*/ */
public <R> R ensureWrapWithTransaction(@Nullable REntry<Transaction> tx, public <R> R ensureWrapWithTransaction(@Nullable Tx tx,
ExFunction<@NotNull REntry<Transaction>, R> txConsumer) throws Exception { ExFunction<@NotNull Tx, R> txConsumer) throws Exception {
R result; R result;
if (tx == null) { if (tx == null) {
// Retry using a transaction: transactions are required to handle this kind of data // Retry using a transaction: transactions are required to handle this kind of data
var newTx = this.openTransactionInternal(Long.MAX_VALUE); var newTx = this.openTransactionInternal(Long.MAX_VALUE, false);
try { try {
boolean committed; boolean committed;
do { do {
result = txConsumer.apply(newTx); result = txConsumer.apply(newTx);
committed = this.closeTransaction(newTx, true); committed = this.closeTransaction(newTx, true);
if (!committed) {
Thread.yield();
}
} while (!committed); } while (!committed);
} finally { } finally {
this.closeTransaction(newTx, false); this.closeTransaction(newTx, false);
@ -355,20 +371,35 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
} }
private <U> U put(Arena arena, private <U> U put(Arena arena,
@Nullable REntry<Transaction> optionalTx, @Nullable Tx optionalTxOrUpdate,
ColumnInstance col, ColumnInstance col,
long updateId,
@NotNull MemorySegment @NotNull[] keys, @NotNull MemorySegment @NotNull[] keys,
@NotNull MemorySegment value, @NotNull MemorySegment value,
PutCallback<? super MemorySegment, U> callback) throws it.cavallium.rockserver.core.common.RocksDBException { PutCallback<? super MemorySegment, U> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
// Check for null value // Check for null value
col.checkNullableValue(value); col.checkNullableValue(value);
try { try {
boolean requirePreviousValue = Callback.requiresGettingPreviousValue(callback);
boolean requirePreviousPresence = Callback.requiresGettingPreviousPresence(callback);
boolean needsTx = col.hasBuckets() boolean needsTx = col.hasBuckets()
|| Callback.requiresGettingPreviousValue(callback) || requirePreviousValue
|| Callback.requiresGettingPreviousPresence(callback); || requirePreviousPresence;
return wrapWithTransactionIfNeeded(optionalTx, needsTx, tx -> { if (optionalTxOrUpdate != null && optionalTxOrUpdate.isFromGetForUpdate() && (requirePreviousValue || requirePreviousPresence)) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST,
"You can't get the previous value or delta, when you are already updating that value");
}
if (updateId != 0L && optionalTxOrUpdate == null) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST,
"Update id must be accompanied with a valid transaction");
}
return wrapWithTransactionIfNeeded(optionalTxOrUpdate, needsTx, tx -> {
MemorySegment previousValue; MemorySegment previousValue;
MemorySegment calculatedKey = col.calculateKey(arena, keys); MemorySegment calculatedKey = col.calculateKey(arena, keys);
if (updateId != 0L) {
assert tx != null;
tx.val().setSavePoint();
}
if (col.hasBuckets()) { if (col.hasBuckets()) {
assert tx != null; assert tx != null;
var bucketElementKeys = col.getBucketElementKeys(keys); var bucketElementKeys = col.getBucketElementKeys(keys);
@ -414,18 +445,33 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
} }
} }
} }
return Callback.safeCast(switch (callback) { U result = Callback.safeCast(switch (callback) {
case Callback.CallbackVoid<?> ignored -> null; case Callback.CallbackVoid<?> ignored -> null;
case Callback.CallbackPrevious<?> ignored -> previousValue; case Callback.CallbackPrevious<?> ignored -> previousValue;
case Callback.CallbackPreviousPresence<?> ignored -> previousValue != null; case Callback.CallbackPreviousPresence<?> ignored -> previousValue != null;
case Callback.CallbackChanged<?> ignored -> !Utils.valueEquals(previousValue, value); case Callback.CallbackChanged<?> ignored -> !Utils.valueEquals(previousValue, value);
case Callback.CallbackDelta<?> ignored -> new Delta<>(previousValue, value); case Callback.CallbackDelta<?> ignored -> new Delta<>(previousValue, value);
}); });
if (updateId != 0L) {
if (!closeTransaction(updateId, true)) {
tx.val().rollbackToSavePoint();
tx.val().undoGetForUpdate(col.cfh(), Utils.toByteArray(calculatedKey));
throw new RocksDBRetryException();
}
}
return result;
}); });
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
throw ex;
} catch (Exception ex) { } catch (Exception ex) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex); if (updateId != 0L && !(ex instanceof RocksDBRetryException)) {
closeTransaction(updateId, false);
}
if (ex instanceof it.cavallium.rockserver.core.common.RocksDBException rocksDBException) {
throw rocksDBException;
} else {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
}
} }
} }
@ -439,24 +485,46 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
long columnId, long columnId,
MemorySegment @NotNull [] keys, MemorySegment @NotNull [] keys,
GetCallback<? super MemorySegment, T> callback) throws it.cavallium.rockserver.core.common.RocksDBException { GetCallback<? super MemorySegment, T> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
// Column id
var col = getColumn(columnId);
Tx tx = transactionId != 0 ? getTransaction(transactionId, true) : null;
long updateId;
if (callback instanceof Callback.CallbackForUpdate<?>) {
if (tx == null) {
tx = openTransactionInternal(MAX_TRANSACTION_DURATION_MS, true);
updateId = allocateTransactionInternal(tx);
} else {
updateId = transactionId;
}
} else {
updateId = 0;
}
try {
return get(arena, tx, updateId, col, keys, callback);
} catch (Throwable ex) {
if (updateId != 0 && tx.isFromGetForUpdate()) {
closeTransaction(updateId, false);
}
throw ex;
}
}
private <T> T get(Arena arena,
Tx tx,
long updateId,
ColumnInstance col,
MemorySegment @NotNull [] keys,
GetCallback<? super MemorySegment, T> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
ops.beginOp(); ops.beginOp();
try { try {
// Column id
var col = getColumn(columnId);
if (!col.schema().hasValue() && Callback.requiresGettingCurrentValue(callback)) { if (!col.schema().hasValue() && Callback.requiresGettingCurrentValue(callback)) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.VALUE_MUST_BE_NULL, throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.VALUE_MUST_BE_NULL,
"The specified callback requires a return value, but this column does not have values!"); "The specified callback requires a return value, but this column does not have values!");
} }
MemorySegment foundValue; MemorySegment foundValue;
boolean existsValue; boolean existsValue;
REntry<Transaction> tx;
if (transactionId != 0) {
tx = getTransaction(transactionId);
} else {
tx = null;
}
MemorySegment calculatedKey = col.calculateKey(arena, keys); MemorySegment calculatedKey = col.calculateKey(arena, keys);
if (col.hasBuckets()) { if (col.hasBuckets()) {
var bucketElementKeys = col.getBucketElementKeys(keys); var bucketElementKeys = col.getBucketElementKeys(keys);
@ -496,6 +564,10 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
return Callback.safeCast(switch (callback) { return Callback.safeCast(switch (callback) {
case Callback.CallbackVoid<?> ignored -> null; case Callback.CallbackVoid<?> ignored -> null;
case Callback.CallbackCurrent<?> ignored -> foundValue; case Callback.CallbackCurrent<?> ignored -> foundValue;
case Callback.CallbackForUpdate<?> ignored -> {
assert updateId != 0;
yield new UpdateContext<>(foundValue, updateId);
}
case Callback.CallbackExists<?> ignored -> existsValue; case Callback.CallbackExists<?> ignored -> existsValue;
}); });
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) { } catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
@ -523,7 +595,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
var ro = new ReadOptions(); var ro = new ReadOptions();
if (transactionId > 0L) { if (transactionId > 0L) {
//noinspection resource //noinspection resource
it = getTransaction(transactionId).val().getIterator(ro, col.cfh()); it = getTransaction(transactionId, false).val().getIterator(ro, col.cfh());
} else { } else {
it = db.get().newIterator(col.cfh()); it = db.get().newIterator(col.cfh());
} }
@ -571,13 +643,18 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
} }
} }
private MemorySegment dbGet(REntry<Transaction> tx, private MemorySegment dbGet(Tx tx,
ColumnInstance col, ColumnInstance col,
Arena arena, Arena arena,
ReadOptions readOptions, ReadOptions readOptions,
MemorySegment calculatedKey) throws RocksDBException { MemorySegment calculatedKey) throws RocksDBException {
if (tx != null) { if (tx != null) {
var previousRawBucketByteArray = tx.val().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES)); byte[] previousRawBucketByteArray;
if (tx.isFromGetForUpdate()) {
previousRawBucketByteArray = tx.val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
} else {
previousRawBucketByteArray = tx.val().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES));
}
return toMemorySegment(arena, previousRawBucketByteArray); return toMemorySegment(arena, previousRawBucketByteArray);
} else { } else {
var db = this.db.get(); var db = this.db.get();
@ -632,9 +709,13 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
} }
} }
private REntry<Transaction> getTransaction(long transactionId) { private Tx getTransaction(long transactionId, boolean allowGetForUpdate) {
var tx = txs.get(transactionId); var tx = txs.get(transactionId);
if (tx != null) { if (tx != null) {
if (!allowGetForUpdate && tx.isFromGetForUpdate()) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.RESTRICTED_TRANSACTION,
"Can't get this transaction, it's for internal use only");
}
return tx; return tx;
} else { } else {
throw new NoSuchElementException("No transaction with id " + transactionId); throw new NoSuchElementException("No transaction with id " + transactionId);

View File

@ -0,0 +1,15 @@
package it.cavallium.rockserver.core.impl.rocksdb;
import java.io.Closeable;
import org.rocksdb.AbstractNativeReference;
import org.rocksdb.Transaction;
public record Tx(Transaction val, boolean isFromGetForUpdate, RocksDBObjects objs)
implements Closeable {
@Override
public void close() {
val.close();
objs.close();
}
}

View File

@ -7,6 +7,8 @@ import it.cavallium.rockserver.core.common.Callback;
import it.cavallium.rockserver.core.common.ColumnHashType; import it.cavallium.rockserver.core.common.ColumnHashType;
import it.cavallium.rockserver.core.common.ColumnSchema; import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.Delta; import it.cavallium.rockserver.core.common.Delta;
import it.cavallium.rockserver.core.common.RocksDBException;
import it.cavallium.rockserver.core.common.RocksDBRetryException;
import it.cavallium.rockserver.core.common.Utils; import it.cavallium.rockserver.core.common.Utils;
import it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.objects.ObjectList; import it.unimi.dsi.fastutil.objects.ObjectList;
@ -329,6 +331,48 @@ abstract class EmbeddedDBTest {
Assertions.assertTrue(db.get(arena, 0, colId, key2, Callback.exists())); Assertions.assertTrue(db.get(arena, 0, colId, key2, Callback.exists()));
} }
@Test
void update() {
if (getHasValues()) {
var forUpdate = db.get(arena, 0, colId, key1, Callback.forUpdate());
Assertions.assertNull(forUpdate.previous());
Assertions.assertTrue(forUpdate.updateId() != 0);
db.put(arena, forUpdate.updateId(), colId, key1, value1, Callback.none());
Assertions.assertThrows(Exception.class, () -> db.put(arena, forUpdate.updateId(), colId, key1, value2, Callback.none()));
}
}
@Test
void concurrentUpdate() {
if (getHasValues()) {
{
var forUpdate1 = db.get(arena, 0, colId, key1, Callback.forUpdate());
try {
var forUpdate2 = db.get(arena, 0, colId, key1, Callback.forUpdate());
try {
db.put(arena, forUpdate1.updateId(), colId, key1, value1, Callback.none());
Assertions.assertThrowsExactly(RocksDBRetryException.class, () -> db.put(arena, forUpdate2.updateId(), colId, key1, value2, Callback.none()));
// Retrying
var forUpdate3 = db.get(arena, forUpdate2.updateId(), colId, key1, Callback.forUpdate());
try {
assertSegmentEquals(value1, forUpdate3.previous());
db.put(arena, forUpdate3.updateId(), colId, key1, value2, Callback.none());
} catch (Throwable ex3) {
db.closeFailedUpdate(forUpdate3.updateId());
throw ex3;
}
} catch (Throwable ex2) {
db.closeFailedUpdate(forUpdate2.updateId());
throw ex2;
}
} catch (Throwable ex) {
throw ex;
}
}
}
}
public static void assertSegmentEquals(MemorySegment expected, MemorySegment input) { public static void assertSegmentEquals(MemorySegment expected, MemorySegment input) {
if (!Utils.valueEquals(expected, input)) { if (!Utils.valueEquals(expected, input)) {
Assertions.fail( Assertions.fail(