Code cleanup
This commit is contained in:
parent
34c7ca4144
commit
a803bad343
@ -7,6 +7,7 @@ import it.cavallium.rockserver.core.common.ColumnSchema;
|
||||
import it.cavallium.rockserver.core.common.RocksDBException;
|
||||
import it.cavallium.rockserver.core.impl.EmbeddedDB;
|
||||
import java.io.IOException;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Path;
|
||||
@ -61,30 +62,33 @@ public class EmbeddedConnection extends BaseConnection {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T put(long transactionId,
|
||||
public <T> T put(Arena arena,
|
||||
long transactionId,
|
||||
long columnId,
|
||||
MemorySegment[] keys,
|
||||
@Nullable MemorySegment value,
|
||||
PutCallback<? super MemorySegment, T> callback) throws RocksDBException {
|
||||
return db.put(transactionId, columnId, keys, value, callback);
|
||||
return db.put(arena, transactionId, columnId, keys, value, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T get(long transactionId,
|
||||
public <T> T get(Arena arena,
|
||||
long transactionId,
|
||||
long columnId,
|
||||
MemorySegment[] keys,
|
||||
GetCallback<? super MemorySegment, T> callback) throws RocksDBException {
|
||||
return db.get(transactionId, columnId, keys, callback);
|
||||
return db.get(arena, transactionId, columnId, keys, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long openIterator(long transactionId,
|
||||
public long openIterator(Arena arena,
|
||||
long transactionId,
|
||||
long columnId,
|
||||
MemorySegment[] startKeysInclusive,
|
||||
@Nullable MemorySegment[] endKeysExclusive,
|
||||
boolean reverse,
|
||||
long timeoutMs) throws RocksDBException {
|
||||
return db.openIterator(transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, timeoutMs);
|
||||
return db.openIterator(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, timeoutMs);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -93,15 +97,16 @@ public class EmbeddedConnection extends BaseConnection {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void seekTo(long iterationId, MemorySegment[] keys) throws RocksDBException {
|
||||
db.seekTo(iterationId, keys);
|
||||
public void seekTo(Arena arena, long iterationId, MemorySegment[] keys) throws RocksDBException {
|
||||
db.seekTo(arena, iterationId, keys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T subsequent(long iterationId,
|
||||
public <T> T subsequent(Arena arena,
|
||||
long iterationId,
|
||||
long skipCount,
|
||||
long takeCount,
|
||||
IteratorCallback<? super MemorySegment, T> callback) throws RocksDBException {
|
||||
return db.subsequent(iterationId, skipCount, takeCount, callback);
|
||||
return db.subsequent(arena, iterationId, skipCount, takeCount, callback);
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import it.cavallium.rockserver.core.common.Callback.PutCallback;
|
||||
import it.cavallium.rockserver.core.common.ColumnSchema;
|
||||
import it.cavallium.rockserver.core.common.RocksDBException;
|
||||
import java.io.IOException;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import java.net.SocketAddress;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
@ -53,7 +54,8 @@ public abstract class SocketConnection extends BaseConnection {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T put(long transactionId,
|
||||
public <T> T put(Arena arena,
|
||||
long transactionId,
|
||||
long columnId,
|
||||
MemorySegment[] keys,
|
||||
@Nullable MemorySegment value,
|
||||
@ -62,7 +64,8 @@ public abstract class SocketConnection extends BaseConnection {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T get(long transactionId,
|
||||
public <T> T get(Arena arena,
|
||||
long transactionId,
|
||||
long columnId,
|
||||
MemorySegment[] keys,
|
||||
GetCallback<? super MemorySegment, T> callback) throws RocksDBException {
|
||||
@ -70,7 +73,8 @@ public abstract class SocketConnection extends BaseConnection {
|
||||
}
|
||||
|
||||
@Override
|
||||
public long openIterator(long transactionId,
|
||||
public long openIterator(Arena arena,
|
||||
long transactionId,
|
||||
long columnId,
|
||||
MemorySegment[] startKeysInclusive,
|
||||
@Nullable MemorySegment[] endKeysExclusive,
|
||||
@ -85,12 +89,13 @@ public abstract class SocketConnection extends BaseConnection {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void seekTo(long iterationId, MemorySegment[] keys) throws RocksDBException {
|
||||
public void seekTo(Arena arena, long iterationId, MemorySegment[] keys) throws RocksDBException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T subsequent(long iterationId,
|
||||
public <T> T subsequent(Arena arena,
|
||||
long iterationId,
|
||||
long skipCount,
|
||||
long takeCount,
|
||||
IteratorCallback<? super MemorySegment, T> callback) throws RocksDBException {
|
||||
|
@ -18,21 +18,46 @@ public sealed interface Callback<METHOD_DATA_TYPE, RESULT_TYPE> {
|
||||
return callback instanceof CallbackCurrent<?>;
|
||||
}
|
||||
|
||||
static <T> CallbackPrevious<T> previous() {
|
||||
// todo: create static instance
|
||||
return new CallbackPrevious<>();
|
||||
}
|
||||
|
||||
static <T> CallbackDelta<T> delta() {
|
||||
// todo: create static instance
|
||||
return new CallbackDelta<>();
|
||||
}
|
||||
|
||||
static <U> U safeCast(Object previousValue) {
|
||||
//noinspection unchecked
|
||||
return (U) previousValue;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static <T> CallbackPrevious<T> previous() {
|
||||
return (CallbackPrevious<T>) CallbackPrevious.INSTANCE;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static <T> CallbackCurrent<T> current() {
|
||||
return (CallbackCurrent<T>) CallbackCurrent.INSTANCE;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static <T> CallbackDelta<T> delta() {
|
||||
return (CallbackDelta<T>) CallbackDelta.INSTANCE;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static <T> CallbackExists<T> exists() {
|
||||
return (CallbackExists<T>) CallbackExists.INSTANCE;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static <T> CallbackMulti<T> multi() {
|
||||
return (CallbackMulti<T>) CallbackMulti.INSTANCE;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static <T> CallbackChanged<T> changed() {
|
||||
return (CallbackChanged<T>) CallbackChanged.INSTANCE;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static <T> CallbackVoid<T> none() {
|
||||
return (CallbackVoid<T>) CallbackVoid.INSTANCE;
|
||||
}
|
||||
|
||||
sealed interface PutCallback<T, U> extends Callback<T, U> {}
|
||||
|
||||
sealed interface PatchCallback<T, U> extends Callback<T, U> {}
|
||||
@ -41,18 +66,38 @@ public sealed interface Callback<METHOD_DATA_TYPE, RESULT_TYPE> {
|
||||
|
||||
sealed interface IteratorCallback<T, U> extends Callback<T, U> {}
|
||||
|
||||
record CallbackVoid<T>() implements PutCallback<T, Void>, PatchCallback<T, Void>, IteratorCallback<T, Void>, GetCallback<T, Void> {}
|
||||
record CallbackVoid<T>() implements PutCallback<T, Void>, PatchCallback<T, Void>, IteratorCallback<T, Void>, GetCallback<T, Void> {
|
||||
|
||||
record CallbackPrevious<T>() implements PutCallback<T, @Nullable T> {}
|
||||
|
||||
record CallbackCurrent<T>() implements GetCallback<T, @Nullable T> {}
|
||||
|
||||
record CallbackExists<T>() implements GetCallback<T, Boolean>, IteratorCallback<T, Boolean> {}
|
||||
|
||||
record CallbackDelta<T>() implements PutCallback<T, Delta<T>> {
|
||||
private static final CallbackVoid<Object> INSTANCE = new CallbackVoid<>();
|
||||
}
|
||||
|
||||
record CallbackMulti<M>() implements IteratorCallback<M, List<M>> {}
|
||||
record CallbackPrevious<T>() implements PutCallback<T, @Nullable T> {
|
||||
|
||||
record CallbackChanged<T>() implements PutCallback<T, Boolean>, PatchCallback<T, Boolean> {}
|
||||
private static final CallbackPrevious<Object> INSTANCE = new CallbackPrevious<>();
|
||||
}
|
||||
|
||||
record CallbackCurrent<T>() implements GetCallback<T, @Nullable T> {
|
||||
|
||||
private static final CallbackCurrent<Object> INSTANCE = new CallbackCurrent<>();
|
||||
}
|
||||
|
||||
record CallbackExists<T>() implements GetCallback<T, Boolean>, IteratorCallback<T, Boolean> {
|
||||
|
||||
private static final CallbackExists<Object> INSTANCE = new CallbackExists<>();
|
||||
}
|
||||
|
||||
record CallbackDelta<T>() implements PutCallback<T, Delta<T>> {
|
||||
|
||||
private static final CallbackDelta<Object> INSTANCE = new CallbackDelta<>();
|
||||
}
|
||||
|
||||
record CallbackMulti<M>() implements IteratorCallback<M, List<M>> {
|
||||
|
||||
private static final CallbackMulti<Object> INSTANCE = new CallbackMulti<>();
|
||||
}
|
||||
|
||||
record CallbackChanged<T>() implements PutCallback<T, Boolean>, PatchCallback<T, Boolean> {
|
||||
|
||||
private static final CallbackChanged<Object> INSTANCE = new CallbackChanged<>();
|
||||
}
|
||||
}
|
||||
|
@ -26,18 +26,18 @@ public record ColumnSchema(IntList keys, ObjectList<ColumnHashType> variableTail
|
||||
final int keysSize = keys.size();
|
||||
final int variableKeysSize = variableTailKeys.size();
|
||||
if (variableKeysSize > keysSize) {
|
||||
throw new RocksDBException(RocksDBErrorType.KEYS_COUNT_MISMATCH, "variable length keys count must be less or equal keysCount");
|
||||
throw RocksDBException.of(RocksDBErrorType.KEYS_COUNT_MISMATCH, "variable length keys count must be less or equal keysCount");
|
||||
}
|
||||
for (int i = 0; i < keysSize - variableKeysSize; i++) {
|
||||
if (keys.getInt(i) <= 0) {
|
||||
throw new RocksDBException(RocksDBErrorType.KEY_LENGTH_MISMATCH, "Key length must be > 0");
|
||||
throw RocksDBException.of(RocksDBErrorType.KEY_LENGTH_MISMATCH, "Key length must be > 0");
|
||||
}
|
||||
}
|
||||
for (int i = keysSize - variableKeysSize; i < keysSize; i++) {
|
||||
var hash = variableTailKeys.get(i - (keysSize - variableKeysSize));
|
||||
var keySize = keys.getInt(i);
|
||||
if (keySize != hash.bytesSize()) {
|
||||
throw new RocksDBException(RocksDBErrorType.KEY_HASH_SIZE_MISMATCH, "Key hash length of type " + hash + " must be " + hash.bytesSize() + ", but it's defined as " + keySize);
|
||||
throw RocksDBException.of(RocksDBErrorType.KEY_HASH_SIZE_MISMATCH, "Key hash length of type " + hash + " must be " + hash.bytesSize() + ", but it's defined as " + keySize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package it.cavallium.rockserver.core.common;
|
||||
import it.cavallium.rockserver.core.common.Callback.GetCallback;
|
||||
import it.cavallium.rockserver.core.common.Callback.IteratorCallback;
|
||||
import it.cavallium.rockserver.core.common.Callback.PutCallback;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
@ -47,13 +48,15 @@ public interface RocksDBAPI {
|
||||
|
||||
/**
|
||||
* Put an element into the specified position
|
||||
* @param arena arena
|
||||
* @param transactionId transaction id, or 0
|
||||
* @param columnId column id
|
||||
* @param keys column keys, or empty array if not needed
|
||||
* @param value value, or null if not needed
|
||||
* @param callback the callback will be executed on the same thread, exactly once.
|
||||
*/
|
||||
<T> T put(long transactionId,
|
||||
<T> T put(Arena arena,
|
||||
long transactionId,
|
||||
long columnId,
|
||||
MemorySegment[] keys,
|
||||
@Nullable MemorySegment value,
|
||||
@ -61,18 +64,21 @@ public interface RocksDBAPI {
|
||||
|
||||
/**
|
||||
* Get an element from the specified position
|
||||
* @param arena arena
|
||||
* @param transactionId transaction id, or 0
|
||||
* @param columnId column id
|
||||
* @param keys column keys, or empty array if not needed
|
||||
* @param callback the callback will be executed on the same thread, exactly once.
|
||||
*/
|
||||
<T> T get(long transactionId,
|
||||
<T> T get(Arena arena,
|
||||
long transactionId,
|
||||
long columnId,
|
||||
MemorySegment[] keys,
|
||||
GetCallback<? super MemorySegment, T> callback) throws RocksDBException;
|
||||
|
||||
/**
|
||||
* Open an iterator
|
||||
* @param arena arena
|
||||
* @param transactionId transaction id, or 0
|
||||
* @param columnId column id
|
||||
* @param startKeysInclusive start keys, inclusive. [] means "the beginning"
|
||||
@ -81,7 +87,8 @@ public interface RocksDBAPI {
|
||||
* @param timeoutMs timeout in milliseconds
|
||||
* @return iterator id
|
||||
*/
|
||||
long openIterator(long transactionId,
|
||||
long openIterator(Arena arena,
|
||||
long transactionId,
|
||||
long columnId,
|
||||
MemorySegment[] startKeysInclusive,
|
||||
@Nullable MemorySegment[] endKeysExclusive,
|
||||
@ -96,19 +103,22 @@ public interface RocksDBAPI {
|
||||
|
||||
/**
|
||||
* Seek to the specific element during an iteration, or the subsequent one if not found
|
||||
* @param arena arena
|
||||
* @param iterationId iteration id
|
||||
* @param keys keys, inclusive. [] means "the beginning"
|
||||
*/
|
||||
void seekTo(long iterationId, MemorySegment[] keys) throws RocksDBException;
|
||||
void seekTo(Arena arena, long iterationId, MemorySegment[] keys) throws RocksDBException;
|
||||
|
||||
/**
|
||||
* Get the subsequent element during an iteration
|
||||
* @param arena arena
|
||||
* @param iterationId iteration id
|
||||
* @param skipCount number of elements to skip
|
||||
* @param takeCount number of elements to take
|
||||
* @param callback the callback will be executed on the same thread, exactly once.
|
||||
*/
|
||||
<T> T subsequent(long iterationId,
|
||||
<T> T subsequent(Arena arena,
|
||||
long iterationId,
|
||||
long skipCount,
|
||||
long takeCount,
|
||||
IteratorCallback<? super MemorySegment, T> callback) throws RocksDBException;
|
||||
|
@ -5,30 +5,70 @@ public class RocksDBException extends RuntimeException {
|
||||
private final RocksDBErrorType errorUniqueId;
|
||||
|
||||
public enum RocksDBErrorType {
|
||||
PUT_UNKNOWN_ERROR, PUT_2, UNEXPECTED_NULL_VALUE, PUT_1, PUT_3, GET_1, COLUMN_EXISTS, COLUMN_CREATE_FAIL, COLUMN_NOT_FOUND, COLUMN_DELETE_FAIL, CONFIG_ERROR, ROCKSDB_CONFIG_ERROR, VALUE_MUST_BE_NULL, DIRECTORY_DELETE, KEY_LENGTH_MISMATCH, UNSUPPORTED_HASH_SIZE, RAW_KEY_LENGTH_MISMATCH, KEYS_COUNT_MISMATCH, COMMIT_FAILED_TRY_AGAIN, COMMIT_FAILED, TX_NOT_FOUND, KEY_HASH_SIZE_MISMATCH, ROCKSDB_LOAD_ERROR
|
||||
|
||||
PUT_UNKNOWN_ERROR,
|
||||
PUT_2,
|
||||
UNEXPECTED_NULL_VALUE,
|
||||
PUT_1, PUT_3,
|
||||
GET_1,
|
||||
COLUMN_EXISTS,
|
||||
COLUMN_CREATE_FAIL,
|
||||
COLUMN_NOT_FOUND,
|
||||
COLUMN_DELETE_FAIL,
|
||||
CONFIG_ERROR,
|
||||
ROCKSDB_CONFIG_ERROR,
|
||||
VALUE_MUST_BE_NULL,
|
||||
DIRECTORY_DELETE,
|
||||
KEY_LENGTH_MISMATCH,
|
||||
UNSUPPORTED_HASH_SIZE,
|
||||
RAW_KEY_LENGTH_MISMATCH,
|
||||
KEYS_COUNT_MISMATCH,
|
||||
COMMIT_FAILED_TRY_AGAIN,
|
||||
COMMIT_FAILED,
|
||||
TX_NOT_FOUND,
|
||||
KEY_HASH_SIZE_MISMATCH,
|
||||
ROCKSDB_LOAD_ERROR
|
||||
}
|
||||
|
||||
public RocksDBException(RocksDBErrorType errorUniqueId, String message) {
|
||||
public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) {
|
||||
return new RocksDBException(errorUniqueId, message);
|
||||
}
|
||||
|
||||
public static RocksDBException of(RocksDBErrorType errorUniqueId, Throwable ex) {
|
||||
if (ex instanceof RocksDBException e) {
|
||||
return new RocksDBException(errorUniqueId, e);
|
||||
} else {
|
||||
return new RocksDBException(errorUniqueId, ex);
|
||||
}
|
||||
}
|
||||
|
||||
public static RocksDBException of(RocksDBErrorType errorUniqueId, String message, Throwable ex) {
|
||||
if (ex instanceof RocksDBException e) {
|
||||
return new RocksDBException(errorUniqueId, message, e);
|
||||
} else {
|
||||
return new RocksDBException(errorUniqueId, message, ex);
|
||||
}
|
||||
}
|
||||
|
||||
private RocksDBException(RocksDBErrorType errorUniqueId, String message) {
|
||||
super(message);
|
||||
this.errorUniqueId = errorUniqueId;
|
||||
}
|
||||
|
||||
public RocksDBException(RocksDBErrorType errorUniqueId, String message, Throwable ex) {
|
||||
private RocksDBException(RocksDBErrorType errorUniqueId, String message, Throwable ex) {
|
||||
super(message, ex);
|
||||
this.errorUniqueId = errorUniqueId;
|
||||
}
|
||||
|
||||
public RocksDBException(RocksDBErrorType errorUniqueId, Throwable ex) {
|
||||
private RocksDBException(RocksDBErrorType errorUniqueId, Throwable ex) {
|
||||
super(ex.toString(), ex);
|
||||
this.errorUniqueId = errorUniqueId;
|
||||
}
|
||||
|
||||
public RocksDBException(RocksDBErrorType errorUniqueId, org.rocksdb.RocksDBException ex) {
|
||||
private RocksDBException(RocksDBErrorType errorUniqueId, org.rocksdb.RocksDBException ex) {
|
||||
this(errorUniqueId, ex.getMessage());
|
||||
}
|
||||
|
||||
public RocksDBException(RocksDBErrorType errorUniqueId, String message, org.rocksdb.RocksDBException ex) {
|
||||
private RocksDBException(RocksDBErrorType errorUniqueId, String message, org.rocksdb.RocksDBException ex) {
|
||||
this(errorUniqueId, message + ": " + ex.getMessage());
|
||||
}
|
||||
|
||||
|
@ -29,10 +29,11 @@ public class Utils {
|
||||
* @since 1.8
|
||||
*/
|
||||
public static char toCharExact(int value) {
|
||||
if ((char) value != value) {
|
||||
if (value < Character.MIN_VALUE || value > Character.MAX_VALUE) {
|
||||
throw new ArithmeticException("char overflow");
|
||||
} else {
|
||||
return (char) value;
|
||||
}
|
||||
return (char) value;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -44,10 +45,11 @@ public class Utils {
|
||||
* @since 1.8
|
||||
*/
|
||||
public static char toCharExact(long value) {
|
||||
if ((char) value != value) {
|
||||
if (value < Character.MIN_VALUE || value > Character.MAX_VALUE) {
|
||||
throw new ArithmeticException("char overflow");
|
||||
} else {
|
||||
return (char) value;
|
||||
}
|
||||
return (char) value;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@ -84,18 +86,17 @@ public class Utils {
|
||||
|
||||
public static void deleteDirectory(String path) throws RocksDBException {
|
||||
try (Stream<Path> pathStream = Files.walk(Path.of(path))) {
|
||||
pathStream.sorted(Comparator.reverseOrder())
|
||||
.forEach(f -> {
|
||||
try {
|
||||
Files.deleteIfExists(f);
|
||||
} catch (IOException e) {
|
||||
throw new RocksDBException(RocksDBException.RocksDBErrorType.DIRECTORY_DELETE, e);
|
||||
}
|
||||
});
|
||||
pathStream.sorted(Comparator.reverseOrder()).forEach(f -> {
|
||||
try {
|
||||
Files.deleteIfExists(f);
|
||||
} catch (IOException e) {
|
||||
throw RocksDBException.of(RocksDBException.RocksDBErrorType.DIRECTORY_DELETE, e);
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
throw new RocksDBException(RocksDBException.RocksDBErrorType.DIRECTORY_DELETE, e);
|
||||
}
|
||||
}
|
||||
throw RocksDBException.of(RocksDBException.RocksDBErrorType.DIRECTORY_DELETE, e);
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean valueEquals(MemorySegment previousValue, MemorySegment currentValue) {
|
||||
previousValue = requireNonNullElse(previousValue, NULL);
|
||||
|
@ -61,7 +61,7 @@ public class ConfigParser {
|
||||
|
||||
return gestalt.getConfig("database", DatabaseConfig.class);
|
||||
} catch (GestaltException ex) {
|
||||
throw new RocksDBException(RocksDBErrorType.CONFIG_ERROR, ex);
|
||||
throw RocksDBException.of(RocksDBErrorType.CONFIG_ERROR, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ public class ConfigPrinter {
|
||||
try {
|
||||
return stringifyDatabase(config);
|
||||
} catch (GestaltException e) {
|
||||
throw new RocksDBException(RocksDBErrorType.CONFIG_ERROR, "Can't stringify config", e);
|
||||
throw RocksDBException.of(RocksDBErrorType.CONFIG_ERROR, "Can't stringify config", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,7 +69,7 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi
|
||||
} else {
|
||||
finalKey = arena.allocate(finalKeySizeBytes);
|
||||
long offsetBytes = 0;
|
||||
for (int i = 0; i < schema.keys().size(); i++) {
|
||||
for (int i = 0; i < schema.keysCount(); i++) {
|
||||
var computedKeyAtI = computeKeyAt(arena, i, keys);
|
||||
var computedKeyAtISize = computedKeyAtI.byteSize();
|
||||
MemorySegment.copy(computedKeyAtI, 0, finalKey, offsetBytes, computedKeyAtISize);
|
||||
@ -81,9 +81,9 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi
|
||||
}
|
||||
|
||||
private MemorySegment computeKeyAt(Arena arena, int i, MemorySegment[] keys) {
|
||||
if (i < schema.keys().size() - schema.variableLengthKeysCount()) {
|
||||
if (i < schema.keysCount() - schema.variableLengthKeysCount()) {
|
||||
if (keys[i].byteSize() != schema.key(i)) {
|
||||
throw new RocksDBException(RocksDBErrorType.KEY_LENGTH_MISMATCH,
|
||||
throw RocksDBException.of(RocksDBErrorType.KEY_LENGTH_MISMATCH,
|
||||
"Key at index " + i + " has a different length than expected! Expected: " + schema.key(i)
|
||||
+ ", received: " + keys[i].byteSize());
|
||||
}
|
||||
@ -98,16 +98,16 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi
|
||||
|
||||
private void validateFinalKeySize(MemorySegment key) {
|
||||
if (finalKeySizeBytes != key.byteSize()) {
|
||||
throw new RocksDBException(RocksDBErrorType.RAW_KEY_LENGTH_MISMATCH,
|
||||
throw RocksDBException.of(RocksDBErrorType.RAW_KEY_LENGTH_MISMATCH,
|
||||
"Keys size must be equal to the column keys size. Expected: "
|
||||
+ finalKeySizeBytes + ", got: " + key.byteSize());
|
||||
}
|
||||
}
|
||||
|
||||
private void validateKeyCount(MemorySegment[] keys) {
|
||||
if (schema.keys().size() != keys.length) {
|
||||
throw new RocksDBException(RocksDBErrorType.KEYS_COUNT_MISMATCH,
|
||||
"Keys count must be equal to the column keys count. Expected: " + schema.keys().size()
|
||||
if (schema.keysCount() != keys.length) {
|
||||
throw RocksDBException.of(RocksDBErrorType.KEYS_COUNT_MISMATCH,
|
||||
"Keys count must be equal to the column keys count. Expected: " + schema.keysCount()
|
||||
+ ", got: " + keys.length);
|
||||
}
|
||||
}
|
||||
@ -143,10 +143,10 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi
|
||||
public void checkNullableValue(MemorySegment value) {
|
||||
if (schema.hasValue() == (value == null)) {
|
||||
if (schema.hasValue()) {
|
||||
throw new RocksDBException(RocksDBErrorType.UNEXPECTED_NULL_VALUE,
|
||||
throw RocksDBException.of(RocksDBErrorType.UNEXPECTED_NULL_VALUE,
|
||||
"Schema expects a value, but a null value has been passed");
|
||||
} else {
|
||||
throw new RocksDBException(RocksDBErrorType.VALUE_MUST_BE_NULL,
|
||||
throw RocksDBException.of(RocksDBErrorType.VALUE_MUST_BE_NULL,
|
||||
"Schema expects no value, but a non-null value has been passed");
|
||||
}
|
||||
}
|
||||
@ -170,9 +170,9 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi
|
||||
* Get only the variable-length keys
|
||||
*/
|
||||
public MemorySegment[] getBucketElementKeys(MemorySegment[] keys) {
|
||||
assert keys.length == schema.keys().size();
|
||||
assert keys.length == schema.keysCount();
|
||||
return Arrays.copyOfRange(keys,
|
||||
schema.keys().size() - schema.variableLengthKeysCount(),
|
||||
schema.keys().size());
|
||||
schema.keysCount() - schema.variableLengthKeysCount(),
|
||||
schema.keysCount());
|
||||
}
|
||||
}
|
||||
|
@ -36,11 +36,13 @@ import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.rocksdb.ColumnFamilyDescriptor;
|
||||
import org.rocksdb.ColumnFamilyHandle;
|
||||
import org.rocksdb.ReadOptions;
|
||||
import org.rocksdb.RocksDB;
|
||||
import org.rocksdb.RocksDBException;
|
||||
@ -173,7 +175,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
} else {
|
||||
// Transaction not found
|
||||
if (commit) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.TX_NOT_FOUND, "Transaction not found: " + transactionId);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.TX_NOT_FOUND, "Transaction not found: " + transactionId);
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
@ -202,7 +204,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
} catch (RocksDBException e) {
|
||||
// Close the transaction operation
|
||||
ops.endOp();
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COMMIT_FAILED, "Transaction close failed");
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COMMIT_FAILED, "Transaction close failed");
|
||||
}
|
||||
} finally {
|
||||
ops.endOp();
|
||||
@ -233,7 +235,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
if (schema.equals(col.schema())) {
|
||||
return colId;
|
||||
} else {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_EXISTS,
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_EXISTS,
|
||||
"Column exists, with a different schema: " + name
|
||||
);
|
||||
}
|
||||
@ -242,7 +244,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8)));
|
||||
return registerColumn(new ColumnInstance(cf, schema));
|
||||
} catch (RocksDBException e) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_CREATE_FAIL, e);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_CREATE_FAIL, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -261,7 +263,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
db.get().dropColumnFamily(col.cfh());
|
||||
unregisterColumn(columnId).close();
|
||||
} catch (RocksDBException e) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_DELETE_FAIL, e);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_DELETE_FAIL, e);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
@ -273,7 +275,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
public long getColumnId(String name) {
|
||||
var columnId = getColumnIdOrNull(name);
|
||||
if (columnId == null) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_NOT_FOUND,
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_NOT_FOUND,
|
||||
"Column not found: " + name);
|
||||
} else {
|
||||
return columnId;
|
||||
@ -291,13 +293,14 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T put(long transactionId,
|
||||
public <T> T put(Arena arena,
|
||||
long transactionId,
|
||||
long columnId,
|
||||
MemorySegment[] keys,
|
||||
@Nullable MemorySegment value,
|
||||
PutCallback<? super MemorySegment, T> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||
ops.beginOp();
|
||||
try (var arena = Arena.ofConfined()) {
|
||||
try {
|
||||
// Column id
|
||||
var col = getColumn(columnId);
|
||||
REntry<Transaction> tx;
|
||||
@ -310,7 +313,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
|
||||
throw ex;
|
||||
} catch (Exception ex) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
|
||||
} finally {
|
||||
ops.endOp();
|
||||
}
|
||||
@ -337,7 +340,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
previousValue = bucket.addElement(bucketElementKeys, value);
|
||||
tx.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(bucket.toSegment(arena)));
|
||||
} catch (RocksDBException e) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_1, e);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_1, e);
|
||||
}
|
||||
} else {
|
||||
// Retry using a transaction: transactions are required to handle this kind of data
|
||||
@ -363,7 +366,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
}
|
||||
previousValue = toMemorySegment(previousValueByteArray);
|
||||
} catch (RocksDBException e) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_2, e);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e);
|
||||
}
|
||||
} else {
|
||||
previousValue = null;
|
||||
@ -385,17 +388,18 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
|
||||
throw ex;
|
||||
} catch (Exception ex) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T get(long transactionId,
|
||||
public <T> T get(Arena arena,
|
||||
long transactionId,
|
||||
long columnId,
|
||||
MemorySegment[] keys,
|
||||
GetCallback<? super MemorySegment, T> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||
ops.beginOp();
|
||||
try (var arena = Arena.ofConfined()) {
|
||||
try {
|
||||
// Column id
|
||||
var col = getColumn(columnId);
|
||||
|
||||
@ -412,11 +416,15 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
var bucketElementKeys = col.getBucketElementKeys(keys);
|
||||
try (var readOptions = new ReadOptions()) {
|
||||
MemorySegment previousRawBucket = dbGet(tx, col, arena, readOptions, calculatedKey);
|
||||
var bucket = new Bucket(col, previousRawBucket);
|
||||
foundValue = bucket.getElement(bucketElementKeys);
|
||||
if (previousRawBucket != null) {
|
||||
var bucket = new Bucket(col, previousRawBucket);
|
||||
foundValue = bucket.getElement(bucketElementKeys);
|
||||
} else {
|
||||
foundValue = null;
|
||||
}
|
||||
existsValue = foundValue != null;
|
||||
} catch (RocksDBException e) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.GET_1, e);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.GET_1, e);
|
||||
}
|
||||
} else {
|
||||
boolean shouldGetCurrent = Callback.requiresGettingCurrentValue(callback)
|
||||
@ -426,7 +434,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
foundValue = dbGet(tx, col, arena, readOptions, calculatedKey);
|
||||
existsValue = foundValue != null;
|
||||
} catch (RocksDBException e) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_2, e);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e);
|
||||
}
|
||||
} else if (callback instanceof Callback.CallbackExists<?>) {
|
||||
// tx is always null here
|
||||
@ -447,14 +455,15 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
|
||||
throw ex;
|
||||
} catch (Exception ex) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
|
||||
} finally {
|
||||
ops.endOp();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long openIterator(long transactionId,
|
||||
public long openIterator(Arena arena,
|
||||
long transactionId,
|
||||
long columnId,
|
||||
MemorySegment[] startKeysInclusive,
|
||||
@Nullable MemorySegment[] endKeysExclusive,
|
||||
@ -492,7 +501,8 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void seekTo(long iterationId, MemorySegment[] keys) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||
public void seekTo(Arena arena, long iterationId, MemorySegment[] keys)
|
||||
throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||
ops.beginOp();
|
||||
try {
|
||||
throw new UnsupportedOperationException();
|
||||
@ -502,7 +512,8 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T subsequent(long iterationId,
|
||||
public <T> T subsequent(Arena arena,
|
||||
long iterationId,
|
||||
long skipCount,
|
||||
long takeCount,
|
||||
IteratorCallback<? super MemorySegment, T> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
|
||||
@ -525,7 +536,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
} else {
|
||||
var db = this.db.get();
|
||||
if (USE_FAST_GET) {
|
||||
return dbGetDirect(arena, readOptions, calculatedKey);
|
||||
return dbGetDirect(arena, col.cfh(), readOptions, calculatedKey);
|
||||
} else {
|
||||
var previousRawBucketByteArray = db.get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES));
|
||||
return toMemorySegment(previousRawBucketByteArray);
|
||||
@ -534,30 +545,35 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private MemorySegment dbGetDirect(Arena arena, ReadOptions readOptions, MemorySegment calculatedKey)
|
||||
private MemorySegment dbGetDirect(Arena arena, ColumnFamilyHandle cfh, ReadOptions readOptions, MemorySegment calculatedKey)
|
||||
throws RocksDBException {
|
||||
// Get the key nio buffer to pass to RocksDB
|
||||
ByteBuffer keyNioBuffer = calculatedKey.asByteBuffer();
|
||||
|
||||
// Create a direct result buffer because RocksDB works only with direct buffers
|
||||
var resultBuffer = arena.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES).asByteBuffer();
|
||||
var keyMayExist = this.db.get().keyMayExist(readOptions, keyNioBuffer.rewind(), resultBuffer.clear());
|
||||
var keyMayExist = this.db.get().keyMayExist(cfh, readOptions, keyNioBuffer.rewind(), resultBuffer.clear());
|
||||
return switch (keyMayExist.exists) {
|
||||
case kNotExist -> null;
|
||||
case kExistsWithValue, kExistsWithoutValue -> {
|
||||
// At the beginning, size reflects the expected size, then it becomes the real data size
|
||||
int size = keyMayExist.exists == kExistsWithValue ? keyMayExist.valueLength : -1;
|
||||
int size;
|
||||
if (keyMayExist.exists == kExistsWithValue) {
|
||||
size = keyMayExist.valueLength;
|
||||
} else {
|
||||
size = -1;
|
||||
}
|
||||
if (keyMayExist.exists == kExistsWithoutValue || size > resultBuffer.limit()) {
|
||||
if (size > resultBuffer.capacity()) {
|
||||
resultBuffer = arena.allocate(size).asByteBuffer();
|
||||
}
|
||||
size = this.db.get().get(readOptions, keyNioBuffer.rewind(), resultBuffer.clear());
|
||||
size = this.db.get().get(cfh, readOptions, keyNioBuffer.rewind(), resultBuffer.clear());
|
||||
}
|
||||
|
||||
if (size == RocksDB.NOT_FOUND) {
|
||||
yield null;
|
||||
} else if (size == resultBuffer.limit()) {
|
||||
yield MemorySegment.ofBuffer(resultBuffer.position(0));
|
||||
yield MemorySegment.ofBuffer(resultBuffer);
|
||||
} else {
|
||||
throw new IllegalStateException("size (" + size + ") != read size (" + resultBuffer.limit() + ")");
|
||||
}
|
||||
@ -570,7 +586,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
|
||||
if (col != null) {
|
||||
return col;
|
||||
} else {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_NOT_FOUND,
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_NOT_FOUND,
|
||||
"No column with id " + columnId);
|
||||
}
|
||||
}
|
||||
|
@ -36,11 +36,6 @@ public abstract class XXHash32 {
|
||||
*/
|
||||
public abstract void hash(MemorySegment buf, int off, int len, int seed, MemorySegment result);
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getSimpleName();
|
||||
}
|
||||
|
||||
public static XXHash32 getInstance() {
|
||||
return XXHash32JavaSafe.INSTANCE;
|
||||
}
|
||||
|
@ -21,12 +21,12 @@ import static it.cavallium.rockserver.core.impl.XXHashUtils.PRIME4;
|
||||
import static it.cavallium.rockserver.core.impl.XXHashUtils.PRIME5;
|
||||
import static java.lang.Integer.rotateLeft;
|
||||
|
||||
import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
import java.lang.foreign.ValueLayout;
|
||||
import java.lang.foreign.ValueLayout.OfByte;
|
||||
import java.lang.foreign.ValueLayout.OfInt;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Safe Java implementation of {@link XXHash32}.
|
||||
@ -34,7 +34,6 @@ import java.nio.ByteOrder;
|
||||
public final class XXHash32JavaSafe extends XXHash32 {
|
||||
|
||||
|
||||
public static final ByteOrder NATIVE_BYTE_ORDER = ByteOrder.nativeOrder();
|
||||
public static final XXHash32 INSTANCE = new XXHash32JavaSafe();
|
||||
private static final OfInt INT_LE = ValueLayout.JAVA_INT_UNALIGNED.withOrder(ByteOrder.LITTLE_ENDIAN);
|
||||
private static final OfInt INT_BE = ValueLayout.JAVA_INT_UNALIGNED.withOrder(ByteOrder.BIG_ENDIAN);
|
||||
@ -42,7 +41,7 @@ public final class XXHash32JavaSafe extends XXHash32 {
|
||||
|
||||
@Override
|
||||
public int hash(byte[] buf, int off, int len, int seed) {
|
||||
checkRange(buf, off, len);
|
||||
Objects.checkFromIndexSize(off, len, buf.length);
|
||||
|
||||
final int end = off + len;
|
||||
int h32;
|
||||
@ -51,7 +50,7 @@ public final class XXHash32JavaSafe extends XXHash32 {
|
||||
final int limit = end - 16;
|
||||
int v1 = seed + PRIME1 + PRIME2;
|
||||
int v2 = seed + PRIME2;
|
||||
int v3 = seed + 0;
|
||||
int v3 = seed;
|
||||
int v4 = seed - PRIME1;
|
||||
do {
|
||||
v1 += readIntLE(buf, off) * PRIME2;
|
||||
@ -105,7 +104,7 @@ public final class XXHash32JavaSafe extends XXHash32 {
|
||||
|
||||
@Override
|
||||
public void hash(MemorySegment buf, int off, int len, int seed, MemorySegment result) {
|
||||
checkRange(buf, off, len);
|
||||
Objects.checkFromIndexSize(off, len, buf.byteSize());
|
||||
|
||||
final int end = off + len;
|
||||
int h32;
|
||||
@ -114,7 +113,7 @@ public final class XXHash32JavaSafe extends XXHash32 {
|
||||
final int limit = end - 16;
|
||||
int v1 = seed + PRIME1 + PRIME2;
|
||||
int v2 = seed + PRIME2;
|
||||
int v3 = seed + 0;
|
||||
int v3 = seed;
|
||||
int v4 = seed - PRIME1;
|
||||
do {
|
||||
v1 += readIntLE(buf, off) * PRIME2;
|
||||
@ -167,44 +166,6 @@ public final class XXHash32JavaSafe extends XXHash32 {
|
||||
result.set(INT_BE, 0, h32);
|
||||
}
|
||||
|
||||
private static void checkRange(byte[] buf, int off) {
|
||||
if (off < 0 || off >= buf.length) {
|
||||
throw new ArrayIndexOutOfBoundsException(off);
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkRange(MemorySegment buf, int off) {
|
||||
if (off < 0 || off >= buf.byteSize()) {
|
||||
throw new ArrayIndexOutOfBoundsException(off);
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkRange(byte[] buf, int off, int len) {
|
||||
checkLength(len);
|
||||
if (len > 0) {
|
||||
checkRange(buf, off);
|
||||
checkRange(buf, off + len - 1);
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkRange(MemorySegment buf, int off, int len) {
|
||||
checkLength(len);
|
||||
if (len > 0) {
|
||||
checkRange(buf, off);
|
||||
checkRange(buf, off + len - 1);
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkLength(int len) {
|
||||
if (len < 0) {
|
||||
throw new IllegalArgumentException("lengths must be >= 0");
|
||||
}
|
||||
}
|
||||
|
||||
private static int readIntBE(byte[] buf, int i) {
|
||||
return ((buf[i] & 0xFF) << 24) | ((buf[i+1] & 0xFF) << 16) | ((buf[i+2] & 0xFF) << 8) | (buf[i+3] & 0xFF);
|
||||
}
|
||||
|
||||
private static int readIntLE(byte[] buf, int i) {
|
||||
return (buf[i] & 0xFF) | ((buf[i+1] & 0xFF) << 8) | ((buf[i+2] & 0xFF) << 16) | ((buf[i+3] & 0xFF) << 24);
|
||||
}
|
||||
@ -212,12 +173,4 @@ public final class XXHash32JavaSafe extends XXHash32 {
|
||||
private static int readIntLE(MemorySegment buf, int i) {
|
||||
return buf.get(INT_LE, i);
|
||||
}
|
||||
|
||||
private static int readInt(byte[] buf, int i) {
|
||||
if (NATIVE_BYTE_ORDER == ByteOrder.BIG_ENDIAN) {
|
||||
return readIntBE(buf, i);
|
||||
} else {
|
||||
return readIntLE(buf, i);
|
||||
}
|
||||
}
|
||||
}
|
@ -127,7 +127,7 @@ public class RocksDBLoader {
|
||||
List<DbPath> paths = mapList(volumeConfigs, p -> new DbPath(p.path, p.targetSize));
|
||||
options.setDbPaths(paths);
|
||||
} else if (!volumeConfigs.isEmpty() && (volumeConfigs.size() > 1 || definitiveDbPath.relativize(volumeConfigs.getFirst().path).isAbsolute())) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "in-memory databases should not have any volume configured");
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "in-memory databases should not have any volume configured");
|
||||
}
|
||||
options.setMaxOpenFiles(Optional.ofNullable(databaseOptions.global().maximumOpenFiles()).orElse(-1));
|
||||
options.setMaxFileOpeningThreads(Runtime.getRuntime().availableProcessors());
|
||||
@ -218,7 +218,7 @@ public class RocksDBLoader {
|
||||
|
||||
return new OptionsWithCache(options, blockCache);
|
||||
} catch (GestaltException e) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_CONFIG_ERROR, e);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_CONFIG_ERROR, e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -243,7 +243,7 @@ public class RocksDBLoader {
|
||||
try {
|
||||
return new DbPathRecord(definitiveDbPath.resolve(volumeConfig.volumePath()), volumeConfig.targetSize().longValue());
|
||||
} catch (GestaltException e) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.CONFIG_ERROR, "Failed to load volume configurations", e);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.CONFIG_ERROR, "Failed to load volume configurations", e);
|
||||
}
|
||||
})
|
||||
.toList();
|
||||
@ -303,7 +303,7 @@ public class RocksDBLoader {
|
||||
String name = entry.getKey();
|
||||
FallbackColumnConfig columnOptions = entry.getValue();
|
||||
if (columnOptions instanceof NamedColumnConfig namedColumnConfig && !namedColumnConfig.name().equals(name)) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Wrong column config name: " + name);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Wrong column config name: " + name);
|
||||
}
|
||||
|
||||
var columnFamilyOptions = new ColumnFamilyOptions();
|
||||
@ -415,7 +415,7 @@ public class RocksDBLoader {
|
||||
if (bloomFilterConfig != null) filter = bloomFilterConfig;
|
||||
if (filter == null) {
|
||||
if (path == null) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Please set a bloom filter. It's required for in-memory databases");
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Please set a bloom filter. It's required for in-memory databases");
|
||||
}
|
||||
if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
|
||||
blockBasedTableConfig.setFilterPolicy(null);
|
||||
@ -533,11 +533,11 @@ public class RocksDBLoader {
|
||||
}
|
||||
return TransactionalDB.create(definitiveDbPath.toString(), db);
|
||||
} catch (IOException ex) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex);
|
||||
} catch (RocksDBException ex) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex);
|
||||
} catch (GestaltException e) {
|
||||
throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Failed to load rocksdb", e);
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Failed to load rocksdb", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,8 @@ import it.cavallium.rockserver.core.common.ColumnSchema;
|
||||
import it.cavallium.rockserver.core.common.Utils;
|
||||
import it.unimi.dsi.fastutil.ints.IntList;
|
||||
import it.unimi.dsi.fastutil.objects.ObjectList;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -18,14 +20,51 @@ class EmbeddedDBTest {
|
||||
|
||||
private EmbeddedConnection db;
|
||||
private long colId = 0L;
|
||||
private Arena arena;
|
||||
private MemorySegment bigValue;
|
||||
private MemorySegment[] key1;
|
||||
private MemorySegment[] collidingKey1;
|
||||
private MemorySegment[] key2;
|
||||
private MemorySegment value1;
|
||||
private MemorySegment value2;
|
||||
|
||||
@org.junit.jupiter.api.BeforeEach
|
||||
void setUp() throws IOException {
|
||||
if (System.getProperty("rockserver.core.print-config", null) == null) {
|
||||
System.setProperty("rockserver.core.print-config", "false");
|
||||
}
|
||||
arena = Arena.ofShared();
|
||||
db = new EmbeddedConnection(null, "test", null);
|
||||
createStandardColumn();
|
||||
|
||||
|
||||
var bigValueArray = new byte[10_000];
|
||||
ThreadLocalRandom.current().nextBytes(bigValueArray);
|
||||
bigValue = MemorySegment.ofArray(bigValueArray);
|
||||
key1 = new MemorySegment[] {
|
||||
toMemorySegmentSimple(3),
|
||||
toMemorySegmentSimple(4, 6),
|
||||
toMemorySegmentSimple(3),
|
||||
toMemorySegmentSimple(1, 2, 3),
|
||||
toMemorySegmentSimple(6, 7, 8)
|
||||
};
|
||||
collidingKey1 = new MemorySegment[] {
|
||||
toMemorySegmentSimple(3),
|
||||
toMemorySegmentSimple(4, 6),
|
||||
toMemorySegmentSimple(3),
|
||||
toMemorySegmentSimple(1, 2, 3),
|
||||
toMemorySegmentSimple(6, 7, -48)
|
||||
};
|
||||
key2 = new MemorySegment[] {
|
||||
toMemorySegmentSimple(3),
|
||||
toMemorySegmentSimple(4, 6),
|
||||
toMemorySegmentSimple(3),
|
||||
toMemorySegmentSimple(1, 2, 3),
|
||||
toMemorySegmentSimple(6, 7, 7)
|
||||
};
|
||||
|
||||
value1 = MemorySegment.ofArray(new byte[] {0, 0, 3});
|
||||
value2 = MemorySegment.ofArray(new byte[] {0, 0, 5});
|
||||
}
|
||||
|
||||
private void createStandardColumn() {
|
||||
@ -42,10 +81,28 @@ class EmbeddedDBTest {
|
||||
colId = db.createColumn("column-test", schema);
|
||||
}
|
||||
|
||||
void fillSomeKeys() {
|
||||
Assertions.assertNull(db.put(arena, 0, colId, key1, value1, Callback.none()));
|
||||
Assertions.assertNull(db.put(arena, 0, colId, collidingKey1, value2, Callback.none()));
|
||||
Assertions.assertNull(db.put(arena, 0, colId, key2, bigValue, Callback.none()));
|
||||
for (int i = 0; i < Byte.MAX_VALUE; i++) {
|
||||
var keyI = new MemorySegment[] {
|
||||
toMemorySegmentSimple(3),
|
||||
toMemorySegmentSimple(4, 6),
|
||||
toMemorySegmentSimple(3),
|
||||
toMemorySegmentSimple(1, 2, 3),
|
||||
toMemorySegmentSimple(8, 2, 5, 1, 7, i)
|
||||
};
|
||||
var valueI = toMemorySegmentSimple(i, i, i, i, i);
|
||||
Assertions.assertNull(db.put(arena, 0, colId, keyI, valueI, Callback.none()));
|
||||
}
|
||||
}
|
||||
|
||||
@org.junit.jupiter.api.AfterEach
|
||||
void tearDown() throws IOException {
|
||||
db.deleteColumn(colId);
|
||||
db.close();
|
||||
arena.close();
|
||||
}
|
||||
|
||||
@org.junit.jupiter.api.Test
|
||||
@ -60,11 +117,11 @@ class EmbeddedDBTest {
|
||||
var value1 = MemorySegment.ofArray(new byte[] {0, 0, 3});
|
||||
var value2 = MemorySegment.ofArray(new byte[] {0, 0, 5});
|
||||
|
||||
var delta = db.put(0, colId, key, value1, Callback.delta());
|
||||
var delta = db.put(arena, 0, colId, key, value1, Callback.delta());
|
||||
Assertions.assertNull(delta.previous());
|
||||
Assertions.assertTrue(Utils.valueEquals(delta.current(), value1));
|
||||
|
||||
delta = db.put(0, colId, key, value2, Callback.delta());
|
||||
delta = db.put(arena, 0, colId, key, value2, Callback.delta());
|
||||
Assertions.assertTrue(Utils.valueEquals(delta.previous(), value1));
|
||||
Assertions.assertTrue(Utils.valueEquals(delta.current(), value2));
|
||||
}
|
||||
@ -94,15 +151,15 @@ class EmbeddedDBTest {
|
||||
var value1 = MemorySegment.ofArray(new byte[] {0, 0, 3});
|
||||
var value2 = MemorySegment.ofArray(new byte[] {0, 0, 5});
|
||||
|
||||
var delta = db.put(0, colId, key1, value1, Callback.delta());
|
||||
var delta = db.put(arena, 0, colId, key1, value1, Callback.delta());
|
||||
Assertions.assertNull(delta.previous());
|
||||
Assertions.assertTrue(Utils.valueEquals(delta.current(), value1));
|
||||
|
||||
delta = db.put(0, colId, key2, value2, Callback.delta());
|
||||
delta = db.put(arena, 0, colId, key2, value2, Callback.delta());
|
||||
Assertions.assertNull(delta.previous());
|
||||
Assertions.assertTrue(Utils.valueEquals(delta.current(), value2));
|
||||
|
||||
delta = db.put(0, colId, key2, value1, Callback.delta());
|
||||
delta = db.put(arena, 0, colId, key2, value1, Callback.delta());
|
||||
Assertions.assertTrue(Utils.valueEquals(delta.previous(), value2));
|
||||
Assertions.assertTrue(Utils.valueEquals(delta.current(), value1));
|
||||
}
|
||||
@ -143,29 +200,32 @@ class EmbeddedDBTest {
|
||||
var value1 = MemorySegment.ofArray(new byte[] {0, 0, 3});
|
||||
var value2 = MemorySegment.ofArray(new byte[] {0, 0, 5});
|
||||
|
||||
var delta = db.put(0, colId, key1, value1, Callback.delta());
|
||||
var delta = db.put(arena, 0, colId, key1, value1, Callback.delta());
|
||||
Assertions.assertNull(delta.previous());
|
||||
Assertions.assertTrue(Utils.valueEquals(delta.current(), value1));
|
||||
|
||||
delta = db.put(0, colId, collidingKey1, value2, Callback.delta());
|
||||
delta = db.put(arena, 0, colId, collidingKey1, value2, Callback.delta());
|
||||
Assertions.assertNull(delta.previous());
|
||||
Assertions.assertTrue(Utils.valueEquals(delta.current(), value2));
|
||||
|
||||
delta = db.put(0, colId, collidingKey1, value1, Callback.delta());
|
||||
delta = db.put(arena, 0, colId, collidingKey1, value1, Callback.delta());
|
||||
Assertions.assertTrue(Utils.valueEquals(delta.previous(), value2));
|
||||
Assertions.assertTrue(Utils.valueEquals(delta.current(), value1));
|
||||
|
||||
delta = db.put(0, colId, key2, value1, Callback.delta());
|
||||
delta = db.put(arena, 0, colId, key2, value1, Callback.delta());
|
||||
Assertions.assertNull(delta.previous());
|
||||
Assertions.assertTrue(Utils.valueEquals(delta.current(), value1));
|
||||
|
||||
delta = db.put(0, colId, key2, value2, Callback.delta());
|
||||
delta = db.put(arena, 0, colId, key2, value2, Callback.delta());
|
||||
Assertions.assertTrue(Utils.valueEquals(delta.previous(), value1));
|
||||
Assertions.assertTrue(Utils.valueEquals(delta.current(), value2));
|
||||
}
|
||||
|
||||
@org.junit.jupiter.api.Test
|
||||
void get() {
|
||||
throw new UnsupportedOperationException();
|
||||
fillSomeKeys();
|
||||
Assertions.assertTrue(Utils.valueEquals(value1, db.get(arena, 0, colId, key1, Callback.current())));
|
||||
Assertions.assertTrue(Utils.valueEquals(value2, db.get(arena, 0, colId, collidingKey1, Callback.current())));
|
||||
Assertions.assertTrue(Utils.valueEquals(bigValue, db.get(arena, 0, colId, key2, Callback.current())));
|
||||
}
|
||||
}
|
@ -12,12 +12,8 @@ import org.junit.jupiter.api.Test;
|
||||
|
||||
public class XXHash32Test {
|
||||
|
||||
public static void main(String[] args) {
|
||||
new XXHash32Test().test();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
public void testMemorySegment() {
|
||||
var safeXxhash32 = net.jpountz.xxhash.XXHashFactory.safeInstance().hash32();
|
||||
var myXxhash32 = XXHash32.getInstance();
|
||||
for (int runs = 0; runs < 3; runs++) {
|
||||
@ -33,4 +29,21 @@ public class XXHash32Test {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBytes() {
|
||||
var myXxhash32 = XXHash32.getInstance();
|
||||
for (int runs = 0; runs < 3; runs++) {
|
||||
for (int len = 0; len < 600; len++) {
|
||||
byte[] bytes = new byte[len];
|
||||
ThreadLocalRandom.current().nextBytes(bytes);
|
||||
var hash = myXxhash32.hash(bytes, 0, bytes.length, Integer.MIN_VALUE);
|
||||
var a = Arena.global();
|
||||
var result = a.allocate(Integer.BYTES);
|
||||
myXxhash32.hash(a.allocateArray(OfByte.JAVA_BYTE, bytes), 0, bytes.length, Integer.MIN_VALUE, result);
|
||||
var resultInt = result.get(ColumnInstance.BIG_ENDIAN_INT, 0);
|
||||
Assertions.assertEquals(hash, resultInt);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user