diff --git a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java index 8e54d69..ba41a5b 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java @@ -17,7 +17,10 @@ import java.net.URI; import java.nio.file.Path; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -25,10 +28,12 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { private final EmbeddedDB db; public static final URI PRIVATE_MEMORY_URL = URI.create("memory://private"); + private final ExecutorService exeuctor; public EmbeddedConnection(@Nullable Path path, String name, @Nullable Path embeddedConfig) { super(name); this.db = new EmbeddedDB(path, name, embeddedConfig); + this.exeuctor = Executors.newVirtualThreadPerTaskExecutor(); } @Override @@ -89,7 +94,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { @Override public CompletionStage requestAsync(RocksDBAPICommand req) { - return req.handleAsync(this); + return CompletableFuture.supplyAsync(() -> req.handleSync(this), exeuctor); } @Override diff --git a/src/main/java/it/cavallium/rockserver/core/server/ThriftServer.java b/src/main/java/it/cavallium/rockserver/core/server/ThriftServer.java index f208bbb..31c59cb 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/ThriftServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/ThriftServer.java @@ -7,7 +7,6 @@ import it.cavallium.rockserver.core.common.api.ColumnHashType; import it.cavallium.rockserver.core.common.api.ColumnSchema; import it.cavallium.rockserver.core.common.api.Delta; import it.cavallium.rockserver.core.common.api.OptionalBinary; -import it.cavallium.rockserver.core.common.api.RocksDB; import it.cavallium.rockserver.core.common.api.RocksDB.AsyncIface; import it.cavallium.rockserver.core.common.api.RocksDB.AsyncProcessor; import it.cavallium.rockserver.core.common.api.UpdateBegin; @@ -16,214 +15,32 @@ import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.io.IOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.lang.foreign.ValueLayout.OfByte; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.List; import java.util.function.BiConsumer; import org.apache.thrift.async.AsyncMethodCallback; import org.apache.thrift.server.TNonblockingServer; -import org.apache.thrift.server.TNonblockingServer.Args; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TTransportException; import org.jetbrains.annotations.NotNull; public class ThriftServer extends Server { + private static final OfByte BYTE_BE = ValueLayout.JAVA_BYTE.withOrder(ByteOrder.BIG_ENDIAN); + public ThriftServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException { super(client); - var handler = new RocksDB.AsyncIface() { - @Override - public void openTransaction(long timeoutMs, AsyncMethodCallback resultHandler) { - client.getAsyncApi().openTransactionAsync(timeoutMs).whenComplete(handleResult(resultHandler)); - } - - @Override - public void closeTransaction(long timeoutMs, boolean commit, AsyncMethodCallback resultHandler) { - client.getAsyncApi().closeTransactionAsync(timeoutMs, commit).whenComplete(handleResult(resultHandler)); - } - - @Override - public void closeFailedUpdate(long updateId, AsyncMethodCallback resultHandler) { - client.getAsyncApi().closeFailedUpdateAsync(updateId).whenComplete(handleResult(resultHandler)); - } - - @Override - public void createColumn(String name, ColumnSchema schema, AsyncMethodCallback resultHandler) { - client.getAsyncApi().createColumnAsync(name, columnSchemaToRecord(schema)) - .whenComplete(handleResult(resultHandler)); - } - - @Override - public void deleteColumn(long columnId, AsyncMethodCallback resultHandler) { - client.getAsyncApi().deleteColumnAsync(columnId).whenComplete(handleResult(resultHandler)); - } - - @Override - public void getColumnId(String name, AsyncMethodCallback resultHandler) { - client.getAsyncApi().getColumnIdAsync(name).whenComplete(handleResult(resultHandler)); - } - - @Override - public void put(long transactionOrUpdateId, - long columnId, - List keys, - ByteBuffer value, - AsyncMethodCallback resultHandler) { - var arena = Arena.ofShared(); - client.getAsyncApi() - .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(keys), keyToRecord(value), RequestType.none()) - .whenComplete(handleResultWithArena(arena, resultHandler)); - } - - @Override - public void putGetPrevious(long transactionOrUpdateId, - long columnId, - List keys, - ByteBuffer value, - AsyncMethodCallback resultHandler) { - var arena = Arena.ofShared(); - client.getAsyncApi() - .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(keys), keyToRecord(value), RequestType.previous()) - .thenApply(ThriftServer::mapResult) - .whenComplete(handleResultWithArena(arena, resultHandler)); - } - - @Override - public void putGetDelta(long transactionOrUpdateId, - long columnId, - List keys, - ByteBuffer value, - AsyncMethodCallback resultHandler) { - var arena = Arena.ofShared(); - client.getAsyncApi() - .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(keys), keyToRecord(value), RequestType.delta()) - .thenApply(ThriftServer::mapResult) - .whenComplete(handleResultWithArena(arena, resultHandler)); - } - - @Override - public void putGetChanged(long transactionOrUpdateId, - long columnId, - List keys, - ByteBuffer value, - AsyncMethodCallback resultHandler) { - var arena = Arena.ofShared(); - client.getAsyncApi() - .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(keys), keyToRecord(value), RequestType.changed()) - .whenComplete(handleResultWithArena(arena, resultHandler)); - } - - @Override - public void putGetPreviousPresence(long transactionOrUpdateId, - long columnId, - List keys, - ByteBuffer value, - AsyncMethodCallback resultHandler) { - var arena = Arena.ofShared(); - client.getAsyncApi() - .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(keys), keyToRecord(value), RequestType.previousPresence()) - .whenComplete(handleResultWithArena(arena, resultHandler)); - } - - @Override - public void get(long transactionOrUpdateId, - long columnId, - List keys, - AsyncMethodCallback resultHandler) { - var arena = Arena.ofShared(); - client.getAsyncApi() - .getAsync(arena, transactionOrUpdateId, columnId, keysToRecord(keys), RequestType.current()) - .thenApply(ThriftServer::mapResult) - .whenComplete(handleResultWithArena(arena, resultHandler)); - } - - @Override - public void getForUpdate(long transactionOrUpdateId, - long columnId, - List keys, - AsyncMethodCallback resultHandler) { - var arena = Arena.ofShared(); - client.getAsyncApi() - .getAsync(arena, transactionOrUpdateId, columnId, keysToRecord(keys), RequestType.forUpdate()) - .thenApply(ThriftServer::mapResult) - .whenComplete(handleResultWithArena(arena, resultHandler)); - } - - @Override - public void exists(long transactionOrUpdateId, - long columnId, - List keys, - AsyncMethodCallback resultHandler) { - var arena = Arena.ofShared(); - client.getAsyncApi() - .getAsync(arena, transactionOrUpdateId, columnId, keysToRecord(keys), RequestType.exists()) - .whenComplete(handleResultWithArena(arena, resultHandler)); - } - - @Override - public void openIterator(long transactionId, - long columnId, - List startKeysInclusive, - List endKeysExclusive, - boolean reverse, - long timeoutMs, - AsyncMethodCallback resultHandler) { - var arena = Arena.ofShared(); - client.getAsyncApi() - .openIteratorAsync(arena, transactionId, columnId, keysToRecord(startKeysInclusive), keysToRecord(endKeysExclusive), reverse, timeoutMs) - .whenComplete(handleResultWithArena(arena, resultHandler)); - } - - @Override - public void closeIterator(long iteratorId, AsyncMethodCallback resultHandler) { - client.getAsyncApi() - .closeIteratorAsync(iteratorId) - .whenComplete(handleResult(resultHandler)); - } - - @Override - public void seekTo(long iterationId, List keys, AsyncMethodCallback resultHandler) { - var arena = Arena.ofShared(); - client.getAsyncApi() - .seekToAsync(arena, iterationId, keysToRecord(keys)) - .whenComplete(handleResultWithArena(arena, resultHandler)); - } - - @Override - public void subsequent(long iterationId, long skipCount, long takeCount, AsyncMethodCallback resultHandler) { - var arena = Arena.ofShared(); - client.getAsyncApi() - .subsequentAsync(arena, iterationId, skipCount, takeCount, RequestType.none()) - .whenComplete(handleResultWithArena(arena, resultHandler)); - } - - @Override - public void subsequentExists(long iterationId, - long skipCount, - long takeCount, - AsyncMethodCallback resultHandler) { - var arena = Arena.ofShared(); - client.getAsyncApi() - .subsequentAsync(arena, iterationId, skipCount, takeCount, RequestType.exists()) - .whenComplete(handleResultWithArena(arena, resultHandler)); - } - - @Override - public void subsequentMultiGet(long iterationId, - long skipCount, - long takeCount, - AsyncMethodCallback> resultHandler) { - var arena = Arena.ofShared(); - client.getAsyncApi() - .subsequentAsync(arena, iterationId, skipCount, takeCount, RequestType.multi()) - .thenApply(ThriftServer::mapResult) - .whenComplete(handleResultWithArena(arena, resultHandler)); - } - }; + var handler = new AsyncThriftHandler(client); try { var serverTransport = new TNonblockingServerSocket(new InetSocketAddress(http2Host, http2Port)); - var server = new TNonblockingServer(new Args(serverTransport).processor(new AsyncProcessor<>(handler))); + var server = new TNonblockingServer(new TNonblockingServer.Args(serverTransport) + .processor(new AsyncProcessor<>(handler)) + ); server.serve(); } catch (TTransportException e) { @@ -231,31 +48,35 @@ public class ThriftServer extends Server { } } - private @NotNull MemorySegment [] keysToRecord(List<@NotNull ByteBuffer> keys) { + private static MemorySegment[] keysToRecord(Arena arena, List<@NotNull ByteBuffer> keys) { if (keys == null) { return null; } var result = new MemorySegment[keys.size()]; int i = 0; for (ByteBuffer key : keys) { - result[i] = keyToRecord(key); + result[i] = keyToRecord(arena, key); i++; } return result; } - private @NotNull MemorySegment keyToRecord(@NotNull ByteBuffer key) { - return MemorySegment.ofBuffer(key); + private static @NotNull MemorySegment keyToRecord(Arena arena, @NotNull ByteBuffer key) { + if (key.isDirect()) { + return MemorySegment.ofBuffer(key); + } else { + return arena.allocate(key.remaining()).copyFrom(MemorySegment.ofBuffer(key)); + } } - private it.cavallium.rockserver.core.common.ColumnSchema columnSchemaToRecord(ColumnSchema schema) { + private static it.cavallium.rockserver.core.common.ColumnSchema columnSchemaToRecord(ColumnSchema schema) { return it.cavallium.rockserver.core.common.ColumnSchema.of(new IntArrayList(schema.getFixedKeys()), hashTypesToRecord(schema.getVariableTailKeys()), schema.isHasValue() ); } - private ObjectArrayList hashTypesToRecord(List variableTailKeys) { + private static ObjectArrayList hashTypesToRecord(List variableTailKeys) { var result = new ObjectArrayList(); for (ColumnHashType variableTailKey : variableTailKeys) { result.add(hashTypeToRecord(variableTailKey)); @@ -263,7 +84,7 @@ public class ThriftServer extends Server { return result; } - private it.cavallium.rockserver.core.common.ColumnHashType hashTypeToRecord(ColumnHashType variableTailKey) { + private static it.cavallium.rockserver.core.common.ColumnHashType hashTypeToRecord(ColumnHashType variableTailKey) { return it.cavallium.rockserver.core.common.ColumnHashType.valueOf(variableTailKey.name()); } @@ -316,4 +137,205 @@ public class ThriftServer extends Server { private static List mapResult(List multi) { return multi.stream().map(ThriftServer::mapResult).toList(); } + + private static class AsyncThriftHandler implements AsyncIface { + + private final RocksDBConnection client; + + public AsyncThriftHandler(RocksDBConnection client) { + this.client = client; + } + + @Override + public void openTransaction(long timeoutMs, AsyncMethodCallback resultHandler) { + client.getAsyncApi().openTransactionAsync(timeoutMs).whenComplete(handleResult(resultHandler)); + } + + @Override + public void closeTransaction(long timeoutMs, boolean commit, AsyncMethodCallback resultHandler) { + client.getAsyncApi().closeTransactionAsync(timeoutMs, commit).whenComplete(handleResult(resultHandler)); + } + + @Override + public void closeFailedUpdate(long updateId, AsyncMethodCallback resultHandler) { + client.getAsyncApi().closeFailedUpdateAsync(updateId).whenComplete(handleResult(resultHandler)); + } + + @Override + public void createColumn(String name, ColumnSchema schema, AsyncMethodCallback resultHandler) { + client.getAsyncApi().createColumnAsync(name, columnSchemaToRecord(schema)) + .whenComplete(handleResult(resultHandler)); + } + + @Override + public void deleteColumn(long columnId, AsyncMethodCallback resultHandler) { + client.getAsyncApi().deleteColumnAsync(columnId).whenComplete(handleResult(resultHandler)); + } + + @Override + public void getColumnId(String name, AsyncMethodCallback resultHandler) { + client.getAsyncApi().getColumnIdAsync(name).whenComplete(handleResult(resultHandler)); + } + + @Override + public void put(long transactionOrUpdateId, + long columnId, + List keys, + ByteBuffer value, + AsyncMethodCallback resultHandler) { + var arena = Arena.ofShared(); + client.getAsyncApi() + .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.none()) + .whenComplete(handleResultWithArena(arena, resultHandler)); + } + + @Override + public void putGetPrevious(long transactionOrUpdateId, + long columnId, + List keys, + ByteBuffer value, + AsyncMethodCallback resultHandler) { + var arena = Arena.ofShared(); + client.getAsyncApi() + .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.previous()) + .thenApply(ThriftServer::mapResult) + .whenComplete(handleResultWithArena(arena, resultHandler)); + } + + @Override + public void putGetDelta(long transactionOrUpdateId, + long columnId, + List keys, + ByteBuffer value, + AsyncMethodCallback resultHandler) { + var arena = Arena.ofShared(); + client.getAsyncApi() + .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.delta()) + .thenApply(ThriftServer::mapResult) + .whenComplete(handleResultWithArena(arena, resultHandler)); + } + + @Override + public void putGetChanged(long transactionOrUpdateId, + long columnId, + List keys, + ByteBuffer value, + AsyncMethodCallback resultHandler) { + var arena = Arena.ofShared(); + client + .getAsyncApi() + .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.changed()) + .whenComplete(handleResultWithArena(arena, resultHandler)); + } + + @Override + public void putGetPreviousPresence(long transactionOrUpdateId, + long columnId, + List keys, + ByteBuffer value, + AsyncMethodCallback resultHandler) { + var arena = Arena.ofShared(); + client + .getAsyncApi() + .putAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), keyToRecord(arena, value), RequestType.previousPresence()) + .whenComplete(handleResultWithArena(arena, resultHandler)); + } + + @Override + public void get(long transactionOrUpdateId, + long columnId, + List keys, + AsyncMethodCallback resultHandler) { + var arena = Arena.ofShared(); + client.getAsyncApi() + .getAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.current()) + .thenApply(ThriftServer::mapResult) + .whenComplete(handleResultWithArena(arena, resultHandler)); + } + + @Override + public void getForUpdate(long transactionOrUpdateId, + long columnId, + List keys, + AsyncMethodCallback resultHandler) { + var arena = Arena.ofShared(); + client.getAsyncApi() + .getAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.forUpdate()) + .thenApply(ThriftServer::mapResult) + .whenComplete(handleResultWithArena(arena, resultHandler)); + } + + @Override + public void exists(long transactionOrUpdateId, + long columnId, + List keys, + AsyncMethodCallback resultHandler) { + var arena = Arena.ofShared(); + client.getAsyncApi() + .getAsync(arena, transactionOrUpdateId, columnId, keysToRecord(arena, keys), RequestType.exists()) + .whenComplete(handleResultWithArena(arena, resultHandler)); + } + + @Override + public void openIterator(long transactionId, + long columnId, + List startKeysInclusive, + List endKeysExclusive, + boolean reverse, + long timeoutMs, + AsyncMethodCallback resultHandler) { + var arena = Arena.ofShared(); + client + .getAsyncApi() + .openIteratorAsync(arena, transactionId, columnId, keysToRecord(arena, + startKeysInclusive), keysToRecord(arena, endKeysExclusive), reverse, timeoutMs) + .whenComplete(handleResultWithArena(arena, resultHandler)); + } + + @Override + public void closeIterator(long iteratorId, AsyncMethodCallback resultHandler) { + client.getAsyncApi() + .closeIteratorAsync(iteratorId) + .whenComplete(handleResult(resultHandler)); + } + + @Override + public void seekTo(long iterationId, List keys, AsyncMethodCallback resultHandler) { + var arena = Arena.ofShared(); + client.getAsyncApi() + .seekToAsync(arena, iterationId, keysToRecord(arena, keys)) + .whenComplete(handleResultWithArena(arena, resultHandler)); + } + + @Override + public void subsequent(long iterationId, long skipCount, long takeCount, AsyncMethodCallback resultHandler) { + var arena = Arena.ofShared(); + client.getAsyncApi() + .subsequentAsync(arena, iterationId, skipCount, takeCount, RequestType.none()) + .whenComplete(handleResultWithArena(arena, resultHandler)); + } + + @Override + public void subsequentExists(long iterationId, + long skipCount, + long takeCount, + AsyncMethodCallback resultHandler) { + var arena = Arena.ofShared(); + client.getAsyncApi() + .subsequentAsync(arena, iterationId, skipCount, takeCount, RequestType.exists()) + .whenComplete(handleResultWithArena(arena, resultHandler)); + } + + @Override + public void subsequentMultiGet(long iterationId, + long skipCount, + long takeCount, + AsyncMethodCallback> resultHandler) { + var arena = Arena.ofShared(); + client.getAsyncApi() + .subsequentAsync(arena, iterationId, skipCount, takeCount, RequestType.multi()) + .thenApply(ThriftServer::mapResult) + .whenComplete(handleResultWithArena(arena, resultHandler)); + } + } }