Remove ipaddress dependency, fix native image

This commit is contained in:
Andrea Cavalli 2024-03-29 22:58:54 +01:00
parent 0368839c98
commit 629a66eecb
10 changed files with 151 additions and 202 deletions

View File

@ -49,11 +49,6 @@
<artifactId>fastutil</artifactId> <artifactId>fastutil</artifactId>
<version>8.5.13</version> <version>8.5.13</version>
</dependency> </dependency>
<dependency>
<groupId>com.github.seancfoley</groupId>
<artifactId>ipaddress</artifactId>
<version>5.5.0</version>
</dependency>
<dependency> <dependency>
<groupId>org.jetbrains</groupId> <groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId> <artifactId>annotations</artifactId>
@ -230,15 +225,17 @@
</agent> </agent>
<buildArgs> <buildArgs>
<buildArg>-H:+UnlockExperimentalVMOptions</buildArg> <buildArg>-H:+UnlockExperimentalVMOptions</buildArg>
<buildArg>--strict-image-heap</buildArg>
<buildArg>-march=native</buildArg> <buildArg>-march=native</buildArg>
<buildArg>-H:IncludeResourceBundles=net.sourceforge.argparse4j.internal.ArgumentParserImpl</buildArg> <buildArg>-H:IncludeResourceBundles=net.sourceforge.argparse4j.internal.ArgumentParserImpl</buildArg>
<buildArg>-O1</buildArg> <buildArg>-O1</buildArg>
<buildArg>-H:+StaticExecutableWithDynamicLibC</buildArg> <buildArg>-H:+StaticExecutableWithDynamicLibC</buildArg>
<buildArg>-H:+JNI</buildArg> <buildArg>-H:+JNI</buildArg>
<buildArg>-H:+ForeignAPISupport</buildArg>
<buildArg>-H:IncludeResources=librocksdbjni-linux64.so</buildArg> <buildArg>-H:IncludeResources=librocksdbjni-linux64.so</buildArg>
<buildArg>-H:IncludeResources=it/cavallium/rockserver/core/resources/default.conf</buildArg> <buildArg>-H:IncludeResources=it/cavallium/rockserver/core/resources/default.conf</buildArg>
<buildArg>-H:IncludeResources=it/cavallium/rockserver/core/resources/build.properties</buildArg>
<buildArg>-H:DynamicProxyConfigurationFiles=proxy-config.json</buildArg> <buildArg>-H:DynamicProxyConfigurationFiles=proxy-config.json</buildArg>
<buildArg>-H:ReflectionConfigurationFiles=reflect-config.json</buildArg>
<buildArg>--delay-class-initialization-to-runtime=org.rocksdb.RocksDB,org.rocksdb.RocksObject</buildArg> <buildArg>--delay-class-initialization-to-runtime=org.rocksdb.RocksDB,org.rocksdb.RocksObject</buildArg>
<!--<buildArg>_-no-fallback</buildArg>--> <!--<buildArg>_-no-fallback</buildArg>-->
<buildArg>--gc=G1</buildArg> <buildArg>--gc=G1</buildArg>

View File

@ -20,7 +20,7 @@
"typeReachable": "org.github.gestalt.config.decoder.ProxyDecoder" "typeReachable": "org.github.gestalt.config.decoder.ProxyDecoder"
}, },
"interfaces": [ "interfaces": [
"it.cavallium.rockserver.core.config.FallbackColumnOptions" "it.cavallium.rockserver.core.config.FallbackColumnConfig"
] ]
}, },
{ {
@ -28,7 +28,15 @@
"typeReachable": "org.github.gestalt.config.decoder.ProxyDecoder" "typeReachable": "org.github.gestalt.config.decoder.ProxyDecoder"
}, },
"interfaces": [ "interfaces": [
"it.cavallium.rockserver.core.config.NamedColumnOptions" "it.cavallium.rockserver.core.config.NamedColumnConfig"
]
},
{
"condition": {
"typeReachable": "org.github.gestalt.config.decoder.ProxyDecoder"
},
"interfaces": [
"it.cavallium.rockserver.core.config.ColumnLevelConfig"
] ]
}, },
{ {
@ -39,6 +47,14 @@
"it.cavallium.rockserver.core.config.BloomFilterConfig" "it.cavallium.rockserver.core.config.BloomFilterConfig"
] ]
}, },
{
"condition": {
"typeReachable": "org.github.gestalt.config.decoder.ProxyDecoder"
},
"interfaces": [
"it.cavallium.rockserver.core.config.VolumeConfig"
]
},
{ {
"condition": { "condition": {
"typeReachable": "org.github.gestalt.config.decoder.ProxyDecoder" "typeReachable": "org.github.gestalt.config.decoder.ProxyDecoder"

8
reflect-config.json Normal file
View File

@ -0,0 +1,8 @@
[
{
"name" : "java.lang.String",
"methods" : [
{ "name" : "<init>", "parameterTypes" : ["java.lang.String"] }
]
}
]

View File

@ -3,8 +3,7 @@ package it.cavallium.rockserver.core;
import static it.cavallium.rockserver.core.client.EmbeddedConnection.PRIVATE_MEMORY_URL; import static it.cavallium.rockserver.core.client.EmbeddedConnection.PRIVATE_MEMORY_URL;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import inet.ipaddr.HostName; import it.cavallium.rockserver.core.common.Utils;
import it.cavallium.rockserver.core.impl.rocksdb.RocksDBLoader; import it.cavallium.rockserver.core.impl.rocksdb.RocksDBLoader;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
@ -89,14 +88,14 @@ public class Main {
case "unix" -> clientBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(databaseUrl.getPath()))); case "unix" -> clientBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(databaseUrl.getPath())));
case "file" -> clientBuilder.setEmbeddedPath(Path.of((databaseUrl.getAuthority() != null ? databaseUrl.getAuthority() : "") + databaseUrl.getPath()).normalize()); case "file" -> clientBuilder.setEmbeddedPath(Path.of((databaseUrl.getAuthority() != null ? databaseUrl.getAuthority() : "") + databaseUrl.getPath()).normalize());
case "memory" -> clientBuilder.setEmbeddedInMemory(true); case "memory" -> clientBuilder.setEmbeddedInMemory(true);
case "rocksdb" -> clientBuilder.setAddress(new HostName(databaseUrl.getHost()).asInetSocketAddress()); case "rocksdb" -> clientBuilder.setAddress(Utils.parseHostAndPort(databaseUrl));
default -> throw new IllegalArgumentException("Invalid scheme: " + databaseUrl.getScheme()); default -> throw new IllegalArgumentException("Invalid scheme: " + databaseUrl.getScheme());
} }
switch (listenUrl.getScheme()) { switch (listenUrl.getScheme()) {
case "unix" -> serverBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(listenUrl.getPath()))); case "unix" -> serverBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(listenUrl.getPath())));
case "http" -> serverBuilder.setHttpAddress(listenUrl.getHost(), listenUrl.getPort()); case "http" -> serverBuilder.setHttpAddress(listenUrl.getHost(), Utils.parsePort(listenUrl));
case "rocksdb" -> serverBuilder.setAddress(new HostName(listenUrl.getHost()).asInetSocketAddress()); case "rocksdb" -> serverBuilder.setAddress(Utils.parseHostAndPort(listenUrl));
default -> throw new IllegalArgumentException("Invalid scheme: " + listenUrl.getScheme()); default -> throw new IllegalArgumentException("Invalid scheme: " + listenUrl.getScheme());
} }

View File

@ -1,8 +1,5 @@
package it.cavallium.rockserver.core.common; package it.cavallium.rockserver.core.common;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut; import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseFailedUpdate; import it.cavallium.rockserver.core.common.RocksDBAPICommand.CloseFailedUpdate;

View File

@ -8,6 +8,8 @@ import java.io.IOException;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment; import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout; import java.lang.foreign.ValueLayout;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
@ -26,8 +28,10 @@ public class Utils {
@SuppressWarnings("resource") @SuppressWarnings("resource")
private static final MemorySegment DUMMY_EMPTY_VALUE = Arena private static final MemorySegment DUMMY_EMPTY_VALUE = Arena
.global() .global()
.allocate(ValueLayout.JAVA_BYTE, (byte) -1) .allocate(1)
.copyFrom(MemorySegment.ofArray(new byte[] {-1}))
.asReadOnly(); .asReadOnly();
public static final int DEFAULT_PORT = 5333;
public static MemorySegment dummyEmptyValue() { public static MemorySegment dummyEmptyValue() {
return DUMMY_EMPTY_VALUE; return DUMMY_EMPTY_VALUE;
@ -78,7 +82,8 @@ public class Utils {
public static @Nullable MemorySegment toMemorySegment(Arena arena, byte... array) { public static @Nullable MemorySegment toMemorySegment(Arena arena, byte... array) {
if (array != null) { if (array != null) {
assert arena != null; assert arena != null;
return arena.allocateArray(ValueLayout.JAVA_BYTE, array); // todo: replace with allocateArray when graalvm adds it
return arena.allocate(array.length).copyFrom(MemorySegment.ofArray(array));
} else { } else {
return null; return null;
} }
@ -105,7 +110,8 @@ public class Utils {
for (int i = 0; i < array.length; i++) { for (int i = 0; i < array.length; i++) {
newArray[i] = (byte) array[i]; newArray[i] = (byte) array[i];
} }
return arena.allocateArray(ValueLayout.JAVA_BYTE, newArray); // todo: replace with allocateArray when graalvm adds it
return arena.allocate(newArray.length).copyFrom(MemorySegment.ofArray(newArray));
} else { } else {
return null; return null;
} }
@ -141,4 +147,17 @@ public class Utils {
return MemorySegment.mismatch(previousValue, 0, previousValue.byteSize(), currentValue, 0, currentValue.byteSize()) return MemorySegment.mismatch(previousValue, 0, previousValue.byteSize(), currentValue, 0, currentValue.byteSize())
== -1; == -1;
} }
public static InetSocketAddress parseHostAndPort(URI uri) {
return new InetSocketAddress(uri.getHost(), parsePort(uri));
}
public static int parsePort(URI uri) {
var port = uri.getPort();
if (port == -1) {
return DEFAULT_PORT;
} else {
return port;
}
}
} }

View File

@ -23,7 +23,7 @@ public class ConfigParser {
gsb = new GestaltBuilder(); gsb = new GestaltBuilder();
gsb gsb
.setTreatMissingArrayIndexAsError(false) .setTreatMissingArrayIndexAsError(false)
.setTreatNullValuesInClassAsErrors(false) .setTreatMissingDiscretionaryValuesAsErrors(false)
.setTreatMissingValuesAsErrors(false) .setTreatMissingValuesAsErrors(false)
.addDecoder(new DataSizeDecoder()) .addDecoder(new DataSizeDecoder())
.addDecoder(new DbCompressionDecoder()) .addDecoder(new DbCompressionDecoder())

View File

@ -7,8 +7,8 @@ import it.cavallium.rockserver.core.common.api.ColumnHashType;
import it.cavallium.rockserver.core.common.api.ColumnSchema; import it.cavallium.rockserver.core.common.api.ColumnSchema;
import it.cavallium.rockserver.core.common.api.Delta; import it.cavallium.rockserver.core.common.api.Delta;
import it.cavallium.rockserver.core.common.api.OptionalBinary; import it.cavallium.rockserver.core.common.api.OptionalBinary;
import it.cavallium.rockserver.core.common.api.RocksDB.AsyncIface; import it.cavallium.rockserver.core.common.api.RocksDB.Iface;
import it.cavallium.rockserver.core.common.api.RocksDB.AsyncProcessor; import it.cavallium.rockserver.core.common.api.RocksDB.Processor;
import it.cavallium.rockserver.core.common.api.UpdateBegin; import it.cavallium.rockserver.core.common.api.UpdateBegin;
import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList;
@ -21,10 +21,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.List; import java.util.List;
import java.util.function.BiConsumer; import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportException;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
@ -35,12 +32,12 @@ public class ThriftServer extends Server {
public ThriftServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException { public ThriftServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException {
super(client); super(client);
var handler = new AsyncThriftHandler(client); var handler = new ThriftHandler(client);
try { try {
var serverTransport = new TNonblockingServerSocket(new InetSocketAddress(http2Host, http2Port)); var serverTransport = new TNonblockingServerSocket(new InetSocketAddress(http2Host, http2Port));
var server = new TNonblockingServer(new TNonblockingServer.Args(serverTransport) var server = new TThreadedSelectorServer(new TThreadedSelectorServer.Args(serverTransport)
.processor(new AsyncProcessor<>(handler)) .processor(new Processor<>(handler))
); );
server.serve(); server.serve();
@ -97,55 +94,9 @@ public class ThriftServer extends Server {
return it.cavallium.rockserver.core.common.ColumnHashType.valueOf(variableTailKey.name()); return it.cavallium.rockserver.core.common.ColumnHashType.valueOf(variableTailKey.name());
} }
private static <T> BiConsumer<? super T, ? super Throwable> handleResult(AsyncMethodCallback<T> resultHandler) {
return (result, error) -> {
if (error != null) {
if (error instanceof Exception ex) {
resultHandler.onError(ex);
} else {
resultHandler.onError(new Exception(error));
}
} else {
resultHandler.onComplete(result);
}
};
}
private static <T> BiConsumer<? super T, ? super Throwable> handleResultWithArena(Arena arena,
AsyncMethodCallback<T> resultHandler) {
return (result, error) -> {
arena.close();
if (error != null) {
if (error instanceof Exception ex) {
resultHandler.onError(ex);
} else {
resultHandler.onError(new Exception(error));
}
} else {
resultHandler.onComplete(result);
}
};
}
private static BiConsumer<? super List<Void>, ? super Throwable> handleResultListWithArena(Arena arena,
AsyncMethodCallback<Void> resultHandler) {
return (result, error) -> {
arena.close();
if (error != null) {
if (error instanceof Exception ex) {
resultHandler.onError(ex);
} else {
resultHandler.onError(new Exception(error));
}
} else {
resultHandler.onComplete(null);
}
};
}
private static OptionalBinary mapResult(MemorySegment memorySegment) { private static OptionalBinary mapResult(MemorySegment memorySegment) {
var result = new OptionalBinary(); var result = new OptionalBinary();
return memorySegment != null ? result.setValue(memorySegment.asByteBuffer()) : result; return memorySegment != null ? result.setValue(ByteBuffer.wrap(memorySegment.toArray(BYTE_BE))) : result;
} }
private static UpdateBegin mapResult(UpdateContext<MemorySegment> context) { private static UpdateBegin mapResult(UpdateContext<MemorySegment> context) {
@ -164,225 +115,187 @@ public class ThriftServer extends Server {
return multi.stream().map(ThriftServer::mapResult).toList(); return multi.stream().map(ThriftServer::mapResult).toList();
} }
private static class AsyncThriftHandler implements AsyncIface { private static class ThriftHandler implements Iface {
private final RocksDBConnection client; private final RocksDBConnection client;
public AsyncThriftHandler(RocksDBConnection client) { public ThriftHandler(RocksDBConnection client) {
this.client = client; this.client = client;
} }
@Override @Override
public void openTransaction(long timeoutMs, AsyncMethodCallback<Long> resultHandler) { public long openTransaction(long timeoutMs) {
client.getAsyncApi().openTransactionAsync(timeoutMs).whenComplete(handleResult(resultHandler)); return client.getSyncApi().openTransaction(timeoutMs);
} }
@Override @Override
public void closeTransaction(long timeoutMs, boolean commit, AsyncMethodCallback<Boolean> resultHandler) { public boolean closeTransaction(long timeoutMs, boolean commit) {
client.getAsyncApi().closeTransactionAsync(timeoutMs, commit).whenComplete(handleResult(resultHandler)); return client.getSyncApi().closeTransaction(timeoutMs, commit);
} }
@Override @Override
public void closeFailedUpdate(long updateId, AsyncMethodCallback<Void> resultHandler) { public void closeFailedUpdate(long updateId) {
client.getAsyncApi().closeFailedUpdateAsync(updateId).whenComplete(handleResult(resultHandler)); client.getSyncApi().closeFailedUpdate(updateId);
} }
@Override @Override
public void createColumn(String name, ColumnSchema schema, AsyncMethodCallback<Long> resultHandler) { public long createColumn(String name, ColumnSchema schema) {
client.getAsyncApi().createColumnAsync(name, columnSchemaToRecord(schema)) return client.getSyncApi().createColumn(name, columnSchemaToRecord(schema));
.whenComplete(handleResult(resultHandler));
} }
@Override @Override
public void deleteColumn(long columnId, AsyncMethodCallback<Void> resultHandler) { public void deleteColumn(long columnId) {
client.getAsyncApi().deleteColumnAsync(columnId).whenComplete(handleResult(resultHandler)); client.getSyncApi().deleteColumn(columnId);
} }
@Override @Override
public void getColumnId(String name, AsyncMethodCallback<Long> resultHandler) { public long getColumnId(String name) {
client.getAsyncApi().getColumnIdAsync(name).whenComplete(handleResult(resultHandler)); return client.getSyncApi().getColumnId(name);
} }
@Override @Override
public void putFast(long transactionOrUpdateId, public void putFast(long transactionOrUpdateId,
long columnId, long columnId,
List<ByteBuffer> keys, List<ByteBuffer> keys,
ByteBuffer value, ByteBuffer value) {
AsyncMethodCallback<Void> resultHandler) { this.put(transactionOrUpdateId, columnId, keys, value);
this.put(transactionOrUpdateId, columnId, keys, value, resultHandler);
} }
@Override @Override
public void put(long transactionOrUpdateId, public void put(long transactionOrUpdateId,
long columnId, long columnId,
List<ByteBuffer> keys, List<ByteBuffer> keys,
ByteBuffer value, ByteBuffer value) {
AsyncMethodCallback<Void> resultHandler) { try (var arena = Arena.ofConfined()) {
var arena = Arena.ofShared(); client.getSyncApi().put(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.none());
client.getAsyncApi() }
.putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.none())
.whenComplete(handleResultWithArena(arena, resultHandler));
} }
@Override @Override
public void putMulti(long transactionOrUpdateId, public void putMulti(long transactionOrUpdateId,
long columnId, long columnId,
List<List<ByteBuffer>> keysMulti, List<List<ByteBuffer>> keysMulti,
List<ByteBuffer> valueMulti, List<ByteBuffer> valueMulti) {
AsyncMethodCallback<Void> resultHandler) { try (var arena = Arena.ofConfined()) {
var arena = Arena.ofShared(); client.getSyncApi().putMulti(arena, transactionOrUpdateId, columnId, keysToRecords(arena, keysMulti), keyToRecords(arena, valueMulti), RequestType.none());
client.getAsyncApi() }
.putMultiAsync(arena, transactionOrUpdateId, columnId, keysToRecords(arena, keysMulti), keyToRecords(arena, valueMulti), RequestType.none())
.whenComplete(handleResultListWithArena(arena, resultHandler));
} }
@Override @Override
public void putGetPrevious(long transactionOrUpdateId, public OptionalBinary putGetPrevious(long transactionOrUpdateId,
long columnId, long columnId,
List<ByteBuffer> keys, List<ByteBuffer> keys,
ByteBuffer value, ByteBuffer value) {
AsyncMethodCallback<OptionalBinary> resultHandler) { try (var arena = Arena.ofConfined()) {
var arena = Arena.ofShared(); return ThriftServer.mapResult(client.getSyncApi().put(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.previous()));
client.getAsyncApi() }
.putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.previous())
.thenApply(ThriftServer::mapResult)
.whenComplete(handleResultWithArena(arena, resultHandler));
} }
@Override @Override
public void putGetDelta(long transactionOrUpdateId, public Delta putGetDelta(long transactionOrUpdateId,
long columnId, long columnId,
List<ByteBuffer> keys, List<ByteBuffer> keys,
ByteBuffer value, ByteBuffer value) {
AsyncMethodCallback<Delta> resultHandler) { try (var arena = Arena.ofConfined()) {
var arena = Arena.ofShared(); return ThriftServer.mapResult(client.getSyncApi().put(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.delta()));
client.getAsyncApi() }
.putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.delta())
.thenApply(ThriftServer::mapResult)
.whenComplete(handleResultWithArena(arena, resultHandler));
} }
@Override @Override
public void putGetChanged(long transactionOrUpdateId, public boolean putGetChanged(long transactionOrUpdateId,
long columnId, long columnId,
List<ByteBuffer> keys, List<ByteBuffer> keys,
ByteBuffer value, ByteBuffer value) {
AsyncMethodCallback<Boolean> resultHandler) { try (var arena = Arena.ofConfined()) {
var arena = Arena.ofShared(); return client.getSyncApi().put(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.changed());
client }
.getAsyncApi()
.putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.changed())
.whenComplete(handleResultWithArena(arena, resultHandler));
} }
@Override @Override
public void putGetPreviousPresence(long transactionOrUpdateId, public boolean putGetPreviousPresence(long transactionOrUpdateId,
long columnId, long columnId,
List<ByteBuffer> keys, List<ByteBuffer> keys,
ByteBuffer value, ByteBuffer value) {
AsyncMethodCallback<Boolean> resultHandler) { try (var arena = Arena.ofConfined()) {
var arena = Arena.ofShared(); return client.getSyncApi().put(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.previousPresence());
client }
.getAsyncApi()
.putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.previousPresence())
.whenComplete(handleResultWithArena(arena, resultHandler));
} }
@Override @Override
public void get(long transactionOrUpdateId, public OptionalBinary get(long transactionOrUpdateId,
long columnId, long columnId,
List<ByteBuffer> keys, List<ByteBuffer> keys) {
AsyncMethodCallback<OptionalBinary> resultHandler) { try (var arena = Arena.ofConfined()) {
var arena = Arena.ofShared(); return ThriftServer.mapResult(client.getSyncApi().get(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.current()));
client.getAsyncApi() }
.getAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.current())
.thenApply(ThriftServer::mapResult)
.whenComplete(handleResultWithArena(arena, resultHandler));
} }
@Override @Override
public void getForUpdate(long transactionOrUpdateId, public UpdateBegin getForUpdate(long transactionOrUpdateId,
long columnId, long columnId,
List<ByteBuffer> keys, List<ByteBuffer> keys) {
AsyncMethodCallback<UpdateBegin> resultHandler) { try (var arena = Arena.ofConfined()) {
var arena = Arena.ofShared(); return mapResult(client.getSyncApi().get(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.forUpdate()));
client.getAsyncApi() }
.getAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.forUpdate())
.thenApply(ThriftServer::mapResult)
.whenComplete(handleResultWithArena(arena, resultHandler));
} }
@Override @Override
public void exists(long transactionOrUpdateId, public boolean exists(long transactionOrUpdateId,
long columnId, long columnId,
List<ByteBuffer> keys, List<ByteBuffer> keys) {
AsyncMethodCallback<Boolean> resultHandler) { try (var arena = Arena.ofConfined()) {
var arena = Arena.ofShared(); return client.getSyncApi().get(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.exists());
client.getAsyncApi() }
.getAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.exists())
.whenComplete(handleResultWithArena(arena, resultHandler));
} }
@Override @Override
public void openIterator(long transactionId, public long openIterator(long transactionId,
long columnId, long columnId,
List<ByteBuffer> startKeysInclusive, List<ByteBuffer> startKeysInclusive,
List<ByteBuffer> endKeysExclusive, List<ByteBuffer> endKeysExclusive,
boolean reverse, boolean reverse,
long timeoutMs, long timeoutMs) {
AsyncMethodCallback<Long> resultHandler) { try (var arena = Arena.ofConfined()) {
var arena = Arena.ofShared(); return client.getSyncApi().openIterator(arena, transactionId, columnId, keysToRecord(arena, startKeysInclusive), keysToRecord(arena, endKeysExclusive), reverse, timeoutMs);
client }
.getAsyncApi()
.openIteratorAsync(arena, transactionId, columnId, keysToRecord(arena,
startKeysInclusive), keysToRecord(arena, endKeysExclusive), reverse, timeoutMs)
.whenComplete(handleResultWithArena(arena, resultHandler));
} }
@Override @Override
public void closeIterator(long iteratorId, AsyncMethodCallback<Void> resultHandler) { public void closeIterator(long iteratorId) {
client.getAsyncApi() client.getSyncApi().closeIterator(iteratorId);
.closeIteratorAsync(iteratorId)
.whenComplete(handleResult(resultHandler));
} }
@Override @Override
public void seekTo(long iterationId, List<ByteBuffer> keys, AsyncMethodCallback<Void> resultHandler) { public void seekTo(long iterationId, List<ByteBuffer> keys) {
var arena = Arena.ofShared(); try (var arena = Arena.ofConfined()) {
client.getAsyncApi() client.getSyncApi().seekTo(arena, iterationId, keysToRecord(arena, keys));
.seekToAsync(arena, iterationId, keysToRecord(arena, keys)) }
.whenComplete(handleResultWithArena(arena, resultHandler));
} }
@Override @Override
public void subsequent(long iterationId, long skipCount, long takeCount, AsyncMethodCallback<Void> resultHandler) { public void subsequent(long iterationId, long skipCount, long takeCount) {
var arena = Arena.ofShared(); try (var arena = Arena.ofConfined()) {
client.getAsyncApi() client.getSyncApi().subsequent(arena, iterationId, skipCount, takeCount, RequestType.none());
.subsequentAsync(arena, iterationId, skipCount, takeCount, RequestType.none()) }
.whenComplete(handleResultWithArena(arena, resultHandler));
} }
@Override @Override
public void subsequentExists(long iterationId, public boolean subsequentExists(long iterationId,
long skipCount, long skipCount,
long takeCount, long takeCount) {
AsyncMethodCallback<Boolean> resultHandler) { try (var arena = Arena.ofConfined()) {
var arena = Arena.ofShared(); return client.getSyncApi().subsequent(arena, iterationId, skipCount, takeCount, RequestType.exists());
client.getAsyncApi() }
.subsequentAsync(arena, iterationId, skipCount, takeCount, RequestType.exists())
.whenComplete(handleResultWithArena(arena, resultHandler));
} }
@Override @Override
public void subsequentMultiGet(long iterationId, public List<OptionalBinary> subsequentMultiGet(long iterationId,
long skipCount, long skipCount,
long takeCount, long takeCount) {
AsyncMethodCallback<List<OptionalBinary>> resultHandler) { try (var arena = Arena.ofConfined()) {
var arena = Arena.ofShared(); return mapResult(client.getSyncApi().subsequent(arena, iterationId, skipCount, takeCount, RequestType.multi()));
client.getAsyncApi() }
.subsequentAsync(arena, iterationId, skipCount, takeCount, RequestType.multi())
.thenApply(ThriftServer::mapResult)
.whenComplete(handleResultWithArena(arena, resultHandler));
} }
} }
} }

View File

@ -1,7 +1,6 @@
module rockserver.core { module rockserver.core {
requires rocksdbjni; requires rocksdbjni;
requires net.sourceforge.argparse4j; requires net.sourceforge.argparse4j;
requires inet.ipaddr;
requires java.logging; requires java.logging;
requires org.jetbrains.annotations; requires org.jetbrains.annotations;
requires high.scale.lib; requires high.scale.lib;

View File

@ -3,6 +3,7 @@ package it.cavallium.rockserver.core.test;
import it.cavallium.rockserver.core.impl.ColumnInstance; import it.cavallium.rockserver.core.impl.ColumnInstance;
import it.cavallium.rockserver.core.impl.XXHash32; import it.cavallium.rockserver.core.impl.XXHash32;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout.OfByte; import java.lang.foreign.ValueLayout.OfByte;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
@ -21,7 +22,7 @@ public class XXHash32Test {
var hash = safeXxhash32.hash(bytes, 0, bytes.length, Integer.MIN_VALUE); var hash = safeXxhash32.hash(bytes, 0, bytes.length, Integer.MIN_VALUE);
var a = Arena.global(); var a = Arena.global();
var result = a.allocate(Integer.BYTES); var result = a.allocate(Integer.BYTES);
myXxhash32.hash(a.allocateArray(OfByte.JAVA_BYTE, bytes), 0, bytes.length, Integer.MIN_VALUE, result); myXxhash32.hash(a.allocate(bytes.length).copyFrom(MemorySegment.ofArray(bytes)), 0, bytes.length, Integer.MIN_VALUE, result);
var resultInt = result.get(ColumnInstance.BIG_ENDIAN_INT, 0); var resultInt = result.get(ColumnInstance.BIG_ENDIAN_INT, 0);
Assertions.assertEquals(hash, resultInt); Assertions.assertEquals(hash, resultInt);
} }
@ -38,7 +39,7 @@ public class XXHash32Test {
var hash = myXxhash32.hash(bytes, 0, bytes.length, Integer.MIN_VALUE); var hash = myXxhash32.hash(bytes, 0, bytes.length, Integer.MIN_VALUE);
var a = Arena.global(); var a = Arena.global();
var result = a.allocate(Integer.BYTES); var result = a.allocate(Integer.BYTES);
myXxhash32.hash(a.allocateArray(OfByte.JAVA_BYTE, bytes), 0, bytes.length, Integer.MIN_VALUE, result); myXxhash32.hash(a.allocate(bytes.length).copyFrom(MemorySegment.ofArray(bytes)), 0, bytes.length, Integer.MIN_VALUE, result);
var resultInt = result.get(ColumnInstance.BIG_ENDIAN_INT, 0); var resultInt = result.get(ColumnInstance.BIG_ENDIAN_INT, 0);
Assertions.assertEquals(hash, resultInt); Assertions.assertEquals(hash, resultInt);
} }