Remove datagen, use thrift serialization

This commit is contained in:
Andrea Cavalli 2024-03-29 00:00:12 +01:00
parent b326a2f981
commit 4d0e8eb2d4
6 changed files with 19 additions and 322 deletions

16
pom.xml
View File

@ -14,6 +14,7 @@
<native.maven.plugin.version>0.9.28</native.maven.plugin.version> <native.maven.plugin.version>0.9.28</native.maven.plugin.version>
<gestalt.version>0.25.3</gestalt.version> <gestalt.version>0.25.3</gestalt.version>
<rocksdb.version>9.0.0</rocksdb.version> <rocksdb.version>9.0.0</rocksdb.version>
<slf4j.version>2.0.12</slf4j.version>
<imageName>rockserver-core</imageName> <imageName>rockserver-core</imageName>
<mainClass>it.cavallium.rockserver.core.Main</mainClass> <mainClass>it.cavallium.rockserver.core.Main</mainClass>
</properties> </properties>
@ -107,6 +108,11 @@
<artifactId>libthrift</artifactId> <artifactId>libthrift</artifactId>
<version>0.20.0</version> <version>0.20.0</version>
</dependency> </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.lz4</groupId> <groupId>org.lz4</groupId>
@ -207,6 +213,16 @@
</build> </build>
<profiles> <profiles>
<profile>
<id>standalone</id>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
</profile>
<profile> <profile>
<id>native</id> <id>native</id>
<build> <build>

View File

@ -1,114 +1,17 @@
package it.cavallium.rockserver.core.common; package it.cavallium.rockserver.core.common;
import it.cavallium.buffer.BufDataInput;
import it.cavallium.buffer.BufDataOutput;
import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestIterate;
import it.cavallium.rockserver.core.common.RequestType.RequestPut; import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.RequestType.RequestTypeId;
import it.cavallium.rockserver.core.impl.ColumnInstance;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectList;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment; import java.lang.foreign.MemorySegment;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
public sealed interface RocksDBAPICommand<R> { public sealed interface RocksDBAPICommand<R> {
enum CommandTypeId {
OPEN_TX,
CLOSE_TX,
CLOSE_FAILED_UPDATE,
CREATE_COLUMN,
DELETE_COLUMN,
GET_COLUMN_ID,
PUT,
GET,
OPEN_ITERATOR,
CLOSE_ITERATOR,
SEEK_TO,
SUBSEQUENT
}
R handleSync(RocksDBSyncAPI api); R handleSync(RocksDBSyncAPI api);
CompletionStage<R> handleAsync(RocksDBAsyncAPI api); CompletionStage<R> handleAsync(RocksDBAsyncAPI api);
void serializeToBuffer(BufDataOutput out);
CommandTypeId getCommandTypeId();
static <T> RocksDBAPICommand<T> deserializeCommand(Arena arena, BufDataInput in) {
return (RocksDBAPICommand<T>) switch (CommandTypeId.values()[in.readUnsignedByte()]) {
case OPEN_TX -> new OpenTransaction(in.readLong());
case CLOSE_TX -> new CloseTransaction(in.readLong(), in.readBoolean());
case CLOSE_FAILED_UPDATE -> new CloseFailedUpdate(in.readLong());
case CREATE_COLUMN -> {
var name = in.readShortText(StandardCharsets.UTF_8);
var keys = new int[in.readInt()];
for (int i = 0; i < keys.length; i++) {
keys[i] = in.readInt();
}
ColumnHashType[] variableTailKeys = new ColumnHashType[in.readInt()];
for (int i = 0; i < variableTailKeys.length; i++) {
variableTailKeys[i] = ColumnHashType.values()[in.readUnsignedByte()];
}
boolean hasValue = in.readBoolean();
var columnSchema = new ColumnSchema(IntArrayList.wrap(keys),
ObjectArrayList.wrap(variableTailKeys),
hasValue
);
yield new CreateColumn(name, columnSchema);
}
case DELETE_COLUMN -> new DeleteColumn(in.readLong());
case GET_COLUMN_ID -> new GetColumnId(in.readShortText(StandardCharsets.UTF_8));
case PUT -> {
var transactionOrUpdateId = in.readLong();
var columnId = in.readLong();
var keys = SerializationUtils.deserializeMemorySegmentArray(in);
var value = SerializationUtils.deserializeMemorySegment(in);
//noinspection unchecked
var requestType = (RequestPut<? super MemorySegment, ?>)
RequestTypeId.values()[in.readUnsignedByte()].getRequestType();
yield new Put<>(arena, transactionOrUpdateId, columnId, keys, value, requestType);
}
case GET -> {
var transactionOrUpdateId = in.readLong();
var columnId = in.readLong();
var keys = SerializationUtils.deserializeMemorySegmentArray(in);
//noinspection unchecked
var requestType = (RequestGet<? super MemorySegment, ?>)
RequestTypeId.values()[in.readUnsignedByte()].getRequestType();
yield new Get<>(arena, transactionOrUpdateId, columnId, keys, requestType);
}
case OPEN_ITERATOR -> {
var transactionId = in.readLong();
var columnId = in.readLong();
var startKeysInclusive = SerializationUtils.deserializeMemorySegmentArray(in);
var endKeysExclusive = SerializationUtils.deserializeNullableMemorySegmentArray(in);
var reverse = in.readBoolean();
var timeoutMs = in.readLong();
yield new OpenIterator(arena, transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, timeoutMs);
}
case CLOSE_ITERATOR -> new CloseIterator(in.readLong());
case SEEK_TO -> {
var iterationId = in.readLong();
var keys = SerializationUtils.deserializeMemorySegmentArray(in);
yield new SeekTo(arena, iterationId, keys);
}
case SUBSEQUENT -> {
var iterationId = in.readLong();
var skipCount = in.readLong();
var takeCount = in.readLong();
//noinspection unchecked
var requestType = (RequestIterate<? super MemorySegment, ?>)
RequestTypeId.values()[in.readUnsignedByte()].getRequestType();
yield new Subsequent<>(arena, iterationId, skipCount, takeCount, requestType);
}
};
}
/** /**
* Open a transaction * Open a transaction
@ -127,15 +30,6 @@ public sealed interface RocksDBAPICommand<R> {
return api.openTransactionAsync(timeoutMs); return api.openTransactionAsync(timeoutMs);
} }
@Override
public void serializeToBuffer(BufDataOutput out) {
out.writeLong(timeoutMs);
}
@Override
public CommandTypeId getCommandTypeId() {
return CommandTypeId.OPEN_TX;
}
} }
/** /**
* Close a transaction * Close a transaction
@ -156,16 +50,6 @@ public sealed interface RocksDBAPICommand<R> {
return api.closeTransactionAsync(transactionId, commit); return api.closeTransactionAsync(transactionId, commit);
} }
@Override
public void serializeToBuffer(BufDataOutput out) {
out.writeLong(transactionId);
out.writeBoolean(commit);
}
@Override
public CommandTypeId getCommandTypeId() {
return CommandTypeId.CLOSE_TX;
}
} }
/** /**
* Close a failed update, discarding all changes * Close a failed update, discarding all changes
@ -185,15 +69,6 @@ public sealed interface RocksDBAPICommand<R> {
return api.closeFailedUpdateAsync(updateId); return api.closeFailedUpdateAsync(updateId);
} }
@Override
public void serializeToBuffer(BufDataOutput out) {
out.writeLong(updateId);
}
@Override
public CommandTypeId getCommandTypeId() {
return CommandTypeId.CLOSE_FAILED_UPDATE;
}
} }
/** /**
* Create a column * Create a column
@ -213,24 +88,6 @@ public sealed interface RocksDBAPICommand<R> {
return api.createColumnAsync(name, schema); return api.createColumnAsync(name, schema);
} }
@Override
public void serializeToBuffer(BufDataOutput out) {
out.writeShortText(name, StandardCharsets.UTF_8);
var keys = schema.keys();
out.writeInt(keys.size());
keys.forEach(out::writeInt);
var variableTailKeys = schema.variableTailKeys();
out.writeInt(variableTailKeys.size());
for (ColumnHashType variableTailKey : variableTailKeys) {
out.writeByte(variableTailKey.ordinal());
}
out.writeBoolean(schema.hasValue());
}
@Override
public CommandTypeId getCommandTypeId() {
return CommandTypeId.CREATE_COLUMN;
}
} }
/** /**
* Delete a column * Delete a column
@ -249,15 +106,6 @@ public sealed interface RocksDBAPICommand<R> {
return api.deleteColumnAsync(columnId); return api.deleteColumnAsync(columnId);
} }
@Override
public void serializeToBuffer(BufDataOutput out) {
out.writeLong(columnId);
}
@Override
public CommandTypeId getCommandTypeId() {
return CommandTypeId.DELETE_COLUMN;
}
} }
/** /**
* Get column id by name * Get column id by name
@ -276,15 +124,6 @@ public sealed interface RocksDBAPICommand<R> {
return api.getColumnIdAsync(name); return api.getColumnIdAsync(name);
} }
@Override
public void serializeToBuffer(BufDataOutput out) {
out.writeShortText(name, StandardCharsets.UTF_8);
}
@Override
public CommandTypeId getCommandTypeId() {
return CommandTypeId.GET_COLUMN_ID;
}
} }
/** /**
* Put an element into the specified position * Put an element into the specified position
@ -312,26 +151,6 @@ public sealed interface RocksDBAPICommand<R> {
return api.putAsync(arena, transactionOrUpdateId, columnId, keys, value, requestType); return api.putAsync(arena, transactionOrUpdateId, columnId, keys, value, requestType);
} }
@Override
public void serializeToBuffer(BufDataOutput out) {
out.writeLong(transactionOrUpdateId);
out.writeLong(columnId);
out.writeInt(keys.length);
for (MemorySegment key : keys) {
var array = key.toArray(ColumnInstance.BIG_ENDIAN_BYTES);
out.writeInt(array.length);
out.writeBytes(array, 0, array.length);
}
var valueArray = value.toArray(ColumnInstance.BIG_ENDIAN_BYTES);
out.writeInt(valueArray.length);
out.writeBytes(valueArray, 0, valueArray.length);
out.writeByte(requestType.getRequestTypeId().ordinal());
}
@Override
public CommandTypeId getCommandTypeId() {
return CommandTypeId.PUT;
}
} }
/** /**
* Get an element from the specified position * Get an element from the specified position
@ -357,23 +176,6 @@ public sealed interface RocksDBAPICommand<R> {
return api.getAsync(arena, transactionOrUpdateId, columnId, keys, requestType); return api.getAsync(arena, transactionOrUpdateId, columnId, keys, requestType);
} }
@Override
public void serializeToBuffer(BufDataOutput out) {
out.writeLong(transactionOrUpdateId);
out.writeLong(columnId);
out.writeInt(keys.length);
for (MemorySegment key : keys) {
var array = key.toArray(ColumnInstance.BIG_ENDIAN_BYTES);
out.writeInt(array.length);
out.writeBytes(array, 0, array.length);
}
out.writeByte(requestType.getRequestTypeId().ordinal());
}
@Override
public CommandTypeId getCommandTypeId() {
return CommandTypeId.GET;
}
} }
/** /**
* Open an iterator * Open an iterator
@ -411,33 +213,6 @@ public sealed interface RocksDBAPICommand<R> {
); );
} }
@Override
public void serializeToBuffer(BufDataOutput out) {
out.writeLong(transactionId);
out.writeLong(columnId);
out.writeInt(startKeysInclusive.length);
SerializationUtils.serializeMemorySegmentArray(out, startKeysInclusive);
for (MemorySegment key : startKeysInclusive) {
var array = key.toArray(ColumnInstance.BIG_ENDIAN_BYTES);
out.writeInt(array.length);
out.writeBytes(array, 0, array.length);
}
out.writeInt(endKeysExclusive == null ? -1 : endKeysExclusive.length);
if (endKeysExclusive != null) {
for (MemorySegment key : endKeysExclusive) {
var array = key.toArray(ColumnInstance.BIG_ENDIAN_BYTES);
out.writeInt(array.length);
out.writeBytes(array, 0, array.length);
}
}
out.writeBoolean(reverse);
out.writeLong(timeoutMs);
}
@Override
public CommandTypeId getCommandTypeId() {
return CommandTypeId.OPEN_ITERATOR;
}
} }
/** /**
* Close an iterator * Close an iterator
@ -456,15 +231,6 @@ public sealed interface RocksDBAPICommand<R> {
return api.closeIteratorAsync(iteratorId); return api.closeIteratorAsync(iteratorId);
} }
@Override
public void serializeToBuffer(BufDataOutput out) {
out.writeLong(iteratorId);
}
@Override
public CommandTypeId getCommandTypeId() {
return CommandTypeId.CLOSE_ITERATOR;
}
} }
/** /**
* Seek to the specific element during an iteration, or the subsequent one if not found * Seek to the specific element during an iteration, or the subsequent one if not found
@ -486,21 +252,6 @@ public sealed interface RocksDBAPICommand<R> {
return api.seekToAsync(arena, iterationId, keys); return api.seekToAsync(arena, iterationId, keys);
} }
@Override
public void serializeToBuffer(BufDataOutput out) {
out.writeLong(iterationId);
out.writeInt(keys.length);
for (MemorySegment key : keys) {
var array = key.toArray(ColumnInstance.BIG_ENDIAN_BYTES);
out.writeInt(array.length);
out.writeBytes(array, 0, array.length);
}
}
@Override
public CommandTypeId getCommandTypeId() {
return CommandTypeId.SEEK_TO;
}
} }
/** /**
* Get the subsequent element during an iteration * Get the subsequent element during an iteration
@ -527,17 +278,5 @@ public sealed interface RocksDBAPICommand<R> {
return api.subsequentAsync(arena, iterationId, skipCount, takeCount, requestType); return api.subsequentAsync(arena, iterationId, skipCount, takeCount, requestType);
} }
@Override
public void serializeToBuffer(BufDataOutput out) {
out.writeLong(iterationId);
out.writeLong(skipCount);
out.writeLong(takeCount);
out.writeByte(requestType.getRequestTypeId().ordinal());
}
@Override
public CommandTypeId getCommandTypeId() {
return CommandTypeId.SUBSEQUENT;
}
} }
} }

View File

@ -1,56 +0,0 @@
package it.cavallium.rockserver.core.common;
import it.cavallium.buffer.BufDataInput;
import it.cavallium.buffer.BufDataOutput;
import it.cavallium.rockserver.core.impl.ColumnInstance;
import java.lang.foreign.MemorySegment;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class SerializationUtils {
public static void serializeMemorySegment(BufDataOutput out, @NotNull MemorySegment segment) {
var array = segment.toArray(ColumnInstance.BIG_ENDIAN_BYTES);
out.writeInt(array.length);
out.writeBytes(array, 0, array.length);
}
public static MemorySegment deserializeMemorySegment(BufDataInput in) {
return MemorySegment.ofArray(in.readNBytes(in.readInt()));
}
public static void serializeNullableMemorySegmentArray(BufDataOutput out, @NotNull MemorySegment @Nullable [] array) {
out.writeInt(array != null ? array.length : -1);
if (array != null) {
for (MemorySegment memorySegment : array) {
serializeMemorySegment(out, memorySegment);
}
}
}
public static void serializeMemorySegmentArray(BufDataOutput out, @NotNull MemorySegment @NotNull [] array) {
out.writeInt(array.length);
for (MemorySegment memorySegment : array) {
serializeMemorySegment(out, memorySegment);
}
}
public static @NotNull MemorySegment @Nullable [] deserializeNullableMemorySegmentArray(BufDataInput in) {
int size = in.readInt();
if (size == -1) {
return null;
} else {
var array = new MemorySegment @NotNull[size];
for (int i = 0; i < array.length; i++) {
array[i] = deserializeMemorySegment(in);
}
return array;
}
}
public static @NotNull MemorySegment @NotNull [] deserializeMemorySegmentArray(BufDataInput in) {
var array = new MemorySegment @NotNull [in.readInt()];
for (int i = 0; i < array.length; i++) {
array[i] = deserializeMemorySegment(in);
}
return array;
}
}

View File

@ -649,7 +649,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
if (tx.isFromGetForUpdate()) { if (tx.isFromGetForUpdate()) {
previousRawBucketByteArray = tx.val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousRawBucketByteArray = tx.val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
} else { } else {
previousRawBucketByteArray = tx.val().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES)); previousRawBucketByteArray = tx.val().get(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES));
} }
return toMemorySegment(arena, previousRawBucketByteArray); return toMemorySegment(arena, previousRawBucketByteArray);
} else { } else {

View File

@ -29,11 +29,9 @@ import org.jetbrains.annotations.NotNull;
public class ThriftServer extends Server { public class ThriftServer extends Server {
private final AsyncIface handler;
public ThriftServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException { public ThriftServer(RocksDBConnection client, String http2Host, int http2Port) throws IOException {
super(client); super(client);
this.handler = new RocksDB.AsyncIface() { var handler = new RocksDB.AsyncIface() {
@Override @Override
public void openTransaction(long timeoutMs, AsyncMethodCallback<Long> resultHandler) { public void openTransaction(long timeoutMs, AsyncMethodCallback<Long> resultHandler) {
client.getAsyncApi().openTransactionAsync(timeoutMs).whenComplete(handleResult(resultHandler)); client.getAsyncApi().openTransactionAsync(timeoutMs).whenComplete(handleResult(resultHandler));
@ -231,7 +229,6 @@ public class ThriftServer extends Server {
} catch (TTransportException e) { } catch (TTransportException e) {
throw new IOException("Can't open server socket", e); throw new IOException("Can't open server socket", e);
} }
} }
private @NotNull MemorySegment [] keysToRecord(List<@NotNull ByteBuffer> keys) { private @NotNull MemorySegment [] keysToRecord(List<@NotNull ByteBuffer> keys) {

View File

@ -19,6 +19,7 @@ module rockserver.core {
requires io.netty5.transport.unix.common; requires io.netty5.transport.unix.common;
requires io.netty5.codec.http; requires io.netty5.codec.http;
requires org.apache.thrift; requires org.apache.thrift;
requires org.slf4j;
exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common; exports it.cavallium.rockserver.core.common;