Better callbacks

This commit is contained in:
Andrea Cavalli 2023-12-07 16:54:12 +01:00
parent a505b3500f
commit 73c1fa3ea7
13 changed files with 224 additions and 139 deletions

View File

@ -41,8 +41,8 @@ public class EmbeddedConnection extends BaseConnection {
}
@Override
public void closeTransaction(long transactionId) {
db.closeTransaction(transactionId);
public boolean closeTransaction(long transactionId, boolean commit) {
return db.closeTransaction(transactionId, commit);
}
@Override
@ -61,18 +61,20 @@ public class EmbeddedConnection extends BaseConnection {
}
@Override
public void put(long transactionId,
public <T> T put(long transactionId,
long columnId,
MemorySegment[] keys,
@Nullable MemorySegment value,
PutCallback<? super MemorySegment> callback) throws RocksDBException {
db.put(transactionId, columnId, keys, value, callback);
PutCallback<? super MemorySegment, T> callback) throws RocksDBException {
return db.put(transactionId, columnId, keys, value, callback);
}
@Override
public void get(long transactionId, long columnId, MemorySegment[] keys, GetCallback<? super MemorySegment> callback)
throws RocksDBException {
db.get(transactionId, columnId, keys, callback);
public <T> T get(long transactionId,
long columnId,
MemorySegment[] keys,
GetCallback<? super MemorySegment, T> callback) throws RocksDBException {
return db.get(transactionId, columnId, keys, callback);
}
@Override
@ -96,10 +98,10 @@ public class EmbeddedConnection extends BaseConnection {
}
@Override
public void subsequent(long iterationId,
public <T> T subsequent(long iterationId,
long skipCount,
long takeCount,
IteratorCallback<? super MemorySegment> callback) throws RocksDBException {
db.subsequent(iterationId, skipCount, takeCount, callback);
IteratorCallback<? super MemorySegment, T> callback) throws RocksDBException {
return db.subsequent(iterationId, skipCount, takeCount, callback);
}
}

View File

@ -33,7 +33,7 @@ public abstract class SocketConnection extends BaseConnection {
}
@Override
public void closeTransaction(long transactionId) {
public boolean closeTransaction(long transactionId, boolean commit) {
throw new UnsupportedOperationException();
}
@ -53,17 +53,19 @@ public abstract class SocketConnection extends BaseConnection {
}
@Override
public void put(long transactionId,
public <T> T put(long transactionId,
long columnId,
MemorySegment[] keys,
@Nullable MemorySegment value,
PutCallback<? super MemorySegment> callback) throws RocksDBException {
PutCallback<? super MemorySegment, T> callback) throws RocksDBException {
throw new UnsupportedOperationException();
}
@Override
public void get(long transactionId, long columnId, MemorySegment[] keys, GetCallback<? super MemorySegment> callback)
throws RocksDBException {
public <T> T get(long transactionId,
long columnId,
MemorySegment[] keys,
GetCallback<? super MemorySegment, T> callback) throws RocksDBException {
throw new UnsupportedOperationException();
}
@ -88,10 +90,10 @@ public abstract class SocketConnection extends BaseConnection {
}
@Override
public void subsequent(long iterationId,
public <T> T subsequent(long iterationId,
long skipCount,
long takeCount,
IteratorCallback<? super MemorySegment> callback) throws RocksDBException {
IteratorCallback<? super MemorySegment, T> callback) throws RocksDBException {
throw new UnsupportedOperationException();
}
}

View File

@ -1,58 +1,58 @@
package it.cavallium.rockserver.core.common;
import java.lang.foreign.MemorySegment;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicReference;
import org.jetbrains.annotations.Nullable;
public sealed interface Callback<T> {
public sealed interface Callback<METHOD_DATA_TYPE, RESULT_TYPE> {
static boolean requiresGettingPreviousValue(PutCallback<?> callback) {
static boolean requiresGettingPreviousValue(PutCallback<?, ?> callback) {
return callback instanceof CallbackPrevious<?>
|| callback instanceof CallbackDelta<?>
|| callback instanceof CallbackChanged;
}
static boolean requiresGettingCurrentValue(GetCallback<?> callback) {
static boolean requiresGettingCurrentValue(GetCallback<?, ?> callback) {
return callback instanceof CallbackCurrent<?>;
}
sealed interface PutCallback<T> extends Callback<T> {}
sealed interface PatchCallback<T> extends Callback<T> {}
sealed interface GetCallback<T> extends Callback<T> {}
sealed interface IteratorCallback<T> extends Callback<T> {}
non-sealed interface CallbackVoid<T> extends PutCallback<T>, PatchCallback<T>, IteratorCallback<T>, GetCallback<T> {}
non-sealed interface CallbackPrevious<T> extends PutCallback<T> {
void onPrevious(@Nullable T previous);
static <T> CallbackPrevious<T> previous() {
// todo: create static instance
return new CallbackPrevious<>();
}
non-sealed interface CallbackCurrent<T> extends GetCallback<T> {
void onCurrent(@Nullable T previous);
static <T> CallbackDelta<T> delta() {
// todo: create static instance
return new CallbackDelta<>();
}
non-sealed interface CallbackExists<T> extends GetCallback<T>, IteratorCallback<T> {
void onExists(boolean exists);
static <U> U safeCast(Object previousValue) {
//noinspection unchecked
return (U) previousValue;
}
non-sealed interface CallbackDelta<T> extends PutCallback<T> {
sealed interface PutCallback<T, U> extends Callback<T, U> {}
void onSuccess(Delta<T> previous);
sealed interface PatchCallback<T, U> extends Callback<T, U> {}
sealed interface GetCallback<T, U> extends Callback<T, U> {}
sealed interface IteratorCallback<T, U> extends Callback<T, U> {}
record CallbackVoid<T>() implements PutCallback<T, Void>, PatchCallback<T, Void>, IteratorCallback<T, Void>, GetCallback<T, Void> {}
record CallbackPrevious<T>() implements PutCallback<T, @Nullable T> {}
record CallbackCurrent<T>() implements GetCallback<T, @Nullable T> {}
record CallbackExists<T>() implements GetCallback<T, Boolean>, IteratorCallback<T, Boolean> {}
record CallbackDelta<T>() implements PutCallback<T, Delta<T>> {
}
non-sealed interface CallbackMulti<T> extends IteratorCallback<T> {
record CallbackMulti<M>() implements IteratorCallback<M, List<M>> {}
void onSuccess(List<Entry<T, T>> elements);
}
non-sealed interface CallbackChanged extends PutCallback<Object>, PatchCallback<Object> {
void onChanged(boolean changed);
}
record CallbackChanged<T>() implements PutCallback<T, Boolean>, PatchCallback<T, Boolean> {}
}

View File

@ -17,9 +17,12 @@ public interface RocksDBAPI {
/**
* Close a transaction
*
* @param transactionId transaction id to close
* @param commit true to commit the transaction, false to rollback it
* @return true if committed, if false, you should try again
*/
void closeTransaction(long transactionId) throws RocksDBException;
boolean closeTransaction(long transactionId, boolean commit) throws RocksDBException;
/**
* Create a column
@ -50,11 +53,11 @@ public interface RocksDBAPI {
* @param value value, or null if not needed
* @param callback the callback will be executed on the same thread, exactly once.
*/
void put(long transactionId,
<T> T put(long transactionId,
long columnId,
MemorySegment[] keys,
@Nullable MemorySegment value,
PutCallback<? super MemorySegment> callback) throws RocksDBException;
PutCallback<? super MemorySegment, T> callback) throws RocksDBException;
/**
* Get an element from the specified position
@ -63,10 +66,10 @@ public interface RocksDBAPI {
* @param keys column keys, or empty array if not needed
* @param callback the callback will be executed on the same thread, exactly once.
*/
void get(long transactionId,
<T> T get(long transactionId,
long columnId,
MemorySegment[] keys,
GetCallback<? super MemorySegment> callback) throws RocksDBException;
GetCallback<? super MemorySegment, T> callback) throws RocksDBException;
/**
* Open an iterator
@ -105,8 +108,8 @@ public interface RocksDBAPI {
* @param takeCount number of elements to take
* @param callback the callback will be executed on the same thread, exactly once.
*/
void subsequent(long iterationId,
<T> T subsequent(long iterationId,
long skipCount,
long takeCount,
IteratorCallback<? super MemorySegment> callback) throws RocksDBException;
IteratorCallback<? super MemorySegment, T> callback) throws RocksDBException;
}

View File

@ -5,7 +5,7 @@ public class RocksDBException extends RuntimeException {
private final RocksDBErrorType errorUniqueId;
public enum RocksDBErrorType {
PUT_UNKNOWN, PUT_2, UNEXPECTED_NULL_VALUE, PUT_1, PUT_3, GET_1, COLUMN_EXISTS, COLUMN_CREATE_FAIL, COLUMN_NOT_FOUND, COLUMN_DELETE_FAIL, CONFIG_ERROR, ROCKSDB_CONFIG_ERROR, VALUE_MUST_BE_NULL, DIRECTORY_DELETE, KEY_LENGTH_MISMATCH, UNSUPPORTED_HASH_SIZE, RAW_KEY_LENGTH_MISMATCH, KEYS_COUNT_MISMATCH, ROCKSDB_LOAD_ERROR
PUT_UNKNOWN_ERROR, PUT_2, UNEXPECTED_NULL_VALUE, PUT_1, PUT_3, GET_1, COLUMN_EXISTS, COLUMN_CREATE_FAIL, COLUMN_NOT_FOUND, COLUMN_DELETE_FAIL, CONFIG_ERROR, ROCKSDB_CONFIG_ERROR, VALUE_MUST_BE_NULL, DIRECTORY_DELETE, KEY_LENGTH_MISMATCH, UNSUPPORTED_HASH_SIZE, RAW_KEY_LENGTH_MISMATCH, KEYS_COUNT_MISMATCH, COMMIT_FAILED_TRY_AGAIN, COMMIT_FAILED, TX_NOT_FOUND, ROCKSDB_LOAD_ERROR
}

View File

@ -1,6 +1,8 @@
package it.cavallium.rockserver.core.common;
import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES;
import static java.lang.foreign.MemorySegment.NULL;
import static java.util.Objects.requireNonNullElse;
import java.io.IOException;
import java.lang.foreign.MemorySegment;
@ -81,4 +83,10 @@ public class Utils {
throw new RocksDBException(RocksDBException.RocksDBErrorType.DIRECTORY_DELETE, e);
}
}
public static boolean valueEquals(MemorySegment previousValue, MemorySegment currentValue) {
previousValue = requireNonNullElse(previousValue, NULL);
currentValue = requireNonNullElse(currentValue, NULL);
return MemorySegment.mismatch(previousValue, 0, previousValue.byteSize(), currentValue, 0, currentValue.byteSize()) == -1;
}
}

View File

@ -1,5 +1,9 @@
package it.cavallium.rockserver.core.impl;
import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_INT;
import static java.lang.Math.toIntExact;
import it.cavallium.rockserver.core.common.Utils;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
@ -23,11 +27,11 @@ public class Bucket {
this.elements = new ArrayList<>();
long rawBucketSegmentByteSize = rawBucketSegment.byteSize();
if (rawBucketSegmentByteSize > 0) {
var elements = rawBucketSegment.get(ValueLayout.JAVA_INT_UNALIGNED, offset);
var elements = rawBucketSegment.get(ColumnInstance.BIG_ENDIAN_INT_UNALIGNED, offset);
offset += Integer.BYTES;
int elementI = 0;
while (elementI < elements) {
var elementKVSize = rawBucketSegment.get(ValueLayout.JAVA_INT_UNALIGNED, offset);
var elementKVSize = rawBucketSegment.get(ColumnInstance.BIG_ENDIAN_INT_UNALIGNED, offset);
offset += Integer.BYTES;
MemorySegment[] bucketElementKeys;
@ -37,7 +41,7 @@ public class Bucket {
int readKeys = 0;
bucketElementKeys = new MemorySegment[col.schema().variableLengthKeysCount()];
while (readKeys < col.schema().variableLengthKeysCount()) {
var keyISize = elementKVSegment.get(ValueLayout.JAVA_CHAR_UNALIGNED, segmentOffset);
var keyISize = elementKVSegment.get(ColumnInstance.BIG_ENDIAN_CHAR_UNALIGNED, segmentOffset);
segmentOffset += Character.BYTES;
var elementKeyISegment = elementKVSegment.asSlice(segmentOffset, keyISize);
bucketElementKeys[readKeys] = elementKeyISegment;
@ -55,7 +59,7 @@ public class Bucket {
}
var entry = Map.entry(bucketElementKeys, bucketElementValues);
this.elements.add(entry);
offset += segmentOffset;
}
elementI++;
@ -123,7 +127,7 @@ public class Bucket {
var arrayKeys = elem.getKey();
assert arrayKeys.length == bucketVariableKeys.length;
for (int j = 0; j < arrayKeys.length; j++) {
if (MemorySegment.mismatch(arrayKeys[j], 0, arrayKeys[j].byteSize(), bucketVariableKeys[j], 0, bucketVariableKeys[j].byteSize()) == -1) {
if (Utils.valueEquals(arrayKeys[j], bucketVariableKeys[j])) {
return i;
}
}
@ -145,14 +149,16 @@ public class Bucket {
}
long totalSize = Integer.BYTES;
for (MemorySegment serializedElement : serializedElements) {
totalSize += serializedElement.byteSize();
totalSize += Integer.BYTES + serializedElement.byteSize();
}
var segment = arena.allocate(totalSize);
long offset = 0;
segment.set(ColumnInstance.BIG_ENDIAN_INT, offset, serializedElements.length);
segment.set(BIG_ENDIAN_INT, offset, serializedElements.length);
offset += Integer.BYTES;
for (MemorySegment elementAtI : serializedElements) {
var elementSize = elementAtI.byteSize();
segment.set(BIG_ENDIAN_INT, offset, toIntExact(elementSize));
offset += Integer.BYTES;
MemorySegment.copy(elementAtI, 0, segment, offset, elementSize);
offset += elementSize;
}

View File

@ -159,12 +159,11 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi
checkNullableValue(computedBucketElementValue);
var keySize = computedBucketElementKey.byteSize();
var valueSize = computedBucketElementValue != null ? computedBucketElementValue.byteSize() : 0;
var totalSize = Integer.BYTES + keySize + valueSize;
var totalSize = keySize + valueSize;
var computedBucketElementKV = arena.allocate(totalSize);
computedBucketElementKV.set(BIG_ENDIAN_INT, 0, toIntExact(totalSize));
MemorySegment.copy(computedBucketElementKey, 0, computedBucketElementKV, Integer.BYTES, keySize);
MemorySegment.copy(computedBucketElementKey, 0, computedBucketElementKV, 0, keySize);
if (computedBucketElementValue != null) {
MemorySegment.copy(computedBucketElementValue, 0, computedBucketElementKV, Integer.BYTES + keySize, valueSize);
MemorySegment.copy(computedBucketElementValue, 0, computedBucketElementKV, keySize, valueSize);
}
return computedBucketElementKV;
}

View File

@ -7,6 +7,9 @@ import static java.util.Objects.requireNonNullElse;
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue;
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue;
import it.cavallium.rockserver.core.common.Callback.CallbackDelta;
import it.cavallium.rockserver.core.common.Callback.CallbackPrevious;
import it.cavallium.rockserver.core.common.Callback.CallbackVoid;
import it.cavallium.rockserver.core.common.Callback.GetCallback;
import it.cavallium.rockserver.core.common.Callback.IteratorCallback;
import it.cavallium.rockserver.core.common.Callback.PutCallback;
@ -35,6 +38,7 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
@ -44,6 +48,7 @@ import org.github.gestalt.config.source.ClassPathConfigSource;
import org.github.gestalt.config.source.FileConfigSource;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.*;
import org.rocksdb.Status.Code;
public class EmbeddedDB implements RocksDBAPI, Closeable {
@ -154,13 +159,16 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
@Override
public long openTransaction(long timeoutMs) {
return FastRandomUtils.allocateNewValue(txs, openTransactionInternal(timeoutMs), Long.MIN_VALUE, -2);
}
private REntry<Transaction> openTransactionInternal(long timeoutMs) {
// Open the transaction operation, do not close until the transaction has been closed
ops.beginOp();
try {
TransactionalOptions txOpts = db.createTransactionalOptions();
TransactionalOptions txOpts = db.createTransactionalOptions(timeoutMs);
var writeOpts = new WriteOptions();
var tx = new REntry<>(db.beginTransaction(writeOpts, txOpts), new RocksDBObjects(writeOpts, txOpts));
return FastRandomUtils.allocateNewValue(txs, tx, Long.MIN_VALUE, -2);
return new REntry<>(db.beginTransaction(writeOpts, txOpts), new RocksDBObjects(writeOpts, txOpts));
} catch (Throwable ex) {
ops.endOp();
throw ex;
@ -168,25 +176,71 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
}
@Override
public void closeTransaction(long transactionId) {
public boolean closeTransaction(long transactionId, boolean commit) {
var tx = txs.get(transactionId);
if (tx != null) {
try {
var committed = closeTransaction(tx, commit);
if (committed) {
txs.remove(transactionId, tx);
}
return committed;
} catch (Throwable ex) {
txs.remove(transactionId, tx);
throw ex;
}
} else {
// Transaction not found
if (commit) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.TX_NOT_FOUND, "Transaction not found: " + transactionId);
} else {
return true;
}
}
}
private boolean closeTransaction(REntry<Transaction> tx, boolean commit) {
ops.beginOp();
try {
var tx = txs.remove(transactionId);
if (tx != null) {
try {
tx.close();
} finally {
// Close the transaction operation
ops.endOp();
// Transaction found
try {
if (commit) {
if (!commitTxOptimistically(tx)) {
// Do not call endOp here, since the transaction is still open
return false;
}
} else {
if (tx.val().isOwningHandle()) {
tx.val().rollback();
}
}
} else {
throw new NoSuchElementException("Transaction not found: " + transactionId);
tx.close();
// Close the transaction operation
ops.endOp();
return true;
} catch (RocksDBException e) {
// Close the transaction operation
ops.endOp();
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COMMIT_FAILED, "Transaction close failed");
}
} finally {
ops.endOp();
}
}
private boolean commitTxOptimistically(REntry<Transaction> tx) throws RocksDBException {
try {
tx.val().commit();
return true;
} catch (RocksDBException ex) {
var status = ex.getStatus() != null ? ex.getStatus().getCode() : Code.Ok;
if (status == Code.Busy || status == Code.TryAgain) {
return false;
}
throw ex;
}
}
@Override
public long createColumn(String name, ColumnSchema schema) throws it.cavallium.rockserver.core.common.RocksDBException {
ops.beginOp();
@ -256,31 +310,47 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
}
@Override
public void put(long transactionId,
public <T> T put(long transactionId,
long columnId,
MemorySegment[] keys,
@Nullable MemorySegment value,
PutCallback<? super MemorySegment> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
PutCallback<? super MemorySegment, T> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
ops.beginOp();
try (var arena = Arena.ofConfined()) {
// Column id
var col = getColumn(columnId);
// Check for null value
col.checkNullableValue(value);
MemorySegment previousValue;
REntry<Transaction> tx;
if (transactionId != 0) {
tx = getTransaction(transactionId);
} else {
tx = null;
}
return put(arena, tx, col, keys, value, callback);
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
throw ex;
} catch (Exception ex) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
} finally {
ops.endOp();
}
}
private <U> U put(Arena arena,
REntry<Transaction> tx,
ColumnInstance col,
MemorySegment[] keys,
@Nullable MemorySegment value,
PutCallback<? super MemorySegment, U> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
// Check for null value
col.checkNullableValue(value);
try {
MemorySegment previousValue;
MemorySegment calculatedKey = col.calculateKey(arena, keys);
if (col.hasBuckets()) {
if (tx != null) {
var bucketElementKeys = col.getBucketElementKeys(keys);
try (var readOptions = new ReadOptions()) {
var previousRawBucketByteArray = tx.val().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES));
var previousRawBucketByteArray = tx.val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
MemorySegment previousRawBucket = toMemorySegment(previousRawBucketByteArray);
var bucket = new Bucket(col, previousRawBucket);
previousValue = bucket.addElement(bucketElementKeys, value);
@ -290,12 +360,15 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
}
} else {
// Retry using a transaction: transactions are required to handle this kind of data
var newTransactionId = this.openTransaction(Long.MAX_VALUE);
var newTx = this.openTransactionInternal(Long.MAX_VALUE);
try {
put(newTransactionId, columnId, keys, value, callback);
return;
boolean committed;
do {
previousValue = put(arena, newTx, col, keys, value, Callback.previous());
committed = this.closeTransaction(newTx, true);
} while (!committed);
} finally {
this.closeTransaction(newTransactionId);
this.closeTransaction(newTx, false);
}
}
} else {
@ -322,26 +395,24 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
}
}
}
switch (callback) {
case Callback.CallbackVoid<? super MemorySegment> ignored -> {}
case Callback.CallbackPrevious<? super MemorySegment> c -> c.onPrevious(previousValue);
case Callback.CallbackChanged c -> c.onChanged(valueEquals(previousValue, value));
case Callback.CallbackDelta<? super MemorySegment> c -> c.onSuccess(new Delta<>(previousValue, value));
default -> throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_3,
"Unexpected value: " + callback);
}
return Callback.safeCast(switch (callback) {
case Callback.CallbackVoid<?> ignored -> null;
case Callback.CallbackPrevious<?> ignored -> previousValue;
case Callback.CallbackChanged<?> ignored -> Utils.valueEquals(previousValue, value);
case Callback.CallbackDelta<?> ignored -> new Delta<>(previousValue, value);
});
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
throw ex;
} catch (Exception ex) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_UNKNOWN, ex);
} finally {
ops.endOp();
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
}
}
@Override
public void get(long transactionId, long columnId, MemorySegment[] keys, GetCallback<? super MemorySegment> callback)
throws it.cavallium.rockserver.core.common.RocksDBException {
public <T> T get(long transactionId,
long columnId,
MemorySegment[] keys,
GetCallback<? super MemorySegment, T> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
ops.beginOp();
try (var arena = Arena.ofConfined()) {
// Column id
@ -368,7 +439,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
}
} else {
boolean shouldGetCurrent = Callback.requiresGettingCurrentValue(callback)
|| (tx != null && callback instanceof Callback.CallbackExists<? super MemorySegment>);
|| (tx != null && callback instanceof Callback.CallbackExists<?>);
if (shouldGetCurrent) {
try (var readOptions = new ReadOptions()) {
foundValue = dbGet(tx, col, arena, readOptions, calculatedKey);
@ -376,7 +447,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
} catch (RocksDBException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_2, e);
}
} else if (callback instanceof Callback.CallbackExists<? super MemorySegment>) {
} else if (callback instanceof Callback.CallbackExists<?>) {
// tx is always null here
//noinspection ConstantValue
assert tx == null;
@ -387,17 +458,15 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
existsValue = false;
}
}
switch (callback) {
case Callback.CallbackVoid<? super MemorySegment> ignored -> {}
case Callback.CallbackCurrent<? super MemorySegment> c -> c.onCurrent(foundValue);
case Callback.CallbackExists<? super MemorySegment> c -> c.onExists(existsValue);
default -> throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_3,
"Unexpected value: " + callback);
}
return Callback.safeCast(switch (callback) {
case Callback.CallbackVoid<?> ignored -> null;
case Callback.CallbackCurrent<?> ignored -> foundValue;
case Callback.CallbackExists<?> ignored -> existsValue;
});
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
throw ex;
} catch (Exception ex) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_UNKNOWN, ex);
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
} finally {
ops.endOp();
}
@ -452,10 +521,10 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
}
@Override
public void subsequent(long iterationId,
public <T> T subsequent(long iterationId,
long skipCount,
long takeCount,
IteratorCallback<? super MemorySegment> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
IteratorCallback<? super MemorySegment, T> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
ops.beginOp();
try {
throw new UnsupportedOperationException();
@ -515,12 +584,6 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
};
}
private boolean valueEquals(MemorySegment previousValue, MemorySegment currentValue) {
previousValue = requireNonNullElse(previousValue, NULL);
currentValue = requireNonNullElse(currentValue, NULL);
return MemorySegment.mismatch(previousValue, 0, previousValue.byteSize(), currentValue, 0, currentValue.byteSize()) != -1;
}
private ColumnInstance getColumn(long columnId) {
var col = columns.get(columnId);
if (col != null) {

View File

@ -2,6 +2,7 @@ package it.cavallium.rockserver.core.impl.rocksdb;
import java.io.Closeable;
import org.rocksdb.AbstractNativeReference;
import org.rocksdb.Status.Code;
public record REntry<T extends AbstractNativeReference>(T val, RocksDBObjects objs) implements Closeable {

View File

@ -21,7 +21,7 @@ public sealed interface TransactionalDB extends Closeable {
};
}
TransactionalOptions createTransactionalOptions();
TransactionalOptions createTransactionalOptions(long timeoutMs);
String getPath();
@ -100,8 +100,8 @@ public sealed interface TransactionalDB extends Closeable {
}
@Override
public TransactionalOptions createTransactionalOptions() {
return new TransactionalOptionsPessimistic(new TransactionOptions());
public TransactionalOptions createTransactionalOptions(long timeoutMs) {
return new TransactionalOptionsPessimistic(new TransactionOptions().setExpiration(timeoutMs));
}
@Override
@ -171,7 +171,7 @@ public sealed interface TransactionalDB extends Closeable {
}
@Override
public TransactionalOptions createTransactionalOptions() {
public TransactionalOptions createTransactionalOptions(long timeoutMs) {
return new TransactionalOptionsOptimistic(new OptimisticTransactionOptions());
}

View File

@ -1,6 +1,7 @@
package it.cavallium.rockserver.core.impl.test;
import it.cavallium.rockserver.core.client.EmbeddedConnection;
import it.cavallium.rockserver.core.common.Callback;
import it.cavallium.rockserver.core.common.Callback.CallbackDelta;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.Utils;
@ -40,16 +41,15 @@ class EmbeddedDBTest {
Utils.toMemorySegment(new byte[] {0, 0, 3, 6, 7, 8})
};
var value1 = MemorySegment.ofArray(new byte[] {0, 0, 3});
AtomicInteger callbackCalled = new AtomicInteger();
db.put(0, colId, key, value1, (CallbackDelta<MemorySegment>) prev -> {
callbackCalled.incrementAndGet();
Assertions.assertNull(prev);
});
Assertions.assertEquals(1, callbackCalled.get());
db.put(0, colId, key, MemorySegment.ofArray(new byte[] {0, 0, 5}), (CallbackDelta<MemorySegment>) prev -> {
callbackCalled.incrementAndGet();
Utils.(value1);
});
var value2 = MemorySegment.ofArray(new byte[] {0, 0, 5});
var delta = db.put(0, colId, key, value1, Callback.delta());
Assertions.assertNull(delta.previous());
Assertions.assertTrue(Utils.valueEquals(delta.current(), value1));
delta = db.put(0, colId, key, value2, Callback.delta());
Assertions.assertTrue(Utils.valueEquals(delta.previous(), value1));
Assertions.assertTrue(Utils.valueEquals(delta.current(), value2));
}
@org.junit.jupiter.api.Test

View File

@ -1,5 +1,6 @@
package it.cavallium.rockserver.core.test;
import it.cavallium.rockserver.core.impl.ColumnInstance;
import it.cavallium.rockserver.core.impl.XXHash32;
import java.lang.foreign.Arena;
import java.lang.foreign.ValueLayout;
@ -26,7 +27,7 @@ public class XXHash32Test {
var hash = safeXxhash32.hash(bytes, 0, bytes.length, Integer.MIN_VALUE);
var a = Arena.global();
var result = myXxhash32.hash(a, a.allocateArray(OfByte.JAVA_BYTE, bytes), 0, bytes.length, Integer.MIN_VALUE);
var resultInt = result.get(ValueLayout.JAVA_INT.withOrder(ByteOrder.BIG_ENDIAN), 0);
var resultInt = result.get(ColumnInstance.BIG_ENDIAN_INT, 0);
Assertions.assertEquals(hash, resultInt);
}
}