diff --git a/pom.xml b/pom.xml
index e2cb1a0..11c8b47 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,11 +49,6 @@
fastutil
8.5.13
-
- com.github.seancfoley
- ipaddress
- 5.5.0
-
org.jetbrains
annotations
@@ -230,15 +225,17 @@
-H:+UnlockExperimentalVMOptions
- --strict-image-heap
-march=native
-H:IncludeResourceBundles=net.sourceforge.argparse4j.internal.ArgumentParserImpl
-O1
-H:+StaticExecutableWithDynamicLibC
-H:+JNI
+ -H:+ForeignAPISupport
-H:IncludeResources=librocksdbjni-linux64.so
-H:IncludeResources=it/cavallium/rockserver/core/resources/default.conf
+ -H:IncludeResources=it/cavallium/rockserver/core/resources/build.properties
-H:DynamicProxyConfigurationFiles=proxy-config.json
+ -H:ReflectionConfigurationFiles=reflect-config.json
--delay-class-initialization-to-runtime=org.rocksdb.RocksDB,org.rocksdb.RocksObject
--gc=G1
diff --git a/proxy-config.json b/proxy-config.json
index 661fec5..75597d9 100644
--- a/proxy-config.json
+++ b/proxy-config.json
@@ -20,7 +20,7 @@
"typeReachable": "org.github.gestalt.config.decoder.ProxyDecoder"
},
"interfaces": [
- "it.cavallium.rockserver.core.config.FallbackColumnOptions"
+ "it.cavallium.rockserver.core.config.FallbackColumnConfig"
]
},
{
@@ -28,7 +28,15 @@
"typeReachable": "org.github.gestalt.config.decoder.ProxyDecoder"
},
"interfaces": [
- "it.cavallium.rockserver.core.config.NamedColumnOptions"
+ "it.cavallium.rockserver.core.config.NamedColumnConfig"
+ ]
+ },
+ {
+ "condition": {
+ "typeReachable": "org.github.gestalt.config.decoder.ProxyDecoder"
+ },
+ "interfaces": [
+ "it.cavallium.rockserver.core.config.ColumnLevelConfig"
]
},
{
@@ -39,6 +47,14 @@
"it.cavallium.rockserver.core.config.BloomFilterConfig"
]
},
+ {
+ "condition": {
+ "typeReachable": "org.github.gestalt.config.decoder.ProxyDecoder"
+ },
+ "interfaces": [
+ "it.cavallium.rockserver.core.config.VolumeConfig"
+ ]
+ },
{
"condition": {
"typeReachable": "org.github.gestalt.config.decoder.ProxyDecoder"
diff --git a/reflect-config.json b/reflect-config.json
new file mode 100644
index 0000000..4f55a47
--- /dev/null
+++ b/reflect-config.json
@@ -0,0 +1,8 @@
+[
+ {
+ "name" : "java.lang.String",
+ "methods" : [
+ { "name" : "", "parameterTypes" : ["java.lang.String"] }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/src/main/java/it/cavallium/rockserver/core/Main.java b/src/main/java/it/cavallium/rockserver/core/Main.java
index 9a8c2d0..3b2fbf7 100644
--- a/src/main/java/it/cavallium/rockserver/core/Main.java
+++ b/src/main/java/it/cavallium/rockserver/core/Main.java
@@ -3,8 +3,7 @@ package it.cavallium.rockserver.core;
import static it.cavallium.rockserver.core.client.EmbeddedConnection.PRIVATE_MEMORY_URL;
import static java.util.Objects.requireNonNull;
-import inet.ipaddr.HostName;
-
+import it.cavallium.rockserver.core.common.Utils;
import it.cavallium.rockserver.core.impl.rocksdb.RocksDBLoader;
import java.io.IOException;
import java.net.URI;
@@ -89,14 +88,14 @@ public class Main {
case "unix" -> clientBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(databaseUrl.getPath())));
case "file" -> clientBuilder.setEmbeddedPath(Path.of((databaseUrl.getAuthority() != null ? databaseUrl.getAuthority() : "") + databaseUrl.getPath()).normalize());
case "memory" -> clientBuilder.setEmbeddedInMemory(true);
- case "rocksdb" -> clientBuilder.setAddress(new HostName(databaseUrl.getHost()).asInetSocketAddress());
+ case "rocksdb" -> clientBuilder.setAddress(Utils.parseHostAndPort(databaseUrl));
default -> throw new IllegalArgumentException("Invalid scheme: " + databaseUrl.getScheme());
}
switch (listenUrl.getScheme()) {
case "unix" -> serverBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(listenUrl.getPath())));
- case "http" -> serverBuilder.setHttpAddress(listenUrl.getHost(), listenUrl.getPort());
- case "rocksdb" -> serverBuilder.setAddress(new HostName(listenUrl.getHost()).asInetSocketAddress());
+ case "http" -> serverBuilder.setHttpAddress(listenUrl.getHost(), Utils.parsePort(listenUrl));
+ case "rocksdb" -> serverBuilder.setAddress(Utils.parseHostAndPort(listenUrl));
default -> throw new IllegalArgumentException("Invalid scheme: " + listenUrl.getScheme());
}
diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java
index 884a15b..ec26f4e 100644
--- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java
+++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java
@@ -1,8 +1,5 @@
package it.cavallium.rockserver.core.common;
-import static java.util.concurrent.CompletableFuture.runAsync;
-import static java.util.concurrent.CompletableFuture.supplyAsync;
-
import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseFailedUpdate;
diff --git a/src/main/java/it/cavallium/rockserver/core/common/Utils.java b/src/main/java/it/cavallium/rockserver/core/common/Utils.java
index 05b7a61..ed05da9 100644
--- a/src/main/java/it/cavallium/rockserver/core/common/Utils.java
+++ b/src/main/java/it/cavallium/rockserver/core/common/Utils.java
@@ -8,6 +8,8 @@ import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
+import java.net.InetSocketAddress;
+import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -26,8 +28,10 @@ public class Utils {
@SuppressWarnings("resource")
private static final MemorySegment DUMMY_EMPTY_VALUE = Arena
.global()
- .allocate(ValueLayout.JAVA_BYTE, (byte) -1)
+ .allocate(1)
+ .copyFrom(MemorySegment.ofArray(new byte[] {-1}))
.asReadOnly();
+ public static final int DEFAULT_PORT = 5333;
public static MemorySegment dummyEmptyValue() {
return DUMMY_EMPTY_VALUE;
@@ -78,7 +82,8 @@ public class Utils {
public static @Nullable MemorySegment toMemorySegment(Arena arena, byte... array) {
if (array != null) {
assert arena != null;
- return arena.allocateArray(ValueLayout.JAVA_BYTE, array);
+ // todo: replace with allocateArray when graalvm adds it
+ return arena.allocate(array.length).copyFrom(MemorySegment.ofArray(array));
} else {
return null;
}
@@ -105,7 +110,8 @@ public class Utils {
for (int i = 0; i < array.length; i++) {
newArray[i] = (byte) array[i];
}
- return arena.allocateArray(ValueLayout.JAVA_BYTE, newArray);
+ // todo: replace with allocateArray when graalvm adds it
+ return arena.allocate(newArray.length).copyFrom(MemorySegment.ofArray(newArray));
} else {
return null;
}
@@ -141,4 +147,17 @@ public class Utils {
return MemorySegment.mismatch(previousValue, 0, previousValue.byteSize(), currentValue, 0, currentValue.byteSize())
== -1;
}
+
+ public static InetSocketAddress parseHostAndPort(URI uri) {
+ return new InetSocketAddress(uri.getHost(), parsePort(uri));
+ }
+
+ public static int parsePort(URI uri) {
+ var port = uri.getPort();
+ if (port == -1) {
+ return DEFAULT_PORT;
+ } else {
+ return port;
+ }
+ }
}
diff --git a/src/main/java/it/cavallium/rockserver/core/config/ConfigParser.java b/src/main/java/it/cavallium/rockserver/core/config/ConfigParser.java
index 6b436b6..ebfb5cb 100644
--- a/src/main/java/it/cavallium/rockserver/core/config/ConfigParser.java
+++ b/src/main/java/it/cavallium/rockserver/core/config/ConfigParser.java
@@ -23,7 +23,7 @@ public class ConfigParser {
gsb = new GestaltBuilder();
gsb
.setTreatMissingArrayIndexAsError(false)
- .setTreatNullValuesInClassAsErrors(false)
+ .setTreatMissingDiscretionaryValuesAsErrors(false)
.setTreatMissingValuesAsErrors(false)
.addDecoder(new DataSizeDecoder())
.addDecoder(new DbCompressionDecoder())
diff --git a/src/main/java/it/cavallium/rockserver/core/server/ThriftServer.java b/src/main/java/it/cavallium/rockserver/core/server/ThriftServer.java
index eb1b355..92067bb 100644
--- a/src/main/java/it/cavallium/rockserver/core/server/ThriftServer.java
+++ b/src/main/java/it/cavallium/rockserver/core/server/ThriftServer.java
@@ -7,8 +7,8 @@ import it.cavallium.rockserver.core.common.api.ColumnHashType;
import it.cavallium.rockserver.core.common.api.ColumnSchema;
import it.cavallium.rockserver.core.common.api.Delta;
import it.cavallium.rockserver.core.common.api.OptionalBinary;
-import it.cavallium.rockserver.core.common.api.RocksDB.AsyncIface;
-import it.cavallium.rockserver.core.common.api.RocksDB.AsyncProcessor;
+import it.cavallium.rockserver.core.common.api.RocksDB.Iface;
+import it.cavallium.rockserver.core.common.api.RocksDB.Processor;
import it.cavallium.rockserver.core.common.api.UpdateBegin;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
@@ -21,10 +21,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
-import java.util.function.BiConsumer;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.jetbrains.annotations.NotNull;
@@ -35,12 +32,12 @@ public class ThriftServer extends Server {
public ThriftServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException {
super(client);
- var handler = new AsyncThriftHandler(client);
+ var handler = new ThriftHandler(client);
try {
var serverTransport = new TNonblockingServerSocket(new InetSocketAddress(http2Host, http2Port));
- var server = new TNonblockingServer(new TNonblockingServer.Args(serverTransport)
- .processor(new AsyncProcessor<>(handler))
+ var server = new TThreadedSelectorServer(new TThreadedSelectorServer.Args(serverTransport)
+ .processor(new Processor<>(handler))
);
server.serve();
@@ -97,55 +94,9 @@ public class ThriftServer extends Server {
return it.cavallium.rockserver.core.common.ColumnHashType.valueOf(variableTailKey.name());
}
- private static BiConsumer super T, ? super Throwable> handleResult(AsyncMethodCallback resultHandler) {
- return (result, error) -> {
- if (error != null) {
- if (error instanceof Exception ex) {
- resultHandler.onError(ex);
- } else {
- resultHandler.onError(new Exception(error));
- }
- } else {
- resultHandler.onComplete(result);
- }
- };
- }
-
- private static BiConsumer super T, ? super Throwable> handleResultWithArena(Arena arena,
- AsyncMethodCallback resultHandler) {
- return (result, error) -> {
- arena.close();
- if (error != null) {
- if (error instanceof Exception ex) {
- resultHandler.onError(ex);
- } else {
- resultHandler.onError(new Exception(error));
- }
- } else {
- resultHandler.onComplete(result);
- }
- };
- }
-
- private static BiConsumer super List, ? super Throwable> handleResultListWithArena(Arena arena,
- AsyncMethodCallback resultHandler) {
- return (result, error) -> {
- arena.close();
- if (error != null) {
- if (error instanceof Exception ex) {
- resultHandler.onError(ex);
- } else {
- resultHandler.onError(new Exception(error));
- }
- } else {
- resultHandler.onComplete(null);
- }
- };
- }
-
private static OptionalBinary mapResult(MemorySegment memorySegment) {
var result = new OptionalBinary();
- return memorySegment != null ? result.setValue(memorySegment.asByteBuffer()) : result;
+ return memorySegment != null ? result.setValue(ByteBuffer.wrap(memorySegment.toArray(BYTE_BE))) : result;
}
private static UpdateBegin mapResult(UpdateContext context) {
@@ -164,225 +115,187 @@ public class ThriftServer extends Server {
return multi.stream().map(ThriftServer::mapResult).toList();
}
- private static class AsyncThriftHandler implements AsyncIface {
+ private static class ThriftHandler implements Iface {
private final RocksDBConnection client;
- public AsyncThriftHandler(RocksDBConnection client) {
+ public ThriftHandler(RocksDBConnection client) {
this.client = client;
}
+
@Override
- public void openTransaction(long timeoutMs, AsyncMethodCallback resultHandler) {
- client.getAsyncApi().openTransactionAsync(timeoutMs).whenComplete(handleResult(resultHandler));
+ public long openTransaction(long timeoutMs) {
+ return client.getSyncApi().openTransaction(timeoutMs);
}
@Override
- public void closeTransaction(long timeoutMs, boolean commit, AsyncMethodCallback resultHandler) {
- client.getAsyncApi().closeTransactionAsync(timeoutMs, commit).whenComplete(handleResult(resultHandler));
+ public boolean closeTransaction(long timeoutMs, boolean commit) {
+ return client.getSyncApi().closeTransaction(timeoutMs, commit);
}
@Override
- public void closeFailedUpdate(long updateId, AsyncMethodCallback resultHandler) {
- client.getAsyncApi().closeFailedUpdateAsync(updateId).whenComplete(handleResult(resultHandler));
+ public void closeFailedUpdate(long updateId) {
+ client.getSyncApi().closeFailedUpdate(updateId);
}
@Override
- public void createColumn(String name, ColumnSchema schema, AsyncMethodCallback resultHandler) {
- client.getAsyncApi().createColumnAsync(name, columnSchemaToRecord(schema))
- .whenComplete(handleResult(resultHandler));
+ public long createColumn(String name, ColumnSchema schema) {
+ return client.getSyncApi().createColumn(name, columnSchemaToRecord(schema));
}
@Override
- public void deleteColumn(long columnId, AsyncMethodCallback resultHandler) {
- client.getAsyncApi().deleteColumnAsync(columnId).whenComplete(handleResult(resultHandler));
+ public void deleteColumn(long columnId) {
+ client.getSyncApi().deleteColumn(columnId);
}
@Override
- public void getColumnId(String name, AsyncMethodCallback resultHandler) {
- client.getAsyncApi().getColumnIdAsync(name).whenComplete(handleResult(resultHandler));
+ public long getColumnId(String name) {
+ return client.getSyncApi().getColumnId(name);
}
@Override
public void putFast(long transactionOrUpdateId,
long columnId,
List keys,
- ByteBuffer value,
- AsyncMethodCallback resultHandler) {
- this.put(transactionOrUpdateId, columnId, keys, value, resultHandler);
+ ByteBuffer value) {
+ this.put(transactionOrUpdateId, columnId, keys, value);
}
@Override
public void put(long transactionOrUpdateId,
long columnId,
List keys,
- ByteBuffer value,
- AsyncMethodCallback resultHandler) {
- var arena = Arena.ofShared();
- client.getAsyncApi()
- .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.none())
- .whenComplete(handleResultWithArena(arena, resultHandler));
+ ByteBuffer value) {
+ try (var arena = Arena.ofConfined()) {
+ client.getSyncApi().put(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.none());
+ }
}
@Override
public void putMulti(long transactionOrUpdateId,
long columnId,
List> keysMulti,
- List valueMulti,
- AsyncMethodCallback resultHandler) {
- var arena = Arena.ofShared();
- client.getAsyncApi()
- .putMultiAsync(arena, transactionOrUpdateId, columnId, keysToRecords(arena, keysMulti), keyToRecords(arena, valueMulti), RequestType.none())
- .whenComplete(handleResultListWithArena(arena, resultHandler));
+ List valueMulti) {
+ try (var arena = Arena.ofConfined()) {
+ client.getSyncApi().putMulti(arena, transactionOrUpdateId, columnId, keysToRecords(arena, keysMulti), keyToRecords(arena, valueMulti), RequestType.none());
+ }
}
@Override
- public void putGetPrevious(long transactionOrUpdateId,
+ public OptionalBinary putGetPrevious(long transactionOrUpdateId,
long columnId,
List keys,
- ByteBuffer value,
- AsyncMethodCallback resultHandler) {
- var arena = Arena.ofShared();
- client.getAsyncApi()
- .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.previous())
- .thenApply(ThriftServer::mapResult)
- .whenComplete(handleResultWithArena(arena, resultHandler));
+ ByteBuffer value) {
+ try (var arena = Arena.ofConfined()) {
+ return ThriftServer.mapResult(client.getSyncApi().put(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.previous()));
+ }
}
@Override
- public void putGetDelta(long transactionOrUpdateId,
+ public Delta putGetDelta(long transactionOrUpdateId,
long columnId,
List keys,
- ByteBuffer value,
- AsyncMethodCallback resultHandler) {
- var arena = Arena.ofShared();
- client.getAsyncApi()
- .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.delta())
- .thenApply(ThriftServer::mapResult)
- .whenComplete(handleResultWithArena(arena, resultHandler));
+ ByteBuffer value) {
+ try (var arena = Arena.ofConfined()) {
+ return ThriftServer.mapResult(client.getSyncApi().put(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.delta()));
+ }
}
@Override
- public void putGetChanged(long transactionOrUpdateId,
+ public boolean putGetChanged(long transactionOrUpdateId,
long columnId,
List keys,
- ByteBuffer value,
- AsyncMethodCallback resultHandler) {
- var arena = Arena.ofShared();
- client
- .getAsyncApi()
- .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.changed())
- .whenComplete(handleResultWithArena(arena, resultHandler));
+ ByteBuffer value) {
+ try (var arena = Arena.ofConfined()) {
+ return client.getSyncApi().put(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.changed());
+ }
}
@Override
- public void putGetPreviousPresence(long transactionOrUpdateId,
+ public boolean putGetPreviousPresence(long transactionOrUpdateId,
long columnId,
List keys,
- ByteBuffer value,
- AsyncMethodCallback resultHandler) {
- var arena = Arena.ofShared();
- client
- .getAsyncApi()
- .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.previousPresence())
- .whenComplete(handleResultWithArena(arena, resultHandler));
+ ByteBuffer value) {
+ try (var arena = Arena.ofConfined()) {
+ return client.getSyncApi().put(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.previousPresence());
+ }
}
@Override
- public void get(long transactionOrUpdateId,
+ public OptionalBinary get(long transactionOrUpdateId,
long columnId,
- List keys,
- AsyncMethodCallback resultHandler) {
- var arena = Arena.ofShared();
- client.getAsyncApi()
- .getAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.current())
- .thenApply(ThriftServer::mapResult)
- .whenComplete(handleResultWithArena(arena, resultHandler));
+ List keys) {
+ try (var arena = Arena.ofConfined()) {
+ return ThriftServer.mapResult(client.getSyncApi().get(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.current()));
+ }
}
@Override
- public void getForUpdate(long transactionOrUpdateId,
+ public UpdateBegin getForUpdate(long transactionOrUpdateId,
long columnId,
- List keys,
- AsyncMethodCallback resultHandler) {
- var arena = Arena.ofShared();
- client.getAsyncApi()
- .getAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.forUpdate())
- .thenApply(ThriftServer::mapResult)
- .whenComplete(handleResultWithArena(arena, resultHandler));
+ List keys) {
+ try (var arena = Arena.ofConfined()) {
+ return mapResult(client.getSyncApi().get(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.forUpdate()));
+ }
}
@Override
- public void exists(long transactionOrUpdateId,
+ public boolean exists(long transactionOrUpdateId,
long columnId,
- List keys,
- AsyncMethodCallback resultHandler) {
- var arena = Arena.ofShared();
- client.getAsyncApi()
- .getAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.exists())
- .whenComplete(handleResultWithArena(arena, resultHandler));
+ List keys) {
+ try (var arena = Arena.ofConfined()) {
+ return client.getSyncApi().get(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.exists());
+ }
}
@Override
- public void openIterator(long transactionId,
+ public long openIterator(long transactionId,
long columnId,
List startKeysInclusive,
List endKeysExclusive,
boolean reverse,
- long timeoutMs,
- AsyncMethodCallback resultHandler) {
- var arena = Arena.ofShared();
- client
- .getAsyncApi()
- .openIteratorAsync(arena, transactionId, columnId, keysToRecord(arena,
- startKeysInclusive), keysToRecord(arena, endKeysExclusive), reverse, timeoutMs)
- .whenComplete(handleResultWithArena(arena, resultHandler));
+ long timeoutMs) {
+ try (var arena = Arena.ofConfined()) {
+ return client.getSyncApi().openIterator(arena, transactionId, columnId, keysToRecord(arena, startKeysInclusive), keysToRecord(arena, endKeysExclusive), reverse, timeoutMs);
+ }
}
@Override
- public void closeIterator(long iteratorId, AsyncMethodCallback resultHandler) {
- client.getAsyncApi()
- .closeIteratorAsync(iteratorId)
- .whenComplete(handleResult(resultHandler));
+ public void closeIterator(long iteratorId) {
+ client.getSyncApi().closeIterator(iteratorId);
}
@Override
- public void seekTo(long iterationId, List keys, AsyncMethodCallback resultHandler) {
- var arena = Arena.ofShared();
- client.getAsyncApi()
- .seekToAsync(arena, iterationId, keysToRecord(arena, keys))
- .whenComplete(handleResultWithArena(arena, resultHandler));
+ public void seekTo(long iterationId, List keys) {
+ try (var arena = Arena.ofConfined()) {
+ client.getSyncApi().seekTo(arena, iterationId, keysToRecord(arena, keys));
+ }
}
@Override
- public void subsequent(long iterationId, long skipCount, long takeCount, AsyncMethodCallback resultHandler) {
- var arena = Arena.ofShared();
- client.getAsyncApi()
- .subsequentAsync(arena, iterationId, skipCount, takeCount, RequestType.none())
- .whenComplete(handleResultWithArena(arena, resultHandler));
+ public void subsequent(long iterationId, long skipCount, long takeCount) {
+ try (var arena = Arena.ofConfined()) {
+ client.getSyncApi().subsequent(arena, iterationId, skipCount, takeCount, RequestType.none());
+ }
}
@Override
- public void subsequentExists(long iterationId,
+ public boolean subsequentExists(long iterationId,
long skipCount,
- long takeCount,
- AsyncMethodCallback resultHandler) {
- var arena = Arena.ofShared();
- client.getAsyncApi()
- .subsequentAsync(arena, iterationId, skipCount, takeCount, RequestType.exists())
- .whenComplete(handleResultWithArena(arena, resultHandler));
+ long takeCount) {
+ try (var arena = Arena.ofConfined()) {
+ return client.getSyncApi().subsequent(arena, iterationId, skipCount, takeCount, RequestType.exists());
+ }
}
@Override
- public void subsequentMultiGet(long iterationId,
+ public List subsequentMultiGet(long iterationId,
long skipCount,
- long takeCount,
- AsyncMethodCallback> resultHandler) {
- var arena = Arena.ofShared();
- client.getAsyncApi()
- .subsequentAsync(arena, iterationId, skipCount, takeCount, RequestType.multi())
- .thenApply(ThriftServer::mapResult)
- .whenComplete(handleResultWithArena(arena, resultHandler));
+ long takeCount) {
+ try (var arena = Arena.ofConfined()) {
+ return mapResult(client.getSyncApi().subsequent(arena, iterationId, skipCount, takeCount, RequestType.multi()));
+ }
}
}
}
diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java
index cfceee4..918e4f1 100644
--- a/src/main/java/module-info.java
+++ b/src/main/java/module-info.java
@@ -1,7 +1,6 @@
module rockserver.core {
requires rocksdbjni;
requires net.sourceforge.argparse4j;
- requires inet.ipaddr;
requires java.logging;
requires org.jetbrains.annotations;
requires high.scale.lib;
diff --git a/src/test/java/it/cavallium/rockserver/core/test/XXHash32Test.java b/src/test/java/it/cavallium/rockserver/core/test/XXHash32Test.java
index 4cdae91..a7c8048 100644
--- a/src/test/java/it/cavallium/rockserver/core/test/XXHash32Test.java
+++ b/src/test/java/it/cavallium/rockserver/core/test/XXHash32Test.java
@@ -3,6 +3,7 @@ package it.cavallium.rockserver.core.test;
import it.cavallium.rockserver.core.impl.ColumnInstance;
import it.cavallium.rockserver.core.impl.XXHash32;
import java.lang.foreign.Arena;
+import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout.OfByte;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.jupiter.api.Assertions;
@@ -21,7 +22,7 @@ public class XXHash32Test {
var hash = safeXxhash32.hash(bytes, 0, bytes.length, Integer.MIN_VALUE);
var a = Arena.global();
var result = a.allocate(Integer.BYTES);
- myXxhash32.hash(a.allocateArray(OfByte.JAVA_BYTE, bytes), 0, bytes.length, Integer.MIN_VALUE, result);
+ myXxhash32.hash(a.allocate(bytes.length).copyFrom(MemorySegment.ofArray(bytes)), 0, bytes.length, Integer.MIN_VALUE, result);
var resultInt = result.get(ColumnInstance.BIG_ENDIAN_INT, 0);
Assertions.assertEquals(hash, resultInt);
}
@@ -38,7 +39,7 @@ public class XXHash32Test {
var hash = myXxhash32.hash(bytes, 0, bytes.length, Integer.MIN_VALUE);
var a = Arena.global();
var result = a.allocate(Integer.BYTES);
- myXxhash32.hash(a.allocateArray(OfByte.JAVA_BYTE, bytes), 0, bytes.length, Integer.MIN_VALUE, result);
+ myXxhash32.hash(a.allocate(bytes.length).copyFrom(MemorySegment.ofArray(bytes)), 0, bytes.length, Integer.MIN_VALUE, result);
var resultInt = result.get(ColumnInstance.BIG_ENDIAN_INT, 0);
Assertions.assertEquals(hash, resultInt);
}