diff --git a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java index 826226b..677fc3c 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java @@ -1,15 +1,8 @@ package it.cavallium.rockserver.core.client; -import it.cavallium.rockserver.core.common.Keys; -import it.cavallium.rockserver.core.common.RequestType; +import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestPut; -import it.cavallium.rockserver.core.common.ColumnSchema; -import it.cavallium.rockserver.core.common.RocksDBAPI; -import it.cavallium.rockserver.core.common.RocksDBAPICommand; -import it.cavallium.rockserver.core.common.RocksDBAsyncAPI; -import it.cavallium.rockserver.core.common.RocksDBException; -import it.cavallium.rockserver.core.common.RocksDBSyncAPI; import it.cavallium.rockserver.core.impl.EmbeddedDB; import java.io.IOException; import java.lang.foreign.Arena; @@ -118,6 +111,15 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { return db.putMulti(arena, transactionOrUpdateId, columnId, keys, values, requestType); } + @Override + public void putBatch(Arena arena, + long columnId, + @NotNull List keys, + @NotNull List<@NotNull MemorySegment> values, + @NotNull PutBatchMode mode) throws RocksDBException { + db.putBatch(arena, columnId, keys, values, mode); + } + @Override public T get(Arena arena, long transactionOrUpdateId, diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java index fc6cafc..4bc0339 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java @@ -12,9 +12,9 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.ColumnSchema; -import it.cavallium.rockserver.core.common.Keys; -import it.cavallium.rockserver.core.common.RequestType; +import it.cavallium.rockserver.core.common.PutBatchMode; import it.cavallium.rockserver.core.common.RequestType.RequestChanged; import it.cavallium.rockserver.core.common.RequestType.RequestCurrent; import it.cavallium.rockserver.core.common.RequestType.RequestDelta; @@ -26,14 +26,10 @@ import it.cavallium.rockserver.core.common.RequestType.RequestNothing; import it.cavallium.rockserver.core.common.RequestType.RequestPrevious; import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence; import it.cavallium.rockserver.core.common.RequestType.RequestPut; -import it.cavallium.rockserver.core.common.RocksDBAPI; -import it.cavallium.rockserver.core.common.RocksDBAPICommand; -import it.cavallium.rockserver.core.common.RocksDBAsyncAPI; -import it.cavallium.rockserver.core.common.RocksDBException; -import it.cavallium.rockserver.core.common.RocksDBSyncAPI; -import it.cavallium.rockserver.core.common.UpdateContext; import it.cavallium.rockserver.core.common.Utils.HostAndPort; import it.cavallium.rockserver.core.common.api.proto.*; +import it.cavallium.rockserver.core.common.api.proto.ColumnHashType; +import it.cavallium.rockserver.core.common.api.proto.Delta; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceBlockingStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub; @@ -190,10 +186,10 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { CompletableFuture> responseObserver; - if (requestType instanceof RequestType.RequestNothing) { + if (requestType instanceof RequestType.RequestNothing && transactionOrUpdateId == 0L) { var putBatchRequestBuilder = PutBatchRequest.newBuilder() - .setTransactionOrUpdateId(transactionOrUpdateId) - .setColumnId(columnId); + .setColumnId(columnId) + .setMode(it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH); var it1 = allKeys.iterator(); var it2 = allValues.iterator(); @@ -215,6 +211,12 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { .build(); StreamObserver requestPublisher = switch (requestType) { + case RequestNothing _ -> { + var thisResponseObserver = new CollectListStreamObserver(0); + //noinspection unchecked + responseObserver = (CompletableFuture>) (CompletableFuture) thisResponseObserver; + yield this.asyncStub.putMulti(thisResponseObserver); + } case RequestPrevious _ -> { var thisResponseObserver = new CollectListMappedStreamObserver( GrpcConnection::mapPrevious, count); @@ -258,6 +260,39 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { return responseObserver; } + @Override + public CompletableFuture putBatchAsync(Arena arena, + long columnId, + @NotNull List<@NotNull Keys> allKeys, + @NotNull List<@NotNull MemorySegment> allValues, + @NotNull PutBatchMode mode) throws RocksDBException { + var count = allKeys.size(); + if (count != allValues.size()) { + throw new IllegalArgumentException("Keys length is different than values length! " + + count + " != " + allValues.size()); + } + + var putBatchRequestBuilder = PutBatchRequest.newBuilder() + .setColumnId(columnId) + .setMode(switch (mode) { + case WRITE_BATCH -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH; + case WRITE_BATCH_NO_WAL -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH_NO_WAL; + case SST_INGESTION -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.SST_INGESTION; + case SST_INGEST_BEHIND -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.SST_INGEST_BEHIND; + }); + + var it1 = allKeys.iterator(); + var it2 = allValues.iterator(); + + while (it1.hasNext()) { + var k = it1.next(); + var v = it2.next(); + putBatchRequestBuilder.addData(mapKV(k, v)); + } + + return toResponse(futureStub.putBatch(putBatchRequestBuilder.build()), _ -> null); + } + @SuppressWarnings("unchecked") @Override public CompletableFuture getAsync(Arena arena, diff --git a/src/main/java/it/cavallium/rockserver/core/common/PutBatchMode.java b/src/main/java/it/cavallium/rockserver/core/common/PutBatchMode.java new file mode 100644 index 0000000..7e12958 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/common/PutBatchMode.java @@ -0,0 +1,12 @@ +package it.cavallium.rockserver.core.common; + +public enum PutBatchMode { + WRITE_BATCH, + WRITE_BATCH_NO_WAL, + SST_INGESTION, + /** + * Ingest an SST behind, skipping duplicate keys + * and ingesting everything in the bottommost level + */ + SST_INGEST_BEHIND +} diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java index 9dd8961..e31dd05 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPICommand.java @@ -176,9 +176,9 @@ public sealed interface RocksDBAPICommand { * @param requestType the request type determines which type of data will be returned. */ record PutMulti(Arena arena, long transactionOrUpdateId, long columnId, - @NotNull List keys, - @NotNull List<@NotNull MemorySegment> values, - RequestPut requestType) implements RocksDBAPICommand> { + @NotNull List keys, + @NotNull List<@NotNull MemorySegment> values, + RequestPut requestType) implements RocksDBAPICommand> { @Override public List handleSync(RocksDBSyncAPI api) { @@ -208,6 +208,45 @@ public sealed interface RocksDBAPICommand { return sb.toString(); } } + /** + * Put multiple elements into the specified positions + * @param arena arena + * @param columnId column id + * @param keys multiple lists of column keys + * @param values multiple values, or null if not needed + * @param mode put batch mode + */ + record PutBatch(Arena arena, long columnId, + @NotNull List keys, + @NotNull List<@NotNull MemorySegment> values, + @NotNull PutBatchMode mode) implements RocksDBAPICommand { + + @Override + public Void handleSync(RocksDBSyncAPI api) { + api.putBatch(arena, columnId, keys, values, mode); + return null; + } + + @Override + public CompletionStage handleAsync(RocksDBAsyncAPI api) { + return api.putBatchAsync(arena, columnId, keys, values, mode); + } + + @Override + public String toString() { + var sb = new StringBuilder("PUT_BATCH"); + sb.append(" column:").append(columnId); + sb.append(" mode:").append(mode); + sb.append(" batch:["); + for (int i = 0; i < keys.size(); i++) { + if (i > 0) sb.append(","); + sb.append(" keys:").append(keys.get(i)); + sb.append(" value:").append(Utils.toPrettyString(values.get(i))); + } + sb.append("]"); + return sb.toString(); + } + } /** * Get an element from the specified position * @param arena arena diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java index b91d6ef..84bc44e 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAsyncAPI.java @@ -13,6 +13,7 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator; import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction; import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put; import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutBatch; import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo; import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent; import java.lang.foreign.Arena; @@ -74,6 +75,15 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler { return requestAsync(new PutMulti<>(arena, transactionOrUpdateId, columnId, keys, values, requestType)); } + /** See: {@link PutBatch}. */ + default CompletableFuture putBatchAsync(Arena arena, + long columnId, + @NotNull List<@NotNull Keys> keys, + @NotNull List<@NotNull MemorySegment> values, + @NotNull PutBatchMode mode) throws RocksDBException { + return requestAsync(new PutBatch(arena, columnId, keys, values, mode)); + } + /** See: {@link Get}. */ default CompletableFuture getAsync(Arena arena, long transactionOrUpdateId, diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java index 7fe0e4c..207a5c7 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java @@ -26,7 +26,8 @@ public class RocksDBException extends RuntimeException { COMMIT_FAILED, TX_NOT_FOUND, KEY_HASH_SIZE_MISMATCH, RESTRICTED_TRANSACTION, PUT_INVALID_REQUEST, UPDATE_RETRY, ROCKSDB_LOAD_ERROR, - WRITE_BATCH_1 + WRITE_BATCH_1, + SST_WRITE_1 } public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) { diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java index c0d1a41..82a1b5b 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBSyncAPI.java @@ -13,6 +13,7 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator; import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction; import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put; import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti; +import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutBatch; import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo; import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent; import java.lang.foreign.Arena; @@ -73,6 +74,15 @@ public interface RocksDBSyncAPI extends RocksDBSyncAPIRequestHandler { return requestSync(new PutMulti<>(arena, transactionOrUpdateId, columnId, keys, values, requestType)); } + /** See: {@link PutBatch}. */ + default void putBatch(Arena arena, + long columnId, + @NotNull List keys, + @NotNull List<@NotNull MemorySegment> values, + @NotNull PutBatchMode mode) throws RocksDBException { + requestSync(new PutBatch(arena, columnId, keys, values, mode)); + } + /** See: {@link Get}. */ default T get(Arena arena, long transactionOrUpdateId, diff --git a/src/main/java/it/cavallium/rockserver/core/config/ConfigPrinter.java b/src/main/java/it/cavallium/rockserver/core/config/ConfigPrinter.java index 84ece82..8c39e6a 100644 --- a/src/main/java/it/cavallium/rockserver/core/config/ConfigPrinter.java +++ b/src/main/java/it/cavallium/rockserver/core/config/ConfigPrinter.java @@ -69,6 +69,7 @@ public class ConfigPrinter { "log-path": "%s", "wal-path": "%s", "absolute-consistency": %b, + "ingest-behind": %b, "volumes": %s, "fallback-column-options": %s, "column-options": %s @@ -84,6 +85,7 @@ public class ConfigPrinter { o.logPath(), o.walPath(), o.absoluteConsistency(), + o.ingestBehind(), result.toString(), stringifyFallbackColumn(o.fallbackColumnOptions()), joiner.toString() diff --git a/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java b/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java index 8d2bd0a..4a8df78 100644 --- a/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java +++ b/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java @@ -38,6 +38,10 @@ public interface GlobalDatabaseConfig { boolean absoluteConsistency() throws GestaltException; + boolean ingestBehind() throws GestaltException; + + boolean unorderedWrite() throws GestaltException; + VolumeConfig[] volumes() throws GestaltException; FallbackColumnConfig fallbackColumnOptions() throws GestaltException; diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java index 9c5957e..011d687 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -5,18 +5,10 @@ import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES; import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue; import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue; -import it.cavallium.rockserver.core.common.ColumnHashType; -import it.cavallium.rockserver.core.common.Keys; -import it.cavallium.rockserver.core.common.RequestType; +import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestPut; -import it.cavallium.rockserver.core.common.ColumnSchema; -import it.cavallium.rockserver.core.common.Delta; -import it.cavallium.rockserver.core.common.RocksDBSyncAPI; import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; -import it.cavallium.rockserver.core.common.RocksDBRetryException; -import it.cavallium.rockserver.core.common.UpdateContext; -import it.cavallium.rockserver.core.common.Utils; import it.cavallium.rockserver.core.config.ConfigParser; import it.cavallium.rockserver.core.config.ConfigPrinter; import it.cavallium.rockserver.core.config.DatabaseConfig; @@ -34,6 +26,7 @@ import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; @@ -43,11 +36,12 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; -import java.util.logging.Level; + import org.cliffc.high_scale_lib.NonBlockingHashMapLong; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.*; +import org.rocksdb.RocksDBException; import org.rocksdb.Status.Code; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -420,28 +414,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { if (requestType instanceof RequestType.RequestNothing && !getColumn(columnId).hasBuckets() && transactionOrUpdateId == 0L) { - ops.beginOp(); - try { - // Column id - var col = getColumn(columnId); - try (var wb = new WB(new WriteBatch())) { - var keyIt = keys.iterator(); - var valusIt = values.iterator(); - while (keyIt.hasNext()) { - var key = keyIt.next(); - var value = valusIt.next(); - put(arena, wb, col, 0, key, value, requestType); - } - wb.write(db.get()); - } - return List.of(); - } catch (it.cavallium.rockserver.core.common.RocksDBException ex) { - throw ex; - } catch (Exception ex) { - throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex); - } finally { - ops.endOp(); - } + putBatch(arena, columnId, keys, values, PutBatchMode.WRITE_BATCH); + return List.of(); } else { List responses = requestType instanceof RequestType.RequestNothing ? null : new ArrayList<>(keys.size()); for (int i = 0; i < keys.size(); i++) { @@ -454,11 +428,82 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } } + + @Override + public void putBatch(Arena arena, + long columnId, + @NotNull List keys, + @NotNull List<@NotNull MemorySegment> values, + @NotNull PutBatchMode mode) throws it.cavallium.rockserver.core.common.RocksDBException { + if (keys.size() != values.size()) { + throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size()); + } + ops.beginOp(); + try { + // Column id + var col = getColumn(columnId); + List refs = new ArrayList<>(); + try { + DBWriter writer = switch (mode) { + case WRITE_BATCH, WRITE_BATCH_NO_WAL -> { + var wb = new WB(db.get(), new WriteBatch(), mode == PutBatchMode.WRITE_BATCH_NO_WAL); + refs.add(wb); + yield wb; + } + case SST_INGESTION, SST_INGEST_BEHIND -> { + var envOptions = new EnvOptions(); + refs.add(envOptions); + var compressionOptions = new CompressionOptions() + .setEnabled(true) + .setMaxDictBytes(32768) + .setZStdMaxTrainBytes(32768 * 4); + refs.add(compressionOptions); + var options = new Options() + .setCompressionOptions(compressionOptions) + .setCompressionType(CompressionType.ZSTD_COMPRESSION) + .setUnorderedWrite(true) + .setAllowIngestBehind(mode == PutBatchMode.SST_INGEST_BEHIND) + .setAllowConcurrentMemtableWrite(true); + refs.add(options); + var tempFile = Files.createTempFile("temp", ".sst"); + var sstFileWriter = new SstFileWriter(envOptions, options); + var sstWriter = new SSTWriter(db.get(), col, tempFile, sstFileWriter, mode == PutBatchMode.SST_INGEST_BEHIND); + refs.add(sstWriter); + sstFileWriter.open(tempFile.toString()); + yield sstWriter; + } + }; + var keyIt = keys.iterator(); + var valusIt = values.iterator(); + while (keyIt.hasNext()) { + var key = keyIt.next(); + var value = valusIt.next(); + put(arena, writer, col, 0, key, value, RequestType.none()); + } + writer.writePending(); + } finally { + for (int i = refs.size() - 1; i >= 0; i--) { + try { + refs.get(i).close(); + } catch (Exception ex) { + logger.error("Failed to close reference during batch write", ex); + } + } + } + } catch (it.cavallium.rockserver.core.common.RocksDBException ex) { + throw ex; + } catch (Exception ex) { + throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex); + } finally { + ops.endOp(); + } + } + /** * @param txConsumer this can be called multiple times, if the optimistic transaction failed */ - public R wrapWithTransactionIfNeeded(@Nullable T tx, boolean needTransaction, - ExFunction<@Nullable T, R> txConsumer) throws Exception { + public R wrapWithTransactionIfNeeded(@Nullable T tx, boolean needTransaction, + ExFunction<@Nullable T, R> txConsumer) throws Exception { if (needTransaction) { return ensureWrapWithTransaction(tx, txConsumer); } else { @@ -470,8 +515,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { /** * @param txConsumer this can be called multiple times, if the optimistic transaction failed */ - public R ensureWrapWithTransaction(@Nullable T tx, - ExFunction<@NotNull T, R> txConsumer) throws Exception { + public R ensureWrapWithTransaction(@Nullable T tx, + ExFunction<@NotNull T, R> txConsumer) throws Exception { R result; if (tx == null) { // Retry using a transaction: transactions are required to handle this kind of data @@ -496,7 +541,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } private U put(Arena arena, - @Nullable TxOrWb optionalTxOrUpdate, + @Nullable DBWriter optionalDbWriter, ColumnInstance col, long updateId, @NotNull Keys keys, @@ -510,55 +555,55 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { boolean needsTx = col.hasBuckets() || requirePreviousValue || requirePreviousPresence; - if (optionalTxOrUpdate instanceof Tx tx && tx.isFromGetForUpdate() && (requirePreviousValue || requirePreviousPresence)) { + if (optionalDbWriter instanceof Tx tx && tx.isFromGetForUpdate() && (requirePreviousValue || requirePreviousPresence)) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST, "You can't get the previous value or delta, when you are already updating that value"); } - if (updateId != 0L && !(optionalTxOrUpdate instanceof Tx)) { + if (updateId != 0L && !(optionalDbWriter instanceof Tx)) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST, "Update id must be accompanied with a valid transaction"); } - if (col.hasBuckets() && (optionalTxOrUpdate instanceof WB)) { + if (col.hasBuckets() && (optionalDbWriter != null && !(optionalDbWriter instanceof Tx))) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST, "Column with buckets don't support write batches"); } - return wrapWithTransactionIfNeeded(optionalTxOrUpdate, needsTx, tx -> { + return wrapWithTransactionIfNeeded(optionalDbWriter, needsTx, dbWriter -> { MemorySegment previousValue; MemorySegment calculatedKey = col.calculateKey(arena, keys.keys()); if (updateId != 0L) { - assert tx instanceof Tx; - ((Tx) tx).val().setSavePoint(); + assert dbWriter instanceof Tx; + ((Tx) dbWriter).val().setSavePoint(); } if (col.hasBuckets()) { - assert tx instanceof Tx; + assert dbWriter instanceof Tx; var bucketElementKeys = col.getBucketElementKeys(keys.keys()); try (var readOptions = new ReadOptions()) { - var previousRawBucketByteArray = ((Tx) tx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); + var previousRawBucketByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); MemorySegment previousRawBucket = toMemorySegment(arena, previousRawBucketByteArray); var bucket = previousRawBucket != null ? new Bucket(col, previousRawBucket) : new Bucket(col); previousValue = transformResultValue(col, bucket.addElement(bucketElementKeys, value)); var k = Utils.toByteArray(calculatedKey); var v = Utils.toByteArray(bucket.toSegment(arena)); - ((Tx) tx).val().put(col.cfh(), k, v); + ((Tx) dbWriter).val().put(col.cfh(), k, v); } catch (RocksDBException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_1, e); } } else { if (RequestType.requiresGettingPreviousValue(callback)) { - assert tx instanceof Tx; + assert dbWriter instanceof Tx; try (var readOptions = new ReadOptions()) { byte[] previousValueByteArray; - previousValueByteArray = ((Tx) tx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); + previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousValue = transformResultValue(col, toMemorySegment(arena, previousValueByteArray)); } catch (RocksDBException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e); } } else if (RequestType.requiresGettingPreviousPresence(callback)) { // todo: in the future this should be replaced with just keyExists - assert tx instanceof Tx; + assert dbWriter instanceof Tx; try (var readOptions = new ReadOptions()) { byte[] previousValueByteArray; - previousValueByteArray = ((Tx) tx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); + previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousValue = previousValueByteArray != null ? MemorySegment.NULL : null; } catch (RocksDBException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e); @@ -566,8 +611,9 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } else { previousValue = null; } - switch (tx) { + switch (dbWriter) { case WB wb -> wb.wb().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); + case SSTWriter sstWriter -> sstWriter.sstFileWriter().put(Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); case Tx t -> t.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); case null -> { try (var w = new WriteOptions()) { @@ -588,8 +634,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { if (updateId != 0L) { if (!closeTransaction(updateId, true)) { - ((Tx) tx).val().rollbackToSavePoint(); - ((Tx) tx).val().undoGetForUpdate(col.cfh(), Utils.toByteArray(calculatedKey)); + ((Tx) dbWriter).val().rollbackToSavePoint(); + ((Tx) dbWriter).val().undoGetForUpdate(col.cfh(), Utils.toByteArray(calculatedKey)); throw new RocksDBRetryException(); } } diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/DBWriter.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/DBWriter.java new file mode 100644 index 0000000..a2802cb --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/DBWriter.java @@ -0,0 +1,10 @@ +package it.cavallium.rockserver.core.impl.rocksdb; + +import org.rocksdb.RocksDB; + +public sealed interface DBWriter permits SSTWriter, Tx, WB { + /** + * Writes any pending kv pair to the db + */ + void writePending() throws it.cavallium.rockserver.core.common.RocksDBException; +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java index b9379b1..57ccf48 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java @@ -260,6 +260,10 @@ public class RocksDBLoader { options.setUseDirectIoForFlushAndCompaction(true); } + options + .setAllowIngestBehind(databaseOptions.global().ingestBehind()) + .setUnorderedWrite(databaseOptions.global().unorderedWrite()); + return new OptionsWithCache(options, blockCache); } catch (GestaltException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_CONFIG_ERROR, e); diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/SSTWriter.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/SSTWriter.java new file mode 100644 index 0000000..e45e717 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/SSTWriter.java @@ -0,0 +1,37 @@ +package it.cavallium.rockserver.core.impl.rocksdb; + +import it.cavallium.rockserver.core.common.RocksDBException; +import org.rocksdb.IngestExternalFileOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.SstFileWriter; + +import java.io.Closeable; +import java.nio.file.Path; +import java.util.List; + +public record SSTWriter(RocksDB db, it.cavallium.rockserver.core.impl.ColumnInstance col, Path path, SstFileWriter sstFileWriter, boolean ingestBehind) implements Closeable, DBWriter { + + @Override + public void writePending() throws it.cavallium.rockserver.core.common.RocksDBException { + try { + sstFileWriter.finish(); + try (var ingestOptions = new IngestExternalFileOptions()) { + ingestOptions + .setIngestBehind(ingestBehind) + .setAllowBlockingFlush(true) + .setMoveFiles(true) + .setAllowGlobalSeqNo(true) + .setWriteGlobalSeqno(false) + .setSnapshotConsistency(false); + db.ingestExternalFile(col.cfh(), List.of(path.toString()), ingestOptions); + } + } catch (org.rocksdb.RocksDBException e) { + throw RocksDBException.of(RocksDBException.RocksDBErrorType.SST_WRITE_1, e); + } + } + + @Override + public void close() { + sstFileWriter.close(); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/Tx.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/Tx.java index b1b54b3..029f6bc 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/Tx.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/Tx.java @@ -1,11 +1,11 @@ package it.cavallium.rockserver.core.impl.rocksdb; import java.io.Closeable; -import org.rocksdb.AbstractNativeReference; + import org.rocksdb.Transaction; public record Tx(Transaction val, boolean isFromGetForUpdate, RocksDBObjects objs) - implements Closeable, TxOrWb { + implements Closeable, DBWriter { @Override public void close() { diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TxOrWb.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TxOrWb.java deleted file mode 100644 index 3bc5c41..0000000 --- a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TxOrWb.java +++ /dev/null @@ -1,4 +0,0 @@ -package it.cavallium.rockserver.core.impl.rocksdb; - -public sealed interface TxOrWb permits Tx, WB { -} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/WB.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/WB.java index c017889..fce01f3 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/WB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/WB.java @@ -8,16 +8,16 @@ import org.rocksdb.WriteOptions; import java.io.Closeable; -public record WB(@NotNull WriteBatch wb) implements Closeable, TxOrWb { +public record WB(RocksDB rocksDB, @NotNull WriteBatch wb, boolean disableWal) implements Closeable, DBWriter { private static final boolean MIGRATE = Boolean.parseBoolean(System.getProperty("rocksdb.migrate", "false")); @Override public void close() { wb.close(); } - public void write(RocksDB rocksDB) throws RocksDBException { + public void writePending() throws RocksDBException { try (var w = new WriteOptions()) { - if (MIGRATE) { + if (disableWal || MIGRATE) { w.setDisableWAL(true); } rocksDB.write(w, wb); diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index 09af251..bf103c3 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -17,6 +17,7 @@ import it.cavallium.rockserver.core.client.RocksDBConnection; import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.ColumnHashType; import it.cavallium.rockserver.core.common.ColumnSchema; +import it.cavallium.rockserver.core.common.PutBatchMode; import it.cavallium.rockserver.core.common.RequestType.RequestChanged; import it.cavallium.rockserver.core.common.RequestType.RequestCurrent; import it.cavallium.rockserver.core.common.RequestType.RequestDelta; @@ -196,12 +197,17 @@ public class GrpcServer extends Server { executor.execute(() -> { try { try (var arena = Arena.ofConfined()) { - api.putMulti(arena, - request.getTransactionOrUpdateId(), + api.putBatch(arena, request.getColumnId(), mapKeysKV(arena, request.getDataCount(), request::getData), mapValuesKV(arena, request.getDataCount(), request::getData), - new RequestNothing<>() + switch (request.getMode()) { + case WRITE_BATCH -> PutBatchMode.WRITE_BATCH; + case WRITE_BATCH_NO_WAL -> PutBatchMode.WRITE_BATCH_NO_WAL; + case SST_INGESTION -> PutBatchMode.SST_INGESTION; + case SST_INGEST_BEHIND -> PutBatchMode.SST_INGEST_BEHIND; + case UNRECOGNIZED -> throw new UnsupportedOperationException("Unrecognized request \"mode\""); + } ); } responseObserver.onNext(Empty.getDefaultInstance()); diff --git a/src/main/proto/rocksdb.proto b/src/main/proto/rocksdb.proto index 41a0ab8..c0ce901 100644 --- a/src/main/proto/rocksdb.proto +++ b/src/main/proto/rocksdb.proto @@ -29,6 +29,13 @@ enum Operation { PREVIOUS_PRESENCE = 8; } +enum PutBatchMode { + WRITE_BATCH = 0; + WRITE_BATCH_NO_WAL = 1; + SST_INGESTION = 2; + SST_INGEST_BEHIND = 3; +} + message Delta { optional bytes previous = 1; optional bytes current = 2; @@ -65,7 +72,7 @@ message GetColumnIdResponse {int64 columnId = 1;} message PutRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; KV data = 3;} -message PutBatchRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; repeated KV data = 3;} +message PutBatchRequest {int64 columnId = 1; repeated KV data = 2; PutBatchMode mode = 3;} message PutMultiInitialRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2;} message PutMultiRequest {oneof putMultiRequestType {PutMultiInitialRequest initialRequest = 1;KV data = 2;}} diff --git a/src/main/resources/it/cavallium/rockserver/core/resources/default.conf b/src/main/resources/it/cavallium/rockserver/core/resources/default.conf index 4682ffe..8c67118 100644 --- a/src/main/resources/it/cavallium/rockserver/core/resources/default.conf +++ b/src/main/resources/it/cavallium/rockserver/core/resources/default.conf @@ -6,6 +6,40 @@ database: { spinning: false # Enable to require absolute consistency after a crash. False to use the PointInTime recovery strategy absolute-consistency: true + # Set this option to true during creation of database if you want to be able + # to ingest behind (call IngestExternalFile() skipping keys that already + # exist, rather than overwriting matching keys). + # Setting this option to true will affect 2 things: + # 1) Disable some internal optimizations around SST file compression + # 2) Reserve bottom-most level for ingested files only. + # 3) Note that num_levels should be >= 3 if this option is turned on. + # DEFAULT: false + ingest-behind: false + # ENABLE THIS ONLY WHEN DOING BULK WRITES, THIS IS UNSAFE TO USE IN NORMAL SCENARIOS + # Setting unorderedWrite() to true trades higher write throughput + # with relaxing the immutability guarantee of snapshots. + # This violates the repeatability one expects from ::Get from a snapshot, + # as well as ::MultiGet and Iterator's consistent-point-in-time view property. + # If the application cannot tolerate the relaxed guarantees, + # it can implement its own mechanisms to work around + # that and yet benefit from the higher throughput. + # Using TransactionDB with WRITE_PREPARED write policy and twoWriteQueues() true + # is one way to achieve immutable snapshots despite unordered_write. + # By default, i. e., when it is false, rocksdb does not advance the sequence + # number for new snapshots unless all the writes with + # lower sequence numbers are already finished. + # This provides the immutability that we except from snapshots. + # Moreover, since Iterator and MultiGet internally depend on snapshots, + # the snapshot immutability results into Iterator + # and MultiGet offering consistent-point-in-time view. + # If set to true, although Read-Your-Own-Write property is still provided, + # the snapshot immutability property is relaxed: the writes issued after + # the snapshot is obtained (with larger sequence numbers) will be still not + # visible to the reads from that snapshot, however, there still might be pending + # writes (with lower sequence number) that will change the state visible + # to the snapshot after they are landed to the memtable. + # DEFAULT: false + unordered-write: false # Error checking checksum: true # Use direct I/O in RocksDB databases (Higher I/O read throughput but OS cache is not used, less swapping, less memory pressure)