From a11ae8cebcbab5dd6fee23018e8ad497c1970eb2 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 25 Sep 2024 11:11:09 +0200 Subject: [PATCH] Implement reactive sst batching --- pom.xml | 5 + src/fatjar/java/module-info.java | 1 + src/library/java/module-info.java | 1 + .../core/client/EmbeddedConnection.java | 20 +- .../core/client/GrpcConnection.java | 178 ++++++-- .../rockserver/core/common/KVBatch.java | 10 + .../core/common/RocksDBAPICommand.java | 40 +- .../core/common/RocksDBAsyncAPI.java | 8 +- .../core/common/RocksDBException.java | 11 +- .../core/common/RocksDBSyncAPI.java | 8 +- .../rockserver/core/impl/EmbeddedDB.java | 250 +++++++---- .../core/impl/rocksdb/RocksDBLoader.java | 418 +++++++++--------- .../core/impl/rocksdb/SSTWriter.java | 144 +++++- .../rockserver/core/impl/rocksdb/Tx.java | 6 + .../rockserver/core/server/GrpcServer.java | 232 +++++++--- src/main/proto/rocksdb.proto | 9 +- .../rockserver/core/resources/default.conf | 4 +- src/native/java/module-info.java | 1 + .../core/impl/test/EmbeddedDBTest.java | 84 +++- .../core/impl/test/TestSSTWriter.java | 208 +++++++++ src/test/java/module-info.java | 6 + 21 files changed, 1203 insertions(+), 441 deletions(-) create mode 100644 src/main/java/it/cavallium/rockserver/core/common/KVBatch.java create mode 100644 src/test/java/it/cavallium/rockserver/core/impl/test/TestSSTWriter.java diff --git a/pom.xml b/pom.xml index 1e5527f..ed785d2 100644 --- a/pom.xml +++ b/pom.xml @@ -135,6 +135,11 @@ grpc-stub ${grpc.version} + + org.reactivestreams + reactive-streams + 1.0.4 + org.lz4 diff --git a/src/fatjar/java/module-info.java b/src/fatjar/java/module-info.java index 669fd59..f0e1a23 100644 --- a/src/fatjar/java/module-info.java +++ b/src/fatjar/java/module-info.java @@ -28,6 +28,7 @@ module rockserver.core { requires io.netty.codec.http2; requires jdk.unsupported; requires io.netty.transport.classes.epoll; + requires org.reactivestreams; exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common; diff --git a/src/library/java/module-info.java b/src/library/java/module-info.java index 71d4d6d..641528f 100644 --- a/src/library/java/module-info.java +++ b/src/library/java/module-info.java @@ -26,6 +26,7 @@ module rockserver.core { requires io.netty.codec.http2; requires jdk.unsupported; requires io.netty.transport.classes.epoll; + requires org.reactivestreams; exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common; diff --git a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java index 677fc3c..96bc779 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java @@ -17,6 +17,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { @@ -88,6 +89,10 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { @Override public CompletableFuture requestAsync(RocksDBAPICommand req) { + if (req instanceof RocksDBAPICommand.PutBatch putBatch) { + //noinspection unchecked + return (CompletableFuture) this.putBatchAsync(putBatch.columnId(), putBatch.batchPublisher(), putBatch.mode()); + } return CompletableFuture.supplyAsync(() -> req.handleSync(this), exeuctor); } @@ -112,12 +117,17 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { } @Override - public void putBatch(Arena arena, - long columnId, - @NotNull List keys, - @NotNull List<@NotNull MemorySegment> values, + public CompletableFuture putBatchAsync(long columnId, + @NotNull Publisher<@NotNull KVBatch> batchPublisher, + @NotNull PutBatchMode mode) throws RocksDBException { + return db.putBatchInternal(columnId, batchPublisher, mode); + } + + @Override + public void putBatch(long columnId, + @NotNull Publisher<@NotNull KVBatch> batchPublisher, @NotNull PutBatchMode mode) throws RocksDBException { - db.putBatch(arena, columnId, keys, values, mode); + db.putBatch(columnId, batchPublisher, mode); } @Override diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java index 4bc0339..82d4133 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java @@ -11,9 +11,10 @@ import com.google.protobuf.UnsafeByteOperations; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Status; -import io.grpc.stub.StreamObserver; +import io.grpc.stub.*; import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.ColumnSchema; +import it.cavallium.rockserver.core.common.KVBatch; import it.cavallium.rockserver.core.common.PutBatchMode; import it.cavallium.rockserver.core.common.RequestType.RequestChanged; import it.cavallium.rockserver.core.common.RequestType.RequestCurrent; @@ -40,13 +41,18 @@ import java.lang.foreign.MemorySegment; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -187,20 +193,20 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { CompletableFuture> responseObserver; if (requestType instanceof RequestType.RequestNothing && transactionOrUpdateId == 0L) { - var putBatchRequestBuilder = PutBatchRequest.newBuilder() - .setColumnId(columnId) - .setMode(it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH); + return putBatchAsync(columnId, subscriber -> { + var sub = new Subscription() { + @Override + public void request(long l) { + } - var it1 = allKeys.iterator(); - var it2 = allValues.iterator(); + @Override + public void cancel() { - while (it1.hasNext()) { - var k = it1.next(); - var v = it2.next(); - putBatchRequestBuilder.addData(mapKV(k, v)); - } - - return toResponse(futureStub.putBatch(putBatchRequestBuilder.build()), _ -> null); + } + }; + subscriber.onSubscribe(sub); + subscriber.onNext(new KVBatch(allKeys, allValues)); + }, PutBatchMode.WRITE_BATCH).thenApply(_ -> List.of()); } var initialRequest = PutMultiRequest.newBuilder() @@ -261,36 +267,105 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { } @Override - public CompletableFuture putBatchAsync(Arena arena, - long columnId, - @NotNull List<@NotNull Keys> allKeys, - @NotNull List<@NotNull MemorySegment> allValues, + public CompletableFuture putBatchAsync(long columnId, + @NotNull Publisher<@NotNull KVBatch> batchPublisher, @NotNull PutBatchMode mode) throws RocksDBException { - var count = allKeys.size(); - if (count != allValues.size()) { - throw new IllegalArgumentException("Keys length is different than values length! " - + count + " != " + allValues.size()); - } + var cf = new CompletableFuture(); + var responseobserver = new ClientResponseObserver() { + private ClientCallStreamObserver requestStream; + private Subscription subscription; - var putBatchRequestBuilder = PutBatchRequest.newBuilder() - .setColumnId(columnId) - .setMode(switch (mode) { - case WRITE_BATCH -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH; - case WRITE_BATCH_NO_WAL -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH_NO_WAL; - case SST_INGESTION -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.SST_INGESTION; - case SST_INGEST_BEHIND -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.SST_INGEST_BEHIND; - }); + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + this.requestStream = requestStream; + // Set up manual flow control for the response stream. It feels backwards to configure the response + // stream's flow control using the request stream's observer, but this is the way it is. + requestStream.disableAutoRequestWithInitial(1); - var it1 = allKeys.iterator(); - var it2 = allValues.iterator(); + var subscriber = new Subscriber() { + private volatile boolean finalized; - while (it1.hasNext()) { - var k = it1.next(); - var v = it2.next(); - putBatchRequestBuilder.addData(mapKV(k, v)); - } + @Override + public void onSubscribe(Subscription subscription2) { + subscription = subscription2; + } - return toResponse(futureStub.putBatch(putBatchRequestBuilder.build()), _ -> null); + @Override + public void onNext(KVBatch batch) { + var request = PutBatchRequest.newBuilder(); + request.setData(mapKVBatch(batch)); + requestStream.onNext(request.build()); + if (requestStream.isReady()) { + subscription.request(1); + } + } + + @Override + public void onError(Throwable throwable) { + this.finalized = true; + requestStream.onError(throwable); + } + + @Override + public void onComplete() { + this.finalized = true; + requestStream.onCompleted(); + } + }; + + + batchPublisher.subscribe(subscriber); + + // Set up a back-pressure-aware producer for the request stream. The onReadyHandler will be invoked + // when the consuming side has enough buffer space to receive more messages. + // + // Messages are serialized into a transport-specific transmit buffer. Depending on the size of this buffer, + // MANY messages may be buffered, however, they haven't yet been sent to the server. The server must call + // request() to pull a buffered message from the client. + // + // Note: the onReadyHandler's invocation is serialized on the same thread pool as the incoming + // StreamObserver's onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent + // additional messages from being processed by the incoming StreamObserver. The onReadyHandler must return + // in a timely manner or else message processing throughput will suffer. + requestStream.setOnReadyHandler(new Runnable() { + + @Override + public void run() { + // Start generating values from where we left off on a non-gRPC thread. + subscription.request(1); + } + }); + } + + @Override + public void onNext(Empty empty) {} + + @Override + public void onError(Throwable throwable) { + cf.completeExceptionally(throwable); + } + + @Override + public void onCompleted() { + cf.complete(null); + } + }; + + var requestStream = asyncStub.putBatch(responseobserver); + + requestStream.onNext(PutBatchRequest.newBuilder() + .setInitialRequest(PutBatchInitialRequest.newBuilder() + .setColumnId(columnId) + .setMode(switch (mode) { + case WRITE_BATCH -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH; + case WRITE_BATCH_NO_WAL -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH_NO_WAL; + case SST_INGESTION -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.SST_INGESTION; + case SST_INGEST_BEHIND -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.SST_INGEST_BEHIND; + }) + .build()) + .build()); + + return cf; } @SuppressWarnings("unchecked") @@ -397,6 +472,33 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { return data != null ? MemorySegment.ofBuffer(data.asReadOnlyByteBuffer()) : null; } + private static it.cavallium.rockserver.core.common.api.proto.KVBatch mapKVBatch(@NotNull KVBatch kvBatch) { + return it.cavallium.rockserver.core.common.api.proto.KVBatch.newBuilder() + .addAllEntries(mapKVList(kvBatch.keys(), kvBatch.values())) + .build(); + } + + private static Iterable mapKVList(@NotNull List keys, @NotNull List values) { + return new Iterable<>() { + @Override + public @NotNull Iterator iterator() { + var it1 = keys.iterator(); + var it2 = values.iterator(); + return new Iterator<>() { + @Override + public boolean hasNext() { + return it1.hasNext(); + } + + @Override + public KV next() { + return mapKV(it1.next(), it2.next()); + } + }; + } + }; + } + private static KV mapKV(@NotNull Keys keys, @NotNull MemorySegment value) { return KV.newBuilder() .addAllKeys(mapKeys(keys)) diff --git a/src/main/java/it/cavallium/rockserver/core/common/KVBatch.java b/src/main/java/it/cavallium/rockserver/core/common/KVBatch.java new file mode 100644 index 0000000..5fdffdd --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/common/KVBatch.java @@ -0,0 +1,10 @@ +package it.cavallium.rockserver.core.common; + +import org.jetbrains.annotations.NotNull; + +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.util.List; + +public record KVBatch(@NotNull List<@NotNull Keys> keys, @NotNull List<@NotNull MemorySegment> values) { +} diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java index e31dd05..bc4eaf1 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java @@ -17,8 +17,10 @@ public sealed interface RocksDBAPICommand { /** * Open a transaction + *

+ * Returns the transaction id + * * @param timeoutMs timeout in milliseconds - * @return transaction id */ record OpenTransaction(long timeoutMs) implements RocksDBAPICommand { @@ -35,10 +37,11 @@ public sealed interface RocksDBAPICommand { } /** * Close a transaction + *

+ * Returns true if committed, if false, you should try again * * @param transactionId transaction id to close * @param commit true to commit the transaction, false to rollback it - * @return true if committed, if false, you should try again */ record CloseTransaction(long transactionId, boolean commit) implements RocksDBAPICommand { @@ -74,9 +77,11 @@ public sealed interface RocksDBAPICommand { } /** * Create a column + *

+ * Returns the column id + * * @param name column name * @param schema column key-value schema - * @return column id */ record CreateColumn(String name, @NotNull ColumnSchema schema) implements RocksDBAPICommand { @@ -111,8 +116,10 @@ public sealed interface RocksDBAPICommand { } /** * Get column id by name + *

+ * Returns the column id + * * @param name column name - * @return column id */ record GetColumnId(@NotNull String name) implements RocksDBAPICommand { @@ -210,26 +217,23 @@ public sealed interface RocksDBAPICommand { } /** * Put multiple elements into the specified positions - * @param arena arena * @param columnId column id - * @param keys multiple lists of column keys - * @param values multiple values, or null if not needed + * @param batchPublisher publisher of batches of keys and values * @param mode put batch mode */ - record PutBatch(Arena arena, long columnId, - @NotNull List keys, - @NotNull List<@NotNull MemorySegment> values, + record PutBatch(long columnId, + @NotNull org.reactivestreams.Publisher<@NotNull KVBatch> batchPublisher, @NotNull PutBatchMode mode) implements RocksDBAPICommand { @Override public Void handleSync(RocksDBSyncAPI api) { - api.putBatch(arena, columnId, keys, values, mode); + api.putBatch(columnId, batchPublisher, mode); return null; } @Override public CompletionStage handleAsync(RocksDBAsyncAPI api) { - return api.putBatchAsync(arena, columnId, keys, values, mode); + return api.putBatchAsync(columnId, batchPublisher, mode); } @Override @@ -237,13 +241,7 @@ public sealed interface RocksDBAPICommand { var sb = new StringBuilder("PUT_BATCH"); sb.append(" column:").append(columnId); sb.append(" mode:").append(mode); - sb.append(" batch:["); - for (int i = 0; i < keys.size(); i++) { - if (i > 0) sb.append(","); - sb.append(" keys:").append(keys.get(i)); - sb.append(" value:").append(Utils.toPrettyString(values.get(i))); - } - sb.append("]"); + sb.append(" batch:[...]"); return sb.toString(); } } @@ -285,6 +283,9 @@ public sealed interface RocksDBAPICommand { } /** * Open an iterator + *

+ * Returns the iterator id + * * @param arena arena * @param transactionId transaction id, or 0 * @param columnId column id @@ -292,7 +293,6 @@ public sealed interface RocksDBAPICommand { * @param endKeysExclusive end keys, exclusive. Null means "the end" * @param reverse if true, seek in reverse direction * @param timeoutMs timeout in milliseconds - * @return iterator id */ record OpenIterator(Arena arena, long transactionId, diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java index 84bc44e..b1332f2 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java @@ -76,12 +76,10 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler { } /** See: {@link PutBatch}. */ - default CompletableFuture putBatchAsync(Arena arena, - long columnId, - @NotNull List<@NotNull Keys> keys, - @NotNull List<@NotNull MemorySegment> values, + default CompletableFuture putBatchAsync(long columnId, + @NotNull org.reactivestreams.Publisher<@NotNull KVBatch> batchPublisher, @NotNull PutBatchMode mode) throws RocksDBException { - return requestAsync(new PutBatch(arena, columnId, keys, values, mode)); + return requestAsync(new PutBatch(columnId, batchPublisher, mode)); } /** See: {@link Get}. */ diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java index 207a5c7..d4591d8 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java @@ -27,7 +27,11 @@ public class RocksDBException extends RuntimeException { TX_NOT_FOUND, KEY_HASH_SIZE_MISMATCH, RESTRICTED_TRANSACTION, PUT_INVALID_REQUEST, UPDATE_RETRY, ROCKSDB_LOAD_ERROR, WRITE_BATCH_1, - SST_WRITE_1 + SST_WRITE_1, + SST_WRITE_2, + SST_WRITE_3, + SST_WRITE_4, + SST_GET_SIZE_FAILED } public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) { @@ -35,7 +39,7 @@ public class RocksDBException extends RuntimeException { } public static RocksDBException of(RocksDBErrorType errorUniqueId, Throwable ex) { - if (ex instanceof RocksDBException e) { + if (ex instanceof org.rocksdb.RocksDBException e) { return new RocksDBException(errorUniqueId, e); } else { return new RocksDBException(errorUniqueId, ex); @@ -43,7 +47,7 @@ public class RocksDBException extends RuntimeException { } public static RocksDBException of(RocksDBErrorType errorUniqueId, String message, Throwable ex) { - if (ex instanceof RocksDBException e) { + if (ex instanceof org.rocksdb.RocksDBException e) { return new RocksDBException(errorUniqueId, message, e); } else { return new RocksDBException(errorUniqueId, message, ex); @@ -67,6 +71,7 @@ public class RocksDBException extends RuntimeException { protected RocksDBException(RocksDBErrorType errorUniqueId, org.rocksdb.RocksDBException ex) { this(errorUniqueId, ex.getMessage()); + super.initCause(ex); } protected RocksDBException(RocksDBErrorType errorUniqueId, String message, org.rocksdb.RocksDBException ex) { diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java index 82a1b5b..015ee76 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java @@ -75,12 +75,10 @@ public interface RocksDBSyncAPI extends RocksDBSyncAPIRequestHandler { } /** See: {@link PutBatch}. */ - default void putBatch(Arena arena, - long columnId, - @NotNull List keys, - @NotNull List<@NotNull MemorySegment> values, + default void putBatch(long columnId, + @NotNull org.reactivestreams.Publisher<@NotNull KVBatch> batchPublisher, @NotNull PutBatchMode mode) throws RocksDBException { - requestSync(new PutBatch(arena, columnId, keys, values, mode)); + requestSync(new PutBatch(columnId, batchPublisher, mode)); } /** See: {@link Get}. */ diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java index 011d687..a3f7ba7 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -9,9 +9,7 @@ import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestPut; import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; -import it.cavallium.rockserver.core.config.ConfigParser; -import it.cavallium.rockserver.core.config.ConfigPrinter; -import it.cavallium.rockserver.core.config.DatabaseConfig; +import it.cavallium.rockserver.core.config.*; import it.cavallium.rockserver.core.impl.rocksdb.*; import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions; import it.unimi.dsi.fastutil.ints.IntArrayList; @@ -26,20 +24,21 @@ import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Objects; +import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; import org.cliffc.high_scale_lib.NonBlockingHashMapLong; +import org.github.gestalt.config.exceptions.GestaltException; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.VisibleForTesting; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.rocksdb.*; import org.rocksdb.RocksDBException; import org.rocksdb.Status.Code; @@ -55,13 +54,16 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { private final Logger logger; private final @Nullable Path path; private final TransactionalDB db; + private final DBOptions dbOptions; private final ColumnFamilyHandle columnSchemasColumnDescriptorHandle; private final NonBlockingHashMapLong columns; + private final Map columnsConifg; private final ConcurrentMap columnNamesIndex; private final NonBlockingHashMapLong txs; private final NonBlockingHashMapLong> its; private final SafeShutdown ops; private final Object columnEditLock = new Object(); + private final DatabaseConfig config; public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) throws IOException { this.path = path; @@ -72,7 +74,11 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { this.columnNamesIndex = new ConcurrentHashMap<>(); this.ops = new SafeShutdown(); DatabaseConfig config = ConfigParser.parse(embeddedConfigPath); - this.db = RocksDBLoader.load(path, config, logger); + this.config = config; + var loadedDb = RocksDBLoader.load(path, config, logger); + this.db = loadedDb.db(); + this.dbOptions = loadedDb.dbOptions(); + this.columnsConifg = loadedDb.definitiveColumnFamilyOptionsMap(); var existingColumnSchemasColumnDescriptorOptional = db .getStartupColumns() .entrySet() @@ -411,91 +417,163 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { if (keys.size() != values.size()) { throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size()); } - if (requestType instanceof RequestType.RequestNothing - && !getColumn(columnId).hasBuckets() - && transactionOrUpdateId == 0L) { - putBatch(arena, columnId, keys, values, PutBatchMode.WRITE_BATCH); - return List.of(); - } else { - List responses = requestType instanceof RequestType.RequestNothing ? null : new ArrayList<>(keys.size()); - for (int i = 0; i < keys.size(); i++) { - var result = put(arena, transactionOrUpdateId, columnId, keys.get(i), values.get(i), requestType); - if (responses != null) { - responses.add(result); - } + List responses = requestType instanceof RequestType.RequestNothing ? null : new ArrayList<>(keys.size()); + for (int i = 0; i < keys.size(); i++) { + var result = put(arena, transactionOrUpdateId, columnId, keys.get(i), values.get(i), requestType); + if (responses != null) { + responses.add(result); } - return responses != null ? responses : List.of(); + } + return responses != null ? responses : List.of(); + } + + public CompletableFuture putBatchInternal(long columnId, + @NotNull Publisher<@NotNull KVBatch> batchPublisher, + @NotNull PutBatchMode mode) throws it.cavallium.rockserver.core.common.RocksDBException { + try { + var cf = new CompletableFuture(); + batchPublisher.subscribe(new Subscriber<>() { + private Subscription subscription; + private ColumnInstance col; + private ArrayList refs; + private DBWriter writer; + + @Override + public void onSubscribe(Subscription subscription) { + ops.beginOp(); + + try { + // Column id + col = getColumn(columnId); + refs = new ArrayList<>(); + + writer = switch (mode) { + case WRITE_BATCH, WRITE_BATCH_NO_WAL -> { + var wb = new WB(db.get(), new WriteBatch(), mode == PutBatchMode.WRITE_BATCH_NO_WAL); + refs.add(wb); + yield wb; + } + case SST_INGESTION, SST_INGEST_BEHIND -> { + var sstWriter = getSSTWriter(columnId, null, null, true, mode == PutBatchMode.SST_INGEST_BEHIND); + refs.add(sstWriter); + yield sstWriter; + } + }; + } catch (Throwable ex) { + doFinally(); + throw ex; + } + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(KVBatch kvBatch) { + var keyIt = kvBatch.keys().iterator(); + var valueIt = kvBatch.values().iterator(); + try (var arena = Arena.ofConfined()) { + while (keyIt.hasNext()) { + var key = keyIt.next(); + var value = valueIt.next(); + put(arena, writer, col, 0, key, value, RequestType.none()); + } + } catch (it.cavallium.rockserver.core.common.RocksDBException ex) { + doFinally(); + throw ex; + } catch (Exception ex) { + doFinally(); + throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex); + } + subscription.request(1); + } + + @Override + public void onError(Throwable throwable) { + cf.completeExceptionally(throwable); + doFinally(); + } + + @Override + public void onComplete() { + try { + try { + writer.writePending(); + } catch (Throwable ex) { + cf.completeExceptionally(ex); + return; + } + cf.complete(null); + } finally { + doFinally(); + } + } + + private void doFinally() { + for (int i = refs.size() - 1; i >= 0; i--) { + try { + var c = refs.get(i); + if (c instanceof AbstractImmutableNativeReference fr) { + if (fr.isOwningHandle()) { + c.close(); + } + } else { + c.close(); + } + } catch (Exception ex) { + logger.error("Failed to close reference during batch write", ex); + } + } + ops.endOp(); + } + }); + return cf; + } catch (it.cavallium.rockserver.core.common.RocksDBException ex) { + throw ex; + } catch (Exception ex) { + throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex); } } + @VisibleForTesting + public SSTWriter getSSTWriter(long colId, + @Nullable GlobalDatabaseConfig globalDatabaseConfigOverride, + @Nullable FallbackColumnConfig columnConfigOverride, + boolean forceNoOptions, + boolean ingestBehind) throws it.cavallium.rockserver.core.common.RocksDBException { + try { + var col = getColumn(colId); + ColumnFamilyOptions columnConifg; + RocksDBObjects refs; + if (!forceNoOptions) { + if (columnConfigOverride != null) { + refs = new RocksDBObjects(); + columnConifg = RocksDBLoader.getColumnOptions(globalDatabaseConfigOverride, columnConfigOverride, logger, refs, path == null, null); + } else { + columnConifg = columnsConifg.get(new String(col.cfh().getName(), StandardCharsets.UTF_8)); + refs = null; + } + } else { + columnConifg = null; + refs = null; + } + return SSTWriter.open(db, col, columnConifg, forceNoOptions, ingestBehind, refs); + } catch (IOException ex) { + throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_2, ex); + } catch (RocksDBException ex) { + throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_3, ex); + } + } @Override - public void putBatch(Arena arena, - long columnId, - @NotNull List keys, - @NotNull List<@NotNull MemorySegment> values, + public void putBatch(long columnId, + @NotNull Publisher<@NotNull KVBatch> batchPublisher, @NotNull PutBatchMode mode) throws it.cavallium.rockserver.core.common.RocksDBException { - if (keys.size() != values.size()) { - throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size()); - } - ops.beginOp(); try { - // Column id - var col = getColumn(columnId); - List refs = new ArrayList<>(); - try { - DBWriter writer = switch (mode) { - case WRITE_BATCH, WRITE_BATCH_NO_WAL -> { - var wb = new WB(db.get(), new WriteBatch(), mode == PutBatchMode.WRITE_BATCH_NO_WAL); - refs.add(wb); - yield wb; - } - case SST_INGESTION, SST_INGEST_BEHIND -> { - var envOptions = new EnvOptions(); - refs.add(envOptions); - var compressionOptions = new CompressionOptions() - .setEnabled(true) - .setMaxDictBytes(32768) - .setZStdMaxTrainBytes(32768 * 4); - refs.add(compressionOptions); - var options = new Options() - .setCompressionOptions(compressionOptions) - .setCompressionType(CompressionType.ZSTD_COMPRESSION) - .setUnorderedWrite(true) - .setAllowIngestBehind(mode == PutBatchMode.SST_INGEST_BEHIND) - .setAllowConcurrentMemtableWrite(true); - refs.add(options); - var tempFile = Files.createTempFile("temp", ".sst"); - var sstFileWriter = new SstFileWriter(envOptions, options); - var sstWriter = new SSTWriter(db.get(), col, tempFile, sstFileWriter, mode == PutBatchMode.SST_INGEST_BEHIND); - refs.add(sstWriter); - sstFileWriter.open(tempFile.toString()); - yield sstWriter; - } - }; - var keyIt = keys.iterator(); - var valusIt = values.iterator(); - while (keyIt.hasNext()) { - var key = keyIt.next(); - var value = valusIt.next(); - put(arena, writer, col, 0, key, value, RequestType.none()); - } - writer.writePending(); - } finally { - for (int i = refs.size() - 1; i >= 0; i--) { - try { - refs.get(i).close(); - } catch (Exception ex) { - logger.error("Failed to close reference during batch write", ex); - } - } - } + putBatchInternal(columnId, batchPublisher, mode).get(); } catch (it.cavallium.rockserver.core.common.RocksDBException ex) { throw ex; } catch (Exception ex) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex); - } finally { - ops.endOp(); } } @@ -613,7 +691,11 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } switch (dbWriter) { case WB wb -> wb.wb().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); - case SSTWriter sstWriter -> sstWriter.sstFileWriter().put(Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); + case SSTWriter sstWriter -> { + var keyBB = calculatedKey.asByteBuffer(); + ByteBuffer valueBB = (col.schema().hasValue() ? value : Utils.dummyEmptyValue()).asByteBuffer(); + sstWriter.put(keyBB, valueBB); + } case Tx t -> t.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); case null -> { try (var w = new WriteOptions()) { diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java index 57ccf48..4eb8521 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java @@ -5,14 +5,8 @@ import it.cavallium.rockserver.core.config.*; import java.io.InputStream; import java.nio.file.StandardCopyOption; import java.time.Duration; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.SequencedMap; -import java.util.logging.Level; +import java.util.*; + import org.github.gestalt.config.exceptions.GestaltException; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -90,8 +84,207 @@ public class RocksDBLoader { throw new RuntimeException("rocksdb was not found inside JAR."); } + public record LoadedDb(TransactionalDB db, DBOptions dbOptions, + Map definitiveColumnFamilyOptionsMap) {} - public static TransactionalDB load(@Nullable Path path, DatabaseConfig config, Logger logger) { + public static ColumnFamilyOptions getColumnOptions( + GlobalDatabaseConfig globalDatabaseConfig, + FallbackColumnConfig columnOptions, Logger logger, + RocksDBObjects refs, + boolean inMemory, + @Nullable Cache cache) { + try { + var columnFamilyOptions = new ColumnFamilyOptions(); + refs.add(columnFamilyOptions); + + //noinspection ConstantConditions + if (columnOptions.memtableMemoryBudgetBytes() != null) { + // about 512MB of ram will be used for level style compaction + columnFamilyOptions.optimizeLevelStyleCompaction(Optional.ofNullable(columnOptions.memtableMemoryBudgetBytes()) + .map(DataSize::longValue) + .orElse(DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET)); + } + + if (isDisableAutoCompactions()) { + columnFamilyOptions.setDisableAutoCompactions(true); + } + try { + columnFamilyOptions.setPrepopulateBlobCache(PrepopulateBlobCache.PREPOPULATE_BLOB_FLUSH_ONLY); + } catch (Throwable ex) { + logger.error("Failed to set prepopulate blob cache", ex); + } + + // This option is not supported with multiple db paths + // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks + boolean dynamicLevelBytes = (globalDatabaseConfig.volumes() == null || globalDatabaseConfig.volumes().length <= 1) + && !globalDatabaseConfig.ingestBehind(); + if (dynamicLevelBytes) { + columnFamilyOptions.setLevelCompactionDynamicLevelBytes(true); + } + + // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html + columnFamilyOptions + .setTargetFileSizeBase(256 * SizeUnit.MB) + .setMaxBytesForLevelBase(SizeUnit.GB); + + if (isDisableAutoCompactions()) { + columnFamilyOptions.setLevel0FileNumCompactionTrigger(-1); + } else if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) { + // ArangoDB uses a value of 2: https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + // Higher values speed up writes, but slow down reads + columnFamilyOptions.setLevel0FileNumCompactionTrigger(2); + } + if (isDisableSlowdown()) { + columnFamilyOptions.setLevel0SlowdownWritesTrigger(-1); + columnFamilyOptions.setLevel0StopWritesTrigger(Integer.MAX_VALUE); + columnFamilyOptions.setHardPendingCompactionBytesLimit(Long.MAX_VALUE); + columnFamilyOptions.setSoftPendingCompactionBytesLimit(Long.MAX_VALUE); + } + { + // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + columnFamilyOptions.setLevel0SlowdownWritesTrigger(20); + // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + columnFamilyOptions.setLevel0StopWritesTrigger(36); + } + + if (columnOptions.levels().length > 0) { + columnFamilyOptions.setNumLevels(columnOptions.levels().length); + var firstLevelOptions = getRocksLevelOptions(columnOptions.levels()[0], refs); + columnFamilyOptions.setCompressionType(firstLevelOptions.compressionType); + columnFamilyOptions.setCompressionOptions(firstLevelOptions.compressionOptions); + + var lastLevelOptions = getRocksLevelOptions(columnOptions + .levels()[columnOptions.levels().length - 1], refs); + columnFamilyOptions.setBottommostCompressionType(lastLevelOptions.compressionType); + columnFamilyOptions.setBottommostCompressionOptions(lastLevelOptions.compressionOptions); + + List compressionPerLevel = new ArrayList<>(); + for (ColumnLevelConfig columnLevelConfig : columnOptions.levels()) { + CompressionType compression = columnLevelConfig.compression(); + compressionPerLevel.add(compression); + } + columnFamilyOptions.setCompressionPerLevel(compressionPerLevel); + } else { + columnFamilyOptions.setNumLevels(7); + List compressionTypes = new ArrayList<>(7); + for (int i = 0; i < 7; i++) { + if (i < 2) { + compressionTypes.add(CompressionType.NO_COMPRESSION); + } else { + compressionTypes.add(CompressionType.LZ4_COMPRESSION); + } + } + columnFamilyOptions.setBottommostCompressionType(CompressionType.LZ4HC_COMPRESSION); + var compressionOptions = new CompressionOptions() + .setEnabled(true) + .setMaxDictBytes(Math.toIntExact(32 * SizeUnit.KB)); + refs.add(compressionOptions); + setZstdCompressionOptions(compressionOptions); + columnFamilyOptions.setBottommostCompressionOptions(compressionOptions); + columnFamilyOptions.setCompressionPerLevel(compressionTypes); + } + + final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); + + if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) { + columnFamilyOptions.setWriteBufferSize(256 * SizeUnit.MB); + } + Optional.ofNullable(columnOptions.writeBufferSize()) + .map(DataSize::longValue) + .ifPresent(columnFamilyOptions::setWriteBufferSize); + + columnFamilyOptions.setMaxWriteBufferNumberToMaintain(1); + if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { + blockBasedTableConfig.setVerifyCompression(false); + } + // If OptimizeFiltersForHits == true: memory size = bitsPerKey * (totalKeys * 0.1) + // If OptimizeFiltersForHits == false: memory size = bitsPerKey * totalKeys + BloomFilterConfig filter = null; + BloomFilterConfig bloomFilterConfig = columnOptions.bloomFilter(); + if (bloomFilterConfig != null) filter = bloomFilterConfig; + if (filter == null) { + if (inMemory) { + throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Please set a bloom filter. It's required for in-memory databases"); + } + if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { + blockBasedTableConfig.setFilterPolicy(null); + } + } else { + final BloomFilter bloomFilter = new BloomFilter(filter.bitsPerKey()); + refs.add(bloomFilter); + if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { + blockBasedTableConfig.setFilterPolicy(bloomFilter); + } + } + boolean cacheIndexAndFilterBlocks = !inMemory && Optional.ofNullable(columnOptions.cacheIndexAndFilterBlocks()) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .orElse(true); + if (globalDatabaseConfig.spinning()) { + // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html + columnFamilyOptions.setMinWriteBufferNumberToMerge(3); + // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html + columnFamilyOptions.setMaxWriteBufferNumber(4); + } + if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { + blockBasedTableConfig + // http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html + .setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash) + // http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html + .setDataBlockHashTableUtilRatio(0.75) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .setPinTopLevelIndexAndFilter(true) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .setPinL0FilterAndIndexBlocksInCache(!inMemory) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .setCacheIndexAndFilterBlocksWithHighPriority(true) + .setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + // Enabling partition filters increase the reads by 2x + .setPartitionFilters(Optional.ofNullable(columnOptions.partitionFilters()).orElse(false)) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .setIndexType(inMemory ? IndexType.kHashSearch : Optional.ofNullable(columnOptions.partitionFilters()).orElse(false) ? IndexType.kTwoLevelIndexSearch : IndexType.kBinarySearch) + .setChecksumType(inMemory ? ChecksumType.kNoChecksum : ChecksumType.kXXH3) + // Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB + // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks + // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html + .setBlockSize(inMemory ? 4 * SizeUnit.KB : Optional.ofNullable(columnOptions.blockSize()) + .map(DataSize::longValue) + .orElse((globalDatabaseConfig.spinning() ? 128 : 16) * SizeUnit.KB)) + .setBlockCache(cache) + .setNoBlockCache(cache == null); + } + if (inMemory) { + columnFamilyOptions.useCappedPrefixExtractor(4); + tableOptions.setBlockRestartInterval(4); + } + + columnFamilyOptions.setTableFormatConfig(tableOptions); + columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio); + // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks + // https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions= + BloomFilterConfig bloomFilterOptions = columnOptions.bloomFilter(); + if (bloomFilterOptions != null) { + // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks + // https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions= + boolean optimizeForHits = globalDatabaseConfig.spinning(); + Boolean value = bloomFilterOptions.optimizeForHits(); + if (value != null) optimizeForHits = value; + columnFamilyOptions.setOptimizeFiltersForHits(optimizeForHits); + } + return columnFamilyOptions; + } catch (GestaltException ex) { + throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_CONFIG_ERROR, ex); + } + } + + private static void setZstdCompressionOptions(CompressionOptions compressionOptions) { + // https://rocksdb.org/blog/2021/05/31/dictionary-compression.html#:~:text=(zstd%20only,it%20to%20100x + compressionOptions + .setZStdMaxTrainBytes(compressionOptions.maxDictBytes() * 100); + } + + public static LoadedDb load(@Nullable Path path, DatabaseConfig config, Logger logger) { var refs = new RocksDBObjects(); // Get databases directory path Path definitiveDbPath; @@ -132,7 +325,9 @@ public class RocksDBLoader { refs.add(options); options.setParanoidChecks(PARANOID_CHECKS); options.setSkipCheckingSstFileSizesOnDbOpen(true); - options.setEnablePipelinedWrite(true); + if (!databaseOptions.global().unorderedWrite()) { + options.setEnablePipelinedWrite(true); + } var maxSubCompactions = Integer.parseInt(System.getProperty("it.cavallium.dbengine.compactions.max.sub", "-1")); if (maxSubCompactions > 0) { options.setMaxSubcompactions(maxSubCompactions); @@ -301,11 +496,12 @@ public class RocksDBLoader { .toList(); } - private static TransactionalDB loadDb(@Nullable Path path, + private static LoadedDb loadDb(@Nullable Path path, @NotNull Path definitiveDbPath, DatabaseConfig databaseOptions, OptionsWithCache optionsWithCache, RocksDBObjects refs, Logger logger) { var inMemory = path == null; var rocksdbOptions = optionsWithCache.options(); + Map definitiveColumnFamilyOptionsMap = new HashMap<>(); try { List volumeConfigs = getVolumeConfigs(definitiveDbPath, databaseOptions); List descriptors = new ArrayList<>(); @@ -359,199 +555,12 @@ public class RocksDBLoader { throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Wrong column config name: " + name); } - var columnFamilyOptions = new ColumnFamilyOptions(); + var columnFamilyOptions = getColumnOptions(databaseOptions.global(), columnOptions, + logger, refs, path == null, optionsWithCache.standardCache()); refs.add(columnFamilyOptions); - //noinspection ConstantConditions - if (columnOptions.memtableMemoryBudgetBytes() != null) { - // about 512MB of ram will be used for level style compaction - columnFamilyOptions.optimizeLevelStyleCompaction(Optional.ofNullable(columnOptions.memtableMemoryBudgetBytes()) - .map(DataSize::longValue) - .orElse(DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET)); - } - - if (isDisableAutoCompactions()) { - columnFamilyOptions.setDisableAutoCompactions(true); - } - try { - columnFamilyOptions.setPrepopulateBlobCache(PrepopulateBlobCache.PREPOPULATE_BLOB_FLUSH_ONLY); - } catch (Throwable ex) { - logger.error("Failed to set prepopulate blob cache", ex); - } - - // This option is not supported with multiple db paths - // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html - // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks - boolean dynamicLevelBytes = volumeConfigs.size() <= 1; - if (dynamicLevelBytes) { - columnFamilyOptions.setLevelCompactionDynamicLevelBytes(true); - columnFamilyOptions.setMaxBytesForLevelBase(10 * SizeUnit.GB); - columnFamilyOptions.setMaxBytesForLevelMultiplier(10); - } else { - // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html - // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html - columnFamilyOptions.setMaxBytesForLevelBase(256 * SizeUnit.MB); - // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html - columnFamilyOptions.setMaxBytesForLevelMultiplier(10); - } - if (isDisableAutoCompactions()) { - columnFamilyOptions.setLevel0FileNumCompactionTrigger(-1); - } else if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) { - // ArangoDB uses a value of 2: https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html - // Higher values speed up writes, but slow down reads - columnFamilyOptions.setLevel0FileNumCompactionTrigger(2); - } - if (isDisableSlowdown()) { - columnFamilyOptions.setLevel0SlowdownWritesTrigger(-1); - columnFamilyOptions.setLevel0StopWritesTrigger(Integer.MAX_VALUE); - columnFamilyOptions.setHardPendingCompactionBytesLimit(Long.MAX_VALUE); - columnFamilyOptions.setSoftPendingCompactionBytesLimit(Long.MAX_VALUE); - } - { - // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html - columnFamilyOptions.setLevel0SlowdownWritesTrigger(20); - // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html - columnFamilyOptions.setLevel0StopWritesTrigger(36); - } - - if (columnOptions.levels().length > 0) { - columnFamilyOptions.setNumLevels(columnOptions.levels().length); - var firstLevelOptions = getRocksLevelOptions(columnOptions.levels()[0], refs); - columnFamilyOptions.setCompressionType(firstLevelOptions.compressionType); - columnFamilyOptions.setCompressionOptions(firstLevelOptions.compressionOptions); - - var lastLevelOptions = getRocksLevelOptions(columnOptions - .levels()[columnOptions.levels().length - 1], refs); - columnFamilyOptions.setBottommostCompressionType(lastLevelOptions.compressionType); - columnFamilyOptions.setBottommostCompressionOptions(lastLevelOptions.compressionOptions); - - List compressionPerLevel = new ArrayList<>(); - for (ColumnLevelConfig columnLevelConfig : columnOptions.levels()) { - CompressionType compression = columnLevelConfig.compression(); - compressionPerLevel.add(compression); - } - columnFamilyOptions.setCompressionPerLevel(compressionPerLevel); - } else { - columnFamilyOptions.setNumLevels(7); - List compressionTypes = new ArrayList<>(7); - for (int i = 0; i < 7; i++) { - if (i < 2) { - compressionTypes.add(CompressionType.NO_COMPRESSION); - } else { - compressionTypes.add(CompressionType.LZ4_COMPRESSION); - } - } - columnFamilyOptions.setBottommostCompressionType(CompressionType.LZ4HC_COMPRESSION); - var compressionOptions = new CompressionOptions() - .setEnabled(true) - .setMaxDictBytes(32768); - refs.add(compressionOptions); - columnFamilyOptions.setBottommostCompressionOptions(compressionOptions); - columnFamilyOptions.setCompressionPerLevel(compressionTypes); - } - - final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); - - if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) { - columnFamilyOptions.setWriteBufferSize(256 * SizeUnit.MB); - } - Optional.ofNullable(columnOptions.writeBufferSize()) - .map(DataSize::longValue) - .ifPresent(columnFamilyOptions::setWriteBufferSize); - - columnFamilyOptions.setMaxWriteBufferNumberToMaintain(1); - if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { - blockBasedTableConfig.setVerifyCompression(false); - } - // If OptimizeFiltersForHits == true: memory size = bitsPerKey * (totalKeys * 0.1) - // If OptimizeFiltersForHits == false: memory size = bitsPerKey * totalKeys - BloomFilterConfig filter = null; - BloomFilterConfig bloomFilterConfig = columnOptions.bloomFilter(); - if (bloomFilterConfig != null) filter = bloomFilterConfig; - if (filter == null) { - if (path == null) { - throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Please set a bloom filter. It's required for in-memory databases"); - } - if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { - blockBasedTableConfig.setFilterPolicy(null); - } - } else { - final BloomFilter bloomFilter = new BloomFilter(filter.bitsPerKey()); - refs.add(bloomFilter); - if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { - blockBasedTableConfig.setFilterPolicy(bloomFilter); - } - } - boolean cacheIndexAndFilterBlocks = path != null && Optional.ofNullable(columnOptions.cacheIndexAndFilterBlocks()) - // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters - .orElse(true); - if (databaseOptions.global().spinning()) { - if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) { - // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks - // cacheIndexAndFilterBlocks = true; - // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html - columnFamilyOptions.setMinWriteBufferNumberToMerge(3); - // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html - columnFamilyOptions.setMaxWriteBufferNumber(4); - } - } - if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { - blockBasedTableConfig - // http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html - .setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash) - // http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html - .setDataBlockHashTableUtilRatio(0.75) - // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters - .setPinTopLevelIndexAndFilter(true) - // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters - .setPinL0FilterAndIndexBlocksInCache(path != null) - // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters - .setCacheIndexAndFilterBlocksWithHighPriority(true) - .setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks) - // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters - // Enabling partition filters increase the reads by 2x - .setPartitionFilters(Optional.ofNullable(columnOptions.partitionFilters()).orElse(false)) - // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters - .setIndexType(path == null ? IndexType.kHashSearch : Optional.ofNullable(columnOptions.partitionFilters()).orElse(false) ? IndexType.kTwoLevelIndexSearch : IndexType.kBinarySearch) - .setChecksumType(path == null ? ChecksumType.kNoChecksum : ChecksumType.kXXH3) - // Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB - // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks - // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html - .setBlockSize(path == null ? 4096 : Optional.ofNullable(columnOptions.blockSize()).map(DataSize::longValue).orElse((databaseOptions.global().spinning() ? 128 : 16) * 1024L)) - .setBlockCache(optionsWithCache.standardCache()) - .setNoBlockCache(optionsWithCache.standardCache() == null); - } - if (path == null) { - columnFamilyOptions.useCappedPrefixExtractor(4); - tableOptions.setBlockRestartInterval(4); - } - - columnFamilyOptions.setTableFormatConfig(tableOptions); - columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio); - // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks - // https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions= - BloomFilterConfig bloomFilterOptions = columnOptions.bloomFilter(); - if (bloomFilterOptions != null) { - // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks - // https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions= - boolean optimizeForHits = databaseOptions.global().spinning(); - Boolean value = bloomFilterOptions.optimizeForHits(); - if (value != null) optimizeForHits = value; - columnFamilyOptions.setOptimizeFiltersForHits(optimizeForHits); - } - - if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) { - // // Increasing this value can reduce the frequency of compaction and reduce write amplification, - // // but it will also cause old data to be unable to be cleaned up in time, thus increasing read amplification. - // // This parameter is not easy to adjust. It is generally not recommended to set it above 256MB. - // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html - columnFamilyOptions.setTargetFileSizeBase(64 * SizeUnit.MB); - // // For each level up, the threshold is multiplied by the factor target_file_size_multiplier - // // (but the default value is 1, which means that the maximum sstable of each level is the same). - columnFamilyOptions.setTargetFileSizeMultiplier(2); - } - descriptors.add(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.US_ASCII), columnFamilyOptions)); + definitiveColumnFamilyOptionsMap.put(name, columnFamilyOptions); } var handles = new ArrayList(); @@ -587,7 +596,7 @@ public class RocksDBLoader { var delayWalFlushConfig = getWalFlushDelayConfig(databaseOptions); var dbTasks = new DatabaseTasks(db, inMemory, delayWalFlushConfig); - return TransactionalDB.create(definitiveDbPath.toString(), db, descriptors, handles, dbTasks); + return new LoadedDb(TransactionalDB.create(definitiveDbPath.toString(), db, descriptors, handles, dbTasks), rocksdbOptions, definitiveColumnFamilyOptionsMap); } catch (IOException | RocksDBException ex) { throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex); } catch (GestaltException e) { @@ -612,8 +621,9 @@ public class RocksDBLoader { var compressionOptions = new CompressionOptions(); refs.add(compressionOptions); if (compressionType != CompressionType.NO_COMPRESSION) { - compressionOptions.setEnabled(true); - compressionOptions.setMaxDictBytes(Math.toIntExact(levelOptions.maxDictBytes().longValue())); + compressionOptions.setEnabled(true) + .setMaxDictBytes(Math.toIntExact(levelOptions.maxDictBytes().longValue())); + setZstdCompressionOptions(compressionOptions); } else { compressionOptions.setEnabled(false); } diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/SSTWriter.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/SSTWriter.java index e45e717..f0fdbee 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/SSTWriter.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/SSTWriter.java @@ -1,37 +1,151 @@ package it.cavallium.rockserver.core.impl.rocksdb; import it.cavallium.rockserver.core.common.RocksDBException; -import org.rocksdb.IngestExternalFileOptions; -import org.rocksdb.RocksDB; -import org.rocksdb.SstFileWriter; +import it.cavallium.rockserver.core.impl.ColumnInstance; +import org.rocksdb.*; +import org.rocksdb.util.SizeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import java.util.UUID; -public record SSTWriter(RocksDB db, it.cavallium.rockserver.core.impl.ColumnInstance col, Path path, SstFileWriter sstFileWriter, boolean ingestBehind) implements Closeable, DBWriter { +public record SSTWriter(RocksDB db, it.cavallium.rockserver.core.impl.ColumnInstance col, Path path, SstFileWriter sstFileWriter, boolean ingestBehind, + RocksDBObjects refs) implements Closeable, DBWriter { + + private static final Logger LOG = LoggerFactory.getLogger(SSTWriter.class); + + public static SSTWriter open(TransactionalDB db, ColumnInstance col, ColumnFamilyOptions columnConifg, boolean forceNoOptions, boolean ingestBehind, RocksDBObjects refs) throws IOException, org.rocksdb.RocksDBException { + if (refs == null) { + refs = new RocksDBObjects(); + } + var envOptions = new EnvOptions(); + if (!forceNoOptions) { + envOptions + .setAllowFallocate(true) + .setWritableFileMaxBufferSize(10 * SizeUnit.MB) + .setRandomAccessMaxBufferSize(10 * SizeUnit.MB) + .setCompactionReadaheadSize(2 * SizeUnit.MB) + .setBytesPerSync(10 * SizeUnit.MB); + } + refs.add(envOptions); + + var options = new Options(); + refs.add(options); + if (!forceNoOptions) { + options + .setDisableAutoCompactions(true) + .setManualWalFlush(true) + .setUseDirectIoForFlushAndCompaction(true) + .setBytesPerSync(5 * SizeUnit.MB) + .setParanoidChecks(false) + .setSkipCheckingSstFileSizesOnDbOpen(true) + .setForceConsistencyChecks(false) + .setParanoidFileChecks(false); + if (columnConifg != null) { + options + .setCompressionType(columnConifg.compressionType()) + .setCompressionOptions(columnConifg.compressionOptions()) + .setBottommostCompressionType(columnConifg.bottommostCompressionType()) + .setBottommostCompressionOptions(columnConifg.bottommostCompressionOptions()) + .setCompressionPerLevel(columnConifg.compressionPerLevel()) + .setNumLevels(columnConifg.numLevels()) + .setTableFormatConfig(columnConifg.tableFormatConfig()) + .setMemTableConfig(columnConifg.memTableConfig()) + .setTargetFileSizeBase(columnConifg.targetFileSizeBase()) + .setTargetFileSizeMultiplier(columnConifg.targetFileSizeMultiplier()) + .setMaxOpenFiles(-1); + } + } + Path tempFile; + try { + var tempDir = Path.of(db.getPath()).resolve(".temp_sst"); + if (Files.notExists(tempDir)) { + Files.createDirectories(tempDir); + } + tempFile = tempDir.resolve(UUID.randomUUID() + ".sst"); + } catch (IOException ex) { + refs.close(); + throw ex; + } + var sstFileWriter = new SstFileWriter(envOptions, options); + var sstWriter = new SSTWriter(db.get(), col, tempFile, sstFileWriter, ingestBehind, refs); + sstFileWriter.open(tempFile.toString()); + return sstWriter; + } + + public void put(byte[] key, byte[] value) throws RocksDBException { + try { + checkOwningHandle(); + sstFileWriter.put(key, value); + } catch (org.rocksdb.RocksDBException e) { + throw RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_UNKNOWN_ERROR, e); + } + } + + public void put(ByteBuffer key, ByteBuffer value) throws RocksDBException { + try { + checkOwningHandle(); + sstFileWriter.put(key, value); + } catch (org.rocksdb.RocksDBException e) { + throw RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_UNKNOWN_ERROR, e); + } + } @Override public void writePending() throws it.cavallium.rockserver.core.common.RocksDBException { try { - sstFileWriter.finish(); - try (var ingestOptions = new IngestExternalFileOptions()) { - ingestOptions - .setIngestBehind(ingestBehind) - .setAllowBlockingFlush(true) - .setMoveFiles(true) - .setAllowGlobalSeqNo(true) - .setWriteGlobalSeqno(false) - .setSnapshotConsistency(false); - db.ingestExternalFile(col.cfh(), List.of(path.toString()), ingestOptions); + try (this) { + checkOwningHandle(); + sstFileWriter.finish(); + try (var ingestOptions = new IngestExternalFileOptions()) { + ingestOptions + .setIngestBehind(ingestBehind) + .setAllowBlockingFlush(true) + .setMoveFiles(true) + .setAllowGlobalSeqNo(true) + .setWriteGlobalSeqno(false) + .setSnapshotConsistency(false); + db.ingestExternalFile(col.cfh(), List.of(path.toString()), ingestOptions); + } } } catch (org.rocksdb.RocksDBException e) { throw RocksDBException.of(RocksDBException.RocksDBErrorType.SST_WRITE_1, e); } } + private void checkOwningHandle() { + if (!sstFileWriter.isOwningHandle()) { + throw RocksDBException.of(RocksDBException.RocksDBErrorType.SST_WRITE_4, "SST writer is closed"); + } + } + @Override public void close() { - sstFileWriter.close(); + if (sstFileWriter.isOwningHandle()) { + sstFileWriter.close(); + try { + Files.deleteIfExists(path); + } catch (IOException e) { + LOG.error("Failed to delete a file: {}", path, e); + } + } + refs.close(); + } + + public long fileSize() { + if (!sstFileWriter.isOwningHandle()) { + throw RocksDBException.of(RocksDBException.RocksDBErrorType.SST_GET_SIZE_FAILED, "The SSTWriter is closed"); + } + try { + return sstFileWriter.fileSize(); + } catch (org.rocksdb.RocksDBException e) { + throw RocksDBException.of(RocksDBException.RocksDBErrorType.SST_GET_SIZE_FAILED, e); + } } } diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/Tx.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/Tx.java index 029f6bc..798f0fc 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/Tx.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/Tx.java @@ -2,6 +2,7 @@ package it.cavallium.rockserver.core.impl.rocksdb; import java.io.Closeable; +import it.cavallium.rockserver.core.common.RocksDBException; import org.rocksdb.Transaction; public record Tx(Transaction val, boolean isFromGetForUpdate, RocksDBObjects objs) @@ -12,4 +13,9 @@ public record Tx(Transaction val, boolean isFromGetForUpdate, RocksDBObjects obj val.close(); objs.close(); } + + @Override + public void writePending() throws RocksDBException { + + } } diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index bf103c3..7251e7e 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -4,7 +4,9 @@ import static it.cavallium.rockserver.core.common.Utils.toMemorySegment; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; +import io.grpc.Status; import io.grpc.netty.NettyServerBuilder; +import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -17,6 +19,7 @@ import it.cavallium.rockserver.core.client.RocksDBConnection; import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.ColumnHashType; import it.cavallium.rockserver.core.common.ColumnSchema; +import it.cavallium.rockserver.core.common.KVBatch; import it.cavallium.rockserver.core.common.PutBatchMode; import it.cavallium.rockserver.core.common.RequestType.RequestChanged; import it.cavallium.rockserver.core.common.RequestType.RequestCurrent; @@ -42,11 +45,14 @@ import java.lang.foreign.MemorySegment; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.rocksdb.util.SizeUnit; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,7 +85,7 @@ public class GrpcServer extends Server { .directExecutor() .channelType(channelType) .withChildOption(ChannelOption.SO_KEEPALIVE, false) - .maxInboundMessageSize(Math.toIntExact(128 * SizeUnit.MB)) + .maxInboundMessageSize(128 * 1024 * 1024) .addService(grpc) .build(); server.start(); @@ -88,9 +94,11 @@ public class GrpcServer extends Server { private final class GrpcServerImpl extends RocksDBServiceImplBase { + private final RocksDBAsyncAPI asyncApi; private final RocksDBSyncAPI api; public GrpcServerImpl(RocksDBConnection client) { + this.asyncApi = client.getAsyncApi(); this.api = client.getSyncApi(); } @@ -105,7 +113,7 @@ public class GrpcServer extends Server { responseObserver.onNext(OpenTransactionResponse.newBuilder().setTransactionId(txId).build()); responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -120,7 +128,7 @@ public class GrpcServer extends Server { responseObserver.onNext(response); responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -133,7 +141,7 @@ public class GrpcServer extends Server { responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -141,19 +149,27 @@ public class GrpcServer extends Server { @Override public void createColumn(CreateColumnRequest request, StreamObserver responseObserver) { executor.execute(() -> { - var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema())); - var response = CreateColumnResponse.newBuilder().setColumnId(colId).build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); + try { + var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema())); + var response = CreateColumnResponse.newBuilder().setColumnId(colId).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Throwable ex) { + handleError(responseObserver, ex); + } }); } @Override public void deleteColumn(DeleteColumnRequest request, StreamObserver responseObserver) { executor.execute(() -> { - api.deleteColumn(request.getColumnId()); - responseObserver.onNext(Empty.getDefaultInstance()); - responseObserver.onCompleted(); + try { + api.deleteColumn(request.getColumnId()); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } catch (Throwable ex) { + handleError(responseObserver, ex); + } }); } @@ -166,7 +182,7 @@ public class GrpcServer extends Server { responseObserver.onNext(response); responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -187,35 +203,127 @@ public class GrpcServer extends Server { responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @Override - public void putBatch(PutBatchRequest request, StreamObserver responseObserver) { - executor.execute(() -> { - try { - try (var arena = Arena.ofConfined()) { - api.putBatch(arena, - request.getColumnId(), - mapKeysKV(arena, request.getDataCount(), request::getData), - mapValuesKV(arena, request.getDataCount(), request::getData), - switch (request.getMode()) { - case WRITE_BATCH -> PutBatchMode.WRITE_BATCH; - case WRITE_BATCH_NO_WAL -> PutBatchMode.WRITE_BATCH_NO_WAL; - case SST_INGESTION -> PutBatchMode.SST_INGESTION; - case SST_INGEST_BEHIND -> PutBatchMode.SST_INGEST_BEHIND; - case UNRECOGNIZED -> throw new UnsupportedOperationException("Unrecognized request \"mode\""); - } - ); - } - responseObserver.onNext(Empty.getDefaultInstance()); - responseObserver.onCompleted(); - } catch (Throwable ex) { - responseObserver.onError(ex); + public StreamObserver putBatch(StreamObserver responseObserver) { + final ServerCallStreamObserver serverCallStreamObserver = + (ServerCallStreamObserver) responseObserver; + serverCallStreamObserver.disableAutoRequest(); + serverCallStreamObserver.request(1); + var requestObserver = new StreamObserver() { + enum State { + BEFORE_INITIAL_REQUEST, + RECEIVING_DATA, + RECEIVED_ALL } - }); + private final ExecutorService sstExecutor = Executors.newSingleThreadExecutor(); + final AtomicInteger pendingRequests = new AtomicInteger(); + State state = State.BEFORE_INITIAL_REQUEST; + private PutBatchInitialRequest initialRequest; + private Subscriber putBatchInputsSubscriber; + @Override + public void onNext(PutBatchRequest putBatchRequest) { + if (state == State.BEFORE_INITIAL_REQUEST) { + if (!putBatchRequest.hasInitialRequest()) { + serverCallStreamObserver.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Missing initial request")); + } + + initialRequest = putBatchRequest.getInitialRequest(); + + try { + asyncApi.putBatchAsync(initialRequest.getColumnId(), + subscriber2 -> { + putBatchInputsSubscriber = subscriber2; + subscriber2.onSubscribe(new Subscription() { + @Override + public void request(long l) { + serverCallStreamObserver.request(Math.toIntExact(l)); + } + + @Override + public void cancel() { + serverCallStreamObserver.onError(new IOException("Cancelled")); + + } + }); + }, + switch (initialRequest.getMode()) { + case WRITE_BATCH -> PutBatchMode.WRITE_BATCH; + case WRITE_BATCH_NO_WAL -> PutBatchMode.WRITE_BATCH_NO_WAL; + case SST_INGESTION -> PutBatchMode.SST_INGESTION; + case SST_INGEST_BEHIND -> PutBatchMode.SST_INGEST_BEHIND; + case UNRECOGNIZED -> throw new UnsupportedOperationException("Unrecognized request \"mode\""); + } + ).whenComplete((_, ex) -> { + if (ex != null) { + handleError(serverCallStreamObserver, ex); + } else { + serverCallStreamObserver.onNext(Empty.getDefaultInstance()); + serverCallStreamObserver.onCompleted(); + } + }); + } catch (Throwable ex) { + handleError(serverCallStreamObserver, ex); + } + state = State.RECEIVING_DATA; + } else if (state == State.RECEIVING_DATA) { + pendingRequests.incrementAndGet(); + var kvBatch = putBatchRequest.getData(); + sstExecutor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + putBatchInputsSubscriber.onNext(mapKVBatch(arena, kvBatch.getEntriesCount(), kvBatch::getEntries)); + } + checkCompleted(true); + } catch (Throwable ex) { + putBatchInputsSubscriber.onError(ex); + } + }); + } else { + serverCallStreamObserver.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Invalid request")); + } + } + + @Override + public void onError(Throwable throwable) { + state = State.RECEIVED_ALL; + doFinally(); + if (putBatchInputsSubscriber != null) { + putBatchInputsSubscriber.onError(throwable); + } else { + serverCallStreamObserver.onError(throwable); + } + } + + @Override + public void onCompleted() { + if (state == State.BEFORE_INITIAL_REQUEST) { + serverCallStreamObserver.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Missing initial request")); + } else if (state == State.RECEIVING_DATA) { + state = State.RECEIVED_ALL; + checkCompleted(false); + } else { + putBatchInputsSubscriber.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_UNKNOWN_ERROR, "Unknown state during onComplete: " + state)); + } + } + + private void checkCompleted(boolean requestDone) { + if ((requestDone ? pendingRequests.decrementAndGet() : pendingRequests.get()) == 0 + && state == State.RECEIVED_ALL) { + doFinally(); + putBatchInputsSubscriber.onComplete(); + } + } + + private void doFinally() { + sstExecutor.shutdown(); + } + }; + return requestObserver; } @Override @@ -253,7 +361,7 @@ public class GrpcServer extends Server { new RequestNothing<>()); } } catch (RocksDBException ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); return; } @@ -308,7 +416,7 @@ public class GrpcServer extends Server { } responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -337,7 +445,7 @@ public class GrpcServer extends Server { } responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -359,7 +467,7 @@ public class GrpcServer extends Server { } responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -381,7 +489,7 @@ public class GrpcServer extends Server { } responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -406,7 +514,7 @@ public class GrpcServer extends Server { } responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -432,7 +540,7 @@ public class GrpcServer extends Server { } responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -452,7 +560,7 @@ public class GrpcServer extends Server { } responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -474,7 +582,7 @@ public class GrpcServer extends Server { } responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -487,7 +595,7 @@ public class GrpcServer extends Server { responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -502,7 +610,7 @@ public class GrpcServer extends Server { responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -520,7 +628,7 @@ public class GrpcServer extends Server { responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -539,7 +647,7 @@ public class GrpcServer extends Server { } responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -571,7 +679,7 @@ public class GrpcServer extends Server { } responseObserver.onCompleted(); } catch (Throwable ex) { - responseObserver.onError(ex); + handleError(responseObserver, ex); } }); } @@ -641,6 +749,30 @@ public class GrpcServer extends Server { } return keys; } + + private static KVBatch mapKVBatch(Arena arena, int count, Int2ObjectFunction getterAt) { + return new KVBatch( + mapKeysKV(arena, count, getterAt), + mapValuesKV(arena, count, getterAt) + ); + } + + private static void handleError(StreamObserver responseObserver, Throwable ex) { + if (ex instanceof CompletionException exx) { + handleError(responseObserver, exx.getCause()); + } else { + if (ex instanceof RocksDBException e) { + responseObserver.onError(Status.INTERNAL + .withDescription(e.getLocalizedMessage()) + .withCause(e) + .asException()); + } else { + responseObserver.onError(Status.INTERNAL + .withCause(ex) + .asException()); + } + } + } } @Override diff --git a/src/main/proto/rocksdb.proto b/src/main/proto/rocksdb.proto index c0ce901..887fe92 100644 --- a/src/main/proto/rocksdb.proto +++ b/src/main/proto/rocksdb.proto @@ -54,6 +54,10 @@ message KV { bytes value = 2; } +message KVBatch { + repeated KV entries = 1; +} + message OpenTransactionRequest {int64 timeoutMs = 1;} message OpenTransactionResponse {int64 transactionId = 1;} @@ -72,7 +76,8 @@ message GetColumnIdResponse {int64 columnId = 1;} message PutRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; KV data = 3;} -message PutBatchRequest {int64 columnId = 1; repeated KV data = 2; PutBatchMode mode = 3;} +message PutBatchInitialRequest {int64 columnId = 1; PutBatchMode mode = 2;} +message PutBatchRequest {oneof putBatchRequestType {PutBatchInitialRequest initialRequest = 1;KVBatch data = 2;}} message PutMultiInitialRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2;} message PutMultiRequest {oneof putMultiRequestType {PutMultiInitialRequest initialRequest = 1;KV data = 2;}} @@ -97,7 +102,7 @@ service RocksDBService { rpc deleteColumn(DeleteColumnRequest) returns (google.protobuf.Empty); rpc getColumnId(GetColumnIdRequest) returns (GetColumnIdResponse); rpc put(PutRequest) returns (google.protobuf.Empty); - rpc putBatch(PutBatchRequest) returns (google.protobuf.Empty); + rpc putBatch(stream PutBatchRequest) returns (google.protobuf.Empty); rpc putMulti(stream PutMultiRequest) returns (google.protobuf.Empty); rpc putGetPrevious(PutRequest) returns (Previous); rpc putMultiGetPrevious(stream PutMultiRequest) returns (stream Previous); diff --git a/src/main/resources/it/cavallium/rockserver/core/resources/default.conf b/src/main/resources/it/cavallium/rockserver/core/resources/default.conf index 8c67118..4b340a2 100644 --- a/src/main/resources/it/cavallium/rockserver/core/resources/default.conf +++ b/src/main/resources/it/cavallium/rockserver/core/resources/default.conf @@ -65,7 +65,7 @@ database: { block-cache: 512MiB # Database write buffer manager size # You should enable this option if you are using direct I/O or spinning disks - write-buffer-manager: 64MiB + write-buffer-manager: 128MiB # Log data path log-path: ./logs # Write-Ahead-Log data path @@ -132,7 +132,7 @@ database: { # This should be kept to null if write-buffer-manager is set, # or if you want to use the "memtable-memory-budget-size" logic. # Remember that there are "max-write-buffer-number" in memory, 2 by default - write-buffer-size: 200MiB + write-buffer-size: 64MiB # Enable blob files blob-files: false } diff --git a/src/native/java/module-info.java b/src/native/java/module-info.java index e338461..a95a2e3 100644 --- a/src/native/java/module-info.java +++ b/src/native/java/module-info.java @@ -28,6 +28,7 @@ module rockserver.core { requires io.netty.codec; requires io.netty.codec.http2; requires io.netty.transport.classes.epoll; + requires org.reactivestreams; exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.common; diff --git a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java index b0cefa4..b70a461 100644 --- a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java +++ b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java @@ -3,24 +3,25 @@ package it.cavallium.rockserver.core.impl.test; import static it.cavallium.rockserver.core.common.Utils.toMemorySegmentSimple; import it.cavallium.rockserver.core.client.EmbeddedConnection; -import it.cavallium.rockserver.core.common.Keys; -import it.cavallium.rockserver.core.common.RequestType; -import it.cavallium.rockserver.core.common.ColumnHashType; -import it.cavallium.rockserver.core.common.ColumnSchema; -import it.cavallium.rockserver.core.common.Delta; -import it.cavallium.rockserver.core.common.RocksDBException; -import it.cavallium.rockserver.core.common.RocksDBRetryException; -import it.cavallium.rockserver.core.common.Utils; +import it.cavallium.rockserver.core.common.*; import it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.objects.ObjectList; import java.lang.foreign.Arena; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.ThreadLocalRandom; + +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Assertions; import java.io.IOException; import java.lang.foreign.MemorySegment; import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; abstract class EmbeddedDBTest { @@ -362,6 +363,73 @@ abstract class EmbeddedDBTest { } } + @Test + void putBatchSST() { + @NotNull Publisher<@NotNull KVBatch> batchPublisher = new Publisher() { + @Override + public void subscribe(Subscriber subscriber) { + subscriber.onSubscribe(new Subscription() { + Iterator it; + { + ArrayList items = new ArrayList<>(); + ArrayList keys = new ArrayList<>(); + ArrayList values = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + var keyI = getKeyI(i); + var valueI = getValueI(i); + keys.add(keyI); + values.add(valueI); + } + items.add(new KVBatch(keys, values)); + keys = new ArrayList<>(); + values = new ArrayList<>(); + for (int i = 2; i < 4; i++) { + var keyI = getKeyI(i); + var valueI = getValueI(i); + keys.add(keyI); + values.add(valueI); + } + items.add(new KVBatch(keys, values)); + it = items.iterator(); + } + @Override + public void request(long l) { + while (l-- > 0) { + if (it.hasNext()) { + subscriber.onNext(it.next()); + } else { + subscriber.onComplete(); + return; + } + } + } + + @Override + public void cancel() { + + } + }); + } + }; + if (this.getSchema().variableLengthKeysCount() <= 0) { + db.putBatch(colId, batchPublisher, PutBatchMode.SST_INGESTION); + + if (getHasValues()) { + for (int i = 0; i < 4; i++) { + assertSegmentEquals(getValueI(i), db.get(arena, 0, colId, getKeyI(i), RequestType.current())); + } + } + for (int i = 0; i < 4; i++) { + Assertions.assertTrue(db.get(arena, 0, colId, getKeyI(i), RequestType.exists())); + } + } else { + Assertions.assertThrows(RocksDBException.class, () -> { + db.putBatch(colId, batchPublisher, PutBatchMode.SST_INGESTION); + }); + + } + } + @Test void concurrentUpdate() { if (getHasValues()) { diff --git a/src/test/java/it/cavallium/rockserver/core/impl/test/TestSSTWriter.java b/src/test/java/it/cavallium/rockserver/core/impl/test/TestSSTWriter.java new file mode 100644 index 0000000..162a044 --- /dev/null +++ b/src/test/java/it/cavallium/rockserver/core/impl/test/TestSSTWriter.java @@ -0,0 +1,208 @@ +package it.cavallium.rockserver.core.impl.test; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import it.cavallium.rockserver.core.common.ColumnSchema; +import it.cavallium.rockserver.core.config.*; +import it.cavallium.rockserver.core.impl.EmbeddedDB; +import it.unimi.dsi.fastutil.ints.IntList; +import it.unimi.dsi.fastutil.objects.ObjectList; +import org.github.gestalt.config.exceptions.GestaltException; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.rocksdb.CompressionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + +public class TestSSTWriter { + + private static final Logger LOG = LoggerFactory.getLogger(TestSSTWriter.class); + + private EmbeddedDB db; + private long colId; + + @BeforeEach + public void setUp() throws IOException { + db = new EmbeddedDB(null, "test", null); + this.colId = db.createColumn("test", ColumnSchema.of(IntList.of(Long.BYTES), ObjectList.of(), true)); + } + + @Test + public void test() throws IOException { + LOG.info("Obtaining sst writer"); + var globalDatabaseConfigOverride = new GlobalDatabaseConfig() { + @Override + public boolean spinning() { + return false; + } + + @Override + public boolean checksum() { + return false; + } + + @Override + public boolean useDirectIo() { + return false; + } + + @Override + public boolean allowRocksdbMemoryMapping() { + return true; + } + + @Override + public @Nullable Integer maximumOpenFiles() { + return -1; + } + + @Override + public boolean optimistic() { + return true; + } + + @Override + public @Nullable DataSize blockCache() { + return new DataSize("10MiB"); + } + + @Override + public @Nullable DataSize writeBufferManager() { + return new DataSize("1MiB"); + } + + @Override + public @Nullable Path logPath() { + return null; + } + + @Override + public @Nullable Path walPath() { + return null; + } + + @Override + public @Nullable Duration delayWalFlushDuration() { + return null; + } + + @Override + public boolean absoluteConsistency() { + return false; + } + + @Override + public boolean ingestBehind() { + return true; + } + + @Override + public boolean unorderedWrite() { + return false; + } + + @Override + public VolumeConfig[] volumes() { + return new VolumeConfig[0]; + } + + @Override + public FallbackColumnConfig fallbackColumnOptions() { + return null; + } + + @Override + public NamedColumnConfig[] columnOptions() { + return new NamedColumnConfig[0]; + } + }; + var fallbackColumnConfig = new FallbackColumnConfig() { + + @Override + public ColumnLevelConfig[] levels() { + return new ColumnLevelConfig[] { + new ColumnLevelConfig() { + @Override + public CompressionType compression() { + return CompressionType.NO_COMPRESSION; + } + + @Override + public DataSize maxDictBytes() { + return DataSize.ZERO; + } + } + }; + } + + @Override + public @Nullable DataSize memtableMemoryBudgetBytes() { + return new DataSize("1MiB"); + } + + @Override + public @Nullable Boolean cacheIndexAndFilterBlocks() { + return true; + } + + @Override + public @Nullable Boolean partitionFilters() { + return false; + } + + @Override + public @Nullable BloomFilterConfig bloomFilter() { + return new BloomFilterConfig() { + @Override + public int bitsPerKey() { + return 10; + } + + @Override + public @Nullable Boolean optimizeForHits() { + return true; + } + }; + } + + @Override + public @Nullable DataSize blockSize() { + return new DataSize("128KiB"); + } + + @Override + public @Nullable DataSize writeBufferSize() { + return new DataSize("1MiB"); + } + }; + try (var sstWriter = db.getSSTWriter(colId, globalDatabaseConfigOverride, fallbackColumnConfig, false, true)) { + LOG.info("Creating sst"); + var tl = ThreadLocalRandom.current(); + var bytes = new byte[1024]; + long i = 0; + while (i < 10_000) { + var ib = Longs.toByteArray(i++); + tl.nextBytes(bytes); + sstWriter.put(ib, bytes); + } + LOG.info("Writing pending sst data"); + sstWriter.writePending(); + LOG.info("Done, closing"); + } + LOG.info("Done"); + } + + @AfterEach + public void tearDown() throws IOException { + db.close(); + } +} diff --git a/src/test/java/module-info.java b/src/test/java/module-info.java index 042f488..8f31edf 100644 --- a/src/test/java/module-info.java +++ b/src/test/java/module-info.java @@ -3,6 +3,12 @@ module rockserver.core.test { requires rockserver.core; requires org.junit.jupiter.api; requires it.unimi.dsi.fastutil; + requires com.google.common; + requires org.slf4j; + requires org.github.gestalt.core; + requires org.jetbrains.annotations; + requires rocksdbjni; + requires org.reactivestreams; opens it.cavallium.rockserver.core.test; opens it.cavallium.rockserver.core.impl.test; } \ No newline at end of file