PutBatch command

This commit is contained in:
Andrea Cavalli 2024-09-23 15:15:45 +02:00
parent d8f9907ed8
commit bcd18e72a9
19 changed files with 344 additions and 89 deletions

View File

@ -1,15 +1,8 @@
package it.cavallium.rockserver.core.client; package it.cavallium.rockserver.core.client;
import it.cavallium.rockserver.core.common.Keys; import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.RequestType;
import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut; import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.RocksDBAPI;
import it.cavallium.rockserver.core.common.RocksDBAPICommand;
import it.cavallium.rockserver.core.common.RocksDBAsyncAPI;
import it.cavallium.rockserver.core.common.RocksDBException;
import it.cavallium.rockserver.core.common.RocksDBSyncAPI;
import it.cavallium.rockserver.core.impl.EmbeddedDB; import it.cavallium.rockserver.core.impl.EmbeddedDB;
import java.io.IOException; import java.io.IOException;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
@ -118,6 +111,15 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
return db.putMulti(arena, transactionOrUpdateId, columnId, keys, values, requestType); return db.putMulti(arena, transactionOrUpdateId, columnId, keys, values, requestType);
} }
/**
 * Puts a batch of key-value pairs into the given column.
 *
 * @param arena    arena used for native memory involved in the call
 * @param columnId id of the destination column
 * @param keys     one {@link Keys} entry per value, parallel to {@code values}
 * @param values   values to store, parallel to {@code keys}
 * @param mode     how the batch is applied (write batch or SST ingestion)
 * @throws RocksDBException if the underlying database rejects the batch
 */
@Override
public void putBatch(Arena arena,
long columnId,
@NotNull List<Keys> keys,
@NotNull List<@NotNull MemorySegment> values,
@NotNull PutBatchMode mode) throws RocksDBException {
// Embedded connection: no transport involved, delegate straight to the local DB.
db.putBatch(arena, columnId, keys, values, mode);
}
@Override @Override
public <T> T get(Arena arena, public <T> T get(Arena arena,
long transactionOrUpdateId, long transactionOrUpdateId,

View File

@ -12,9 +12,9 @@ import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.ColumnSchema; import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.Keys; import it.cavallium.rockserver.core.common.PutBatchMode;
import it.cavallium.rockserver.core.common.RequestType;
import it.cavallium.rockserver.core.common.RequestType.RequestChanged; import it.cavallium.rockserver.core.common.RequestType.RequestChanged;
import it.cavallium.rockserver.core.common.RequestType.RequestCurrent; import it.cavallium.rockserver.core.common.RequestType.RequestCurrent;
import it.cavallium.rockserver.core.common.RequestType.RequestDelta; import it.cavallium.rockserver.core.common.RequestType.RequestDelta;
@ -26,14 +26,10 @@ import it.cavallium.rockserver.core.common.RequestType.RequestNothing;
import it.cavallium.rockserver.core.common.RequestType.RequestPrevious; import it.cavallium.rockserver.core.common.RequestType.RequestPrevious;
import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence; import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence;
import it.cavallium.rockserver.core.common.RequestType.RequestPut; import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.RocksDBAPI;
import it.cavallium.rockserver.core.common.RocksDBAPICommand;
import it.cavallium.rockserver.core.common.RocksDBAsyncAPI;
import it.cavallium.rockserver.core.common.RocksDBException;
import it.cavallium.rockserver.core.common.RocksDBSyncAPI;
import it.cavallium.rockserver.core.common.UpdateContext;
import it.cavallium.rockserver.core.common.Utils.HostAndPort; import it.cavallium.rockserver.core.common.Utils.HostAndPort;
import it.cavallium.rockserver.core.common.api.proto.*; import it.cavallium.rockserver.core.common.api.proto.*;
import it.cavallium.rockserver.core.common.api.proto.ColumnHashType;
import it.cavallium.rockserver.core.common.api.proto.Delta;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceBlockingStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceBlockingStub;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub;
@ -190,10 +186,10 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
CompletableFuture<List<T>> responseObserver; CompletableFuture<List<T>> responseObserver;
if (requestType instanceof RequestType.RequestNothing<?>) { if (requestType instanceof RequestType.RequestNothing<?> && transactionOrUpdateId == 0L) {
var putBatchRequestBuilder = PutBatchRequest.newBuilder() var putBatchRequestBuilder = PutBatchRequest.newBuilder()
.setTransactionOrUpdateId(transactionOrUpdateId) .setColumnId(columnId)
.setColumnId(columnId); .setMode(it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH);
var it1 = allKeys.iterator(); var it1 = allKeys.iterator();
var it2 = allValues.iterator(); var it2 = allValues.iterator();
@ -215,6 +211,12 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
.build(); .build();
StreamObserver<PutMultiRequest> requestPublisher = switch (requestType) { StreamObserver<PutMultiRequest> requestPublisher = switch (requestType) {
case RequestNothing<?> _ -> {
var thisResponseObserver = new CollectListStreamObserver<Empty>(0);
//noinspection unchecked
responseObserver = (CompletableFuture<List<T>>) (CompletableFuture<?>) thisResponseObserver;
yield this.asyncStub.putMulti(thisResponseObserver);
}
case RequestPrevious<?> _ -> { case RequestPrevious<?> _ -> {
var thisResponseObserver = new CollectListMappedStreamObserver<Previous, @Nullable MemorySegment>( var thisResponseObserver = new CollectListMappedStreamObserver<Previous, @Nullable MemorySegment>(
GrpcConnection::mapPrevious, count); GrpcConnection::mapPrevious, count);
@ -258,6 +260,39 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
return responseObserver; return responseObserver;
} }
/**
 * Asynchronously sends a batch of key-value pairs to the remote server.
 *
 * @param arena     arena used for native memory involved in the call
 * @param columnId  id of the destination column
 * @param allKeys   one {@link Keys} entry per value, parallel to {@code allValues}
 * @param allValues values to store, parallel to {@code allKeys}
 * @param mode      how the server should apply the batch
 * @return a future that completes when the server acknowledges the batch
 * @throws IllegalArgumentException if the two lists differ in size
 */
@Override
public CompletableFuture<Void> putBatchAsync(Arena arena,
		long columnId,
		@NotNull List<@NotNull Keys> allKeys,
		@NotNull List<@NotNull MemorySegment> allValues,
		@NotNull PutBatchMode mode) throws RocksDBException {
	var count = allKeys.size();
	if (count != allValues.size()) {
		throw new IllegalArgumentException("Keys length is different than values length! "
				+ count + " != " + allValues.size());
	}
	// Translate the API-level mode into its protobuf counterpart.
	var protoMode = switch (mode) {
		case WRITE_BATCH -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH;
		case WRITE_BATCH_NO_WAL -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH_NO_WAL;
		case SST_INGESTION -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.SST_INGESTION;
		case SST_INGEST_BEHIND -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.SST_INGEST_BEHIND;
	};
	var requestBuilder = PutBatchRequest.newBuilder()
			.setColumnId(columnId)
			.setMode(protoMode);
	// Both lists are the same size, so a single index drives both.
	for (int i = 0; i < count; i++) {
		requestBuilder.addData(mapKV(allKeys.get(i), allValues.get(i)));
	}
	return toResponse(futureStub.putBatch(requestBuilder.build()), _ -> null);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <T> CompletableFuture<T> getAsync(Arena arena, public <T> CompletableFuture<T> getAsync(Arena arena,

View File

@ -0,0 +1,12 @@
package it.cavallium.rockserver.core.common;
/**
 * Strategy used to apply a batch of puts to the database.
 */
public enum PutBatchMode {
/** Apply the batch through a regular write batch. */
WRITE_BATCH,
/** Apply the batch through a write batch, skipping the write-ahead log. */
WRITE_BATCH_NO_WAL,
/** Write the batch to an SST file and ingest the file into the column. */
SST_INGESTION,
/**
 * Ingest an SST behind, skipping duplicate keys
 * and ingesting everything in the bottommost level
 */
SST_INGEST_BEHIND
}

View File

@ -176,9 +176,9 @@ public sealed interface RocksDBAPICommand<R> {
* @param requestType the request type determines which type of data will be returned. * @param requestType the request type determines which type of data will be returned.
*/ */
record PutMulti<T>(Arena arena, long transactionOrUpdateId, long columnId, record PutMulti<T>(Arena arena, long transactionOrUpdateId, long columnId,
@NotNull List<Keys> keys, @NotNull List<Keys> keys,
@NotNull List<@NotNull MemorySegment> values, @NotNull List<@NotNull MemorySegment> values,
RequestPut<? super MemorySegment, T> requestType) implements RocksDBAPICommand<List<T>> { RequestPut<? super MemorySegment, T> requestType) implements RocksDBAPICommand<List<T>> {
@Override @Override
public List<T> handleSync(RocksDBSyncAPI api) { public List<T> handleSync(RocksDBSyncAPI api) {
@ -208,6 +208,45 @@ public sealed interface RocksDBAPICommand<R> {
return sb.toString(); return sb.toString();
} }
} }
/**
 * Put multiple elements into the specified positions.
 * @param arena arena
 * @param columnId column id
 * @param keys multiple lists of column keys, parallel to {@code values}
 * @param values values to store, parallel to {@code keys} (both lists are non-null and must have the same size)
 * @param mode put batch mode
 */
record PutBatch(Arena arena, long columnId,
		@NotNull List<Keys> keys,
		@NotNull List<@NotNull MemorySegment> values,
		@NotNull PutBatchMode mode) implements RocksDBAPICommand<Void> {

	@Override
	public Void handleSync(RocksDBSyncAPI api) {
		api.putBatch(arena, columnId, keys, values, mode);
		return null;
	}

	@Override
	public CompletionStage<Void> handleAsync(RocksDBAsyncAPI api) {
		return api.putBatchAsync(arena, columnId, keys, values, mode);
	}

	@Override
	public String toString() {
		var sb = new StringBuilder("PUT_BATCH");
		sb.append(" column:").append(columnId);
		sb.append(" mode:").append(mode);
		sb.append(" batch:[");
		for (int i = 0; i < keys.size(); i++) {
			if (i > 0) sb.append(",");
			sb.append(" keys:").append(keys.get(i));
			sb.append(" value:").append(Utils.toPrettyString(values.get(i)));
		}
		sb.append("]");
		return sb.toString();
	}
}
/** /**
* Get an element from the specified position * Get an element from the specified position
* @param arena arena * @param arena arena

View File

@ -13,6 +13,7 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction; import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put; import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti; import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutBatch;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo; import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent; import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
@ -74,6 +75,15 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
return requestAsync(new PutMulti<>(arena, transactionOrUpdateId, columnId, keys, values, requestType)); return requestAsync(new PutMulti<>(arena, transactionOrUpdateId, columnId, keys, values, requestType));
} }
/**
 * Puts a batch of key-value pairs into a column, asynchronously.
 * See: {@link PutBatch}.
 *
 * @param keys   one {@link Keys} entry per value, parallel to {@code values}
 * @param values values to store, parallel to {@code keys}
 * @param mode   how the batch is applied (write batch or SST ingestion)
 */
default CompletableFuture<Void> putBatchAsync(Arena arena,
long columnId,
@NotNull List<@NotNull Keys> keys,
@NotNull List<@NotNull MemorySegment> values,
@NotNull PutBatchMode mode) throws RocksDBException {
return requestAsync(new PutBatch(arena, columnId, keys, values, mode));
}
/** See: {@link Get}. */ /** See: {@link Get}. */
default <T> CompletableFuture<T> getAsync(Arena arena, default <T> CompletableFuture<T> getAsync(Arena arena,
long transactionOrUpdateId, long transactionOrUpdateId,

View File

@ -26,7 +26,8 @@ public class RocksDBException extends RuntimeException {
COMMIT_FAILED, COMMIT_FAILED,
TX_NOT_FOUND, TX_NOT_FOUND,
KEY_HASH_SIZE_MISMATCH, RESTRICTED_TRANSACTION, PUT_INVALID_REQUEST, UPDATE_RETRY, ROCKSDB_LOAD_ERROR, KEY_HASH_SIZE_MISMATCH, RESTRICTED_TRANSACTION, PUT_INVALID_REQUEST, UPDATE_RETRY, ROCKSDB_LOAD_ERROR,
WRITE_BATCH_1 WRITE_BATCH_1,
SST_WRITE_1
} }
public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) { public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) {

View File

@ -13,6 +13,7 @@ import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenIterator;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction; import it.cavallium.rockserver.core.common.RocksDBAPICommand.OpenTransaction;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put; import it.cavallium.rockserver.core.common.RocksDBAPICommand.Put;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti; import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutMulti;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.PutBatch;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo; import it.cavallium.rockserver.core.common.RocksDBAPICommand.SeekTo;
import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent; import it.cavallium.rockserver.core.common.RocksDBAPICommand.Subsequent;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
@ -73,6 +74,15 @@ public interface RocksDBSyncAPI extends RocksDBSyncAPIRequestHandler {
return requestSync(new PutMulti<>(arena, transactionOrUpdateId, columnId, keys, values, requestType)); return requestSync(new PutMulti<>(arena, transactionOrUpdateId, columnId, keys, values, requestType));
} }
/**
 * Puts a batch of key-value pairs into a column, synchronously.
 * See: {@link PutBatch}.
 *
 * @param keys   one {@link Keys} entry per value, parallel to {@code values}
 * @param values values to store, parallel to {@code keys}
 * @param mode   how the batch is applied (write batch or SST ingestion)
 */
default void putBatch(Arena arena,
long columnId,
@NotNull List<Keys> keys,
@NotNull List<@NotNull MemorySegment> values,
@NotNull PutBatchMode mode) throws RocksDBException {
requestSync(new PutBatch(arena, columnId, keys, values, mode));
}
/** See: {@link Get}. */ /** See: {@link Get}. */
default <T> T get(Arena arena, default <T> T get(Arena arena,
long transactionOrUpdateId, long transactionOrUpdateId,

View File

@ -69,6 +69,7 @@ public class ConfigPrinter {
"log-path": "%s", "log-path": "%s",
"wal-path": "%s", "wal-path": "%s",
"absolute-consistency": %b, "absolute-consistency": %b,
"ingest-behind": %b,
"volumes": %s, "volumes": %s,
"fallback-column-options": %s, "fallback-column-options": %s,
"column-options": %s "column-options": %s
@ -84,6 +85,7 @@ public class ConfigPrinter {
o.logPath(), o.logPath(),
o.walPath(), o.walPath(),
o.absoluteConsistency(), o.absoluteConsistency(),
o.ingestBehind(),
result.toString(), result.toString(),
stringifyFallbackColumn(o.fallbackColumnOptions()), stringifyFallbackColumn(o.fallbackColumnOptions()),
joiner.toString() joiner.toString()

View File

@ -38,6 +38,10 @@ public interface GlobalDatabaseConfig {
boolean absoluteConsistency() throws GestaltException; boolean absoluteConsistency() throws GestaltException;
boolean ingestBehind() throws GestaltException;
boolean unorderedWrite() throws GestaltException;
VolumeConfig[] volumes() throws GestaltException; VolumeConfig[] volumes() throws GestaltException;
FallbackColumnConfig fallbackColumnOptions() throws GestaltException; FallbackColumnConfig fallbackColumnOptions() throws GestaltException;

View File

@ -5,18 +5,10 @@ import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES;
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue; import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue;
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue; import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue;
import it.cavallium.rockserver.core.common.ColumnHashType; import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.Keys;
import it.cavallium.rockserver.core.common.RequestType;
import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut; import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.Delta;
import it.cavallium.rockserver.core.common.RocksDBSyncAPI;
import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType;
import it.cavallium.rockserver.core.common.RocksDBRetryException;
import it.cavallium.rockserver.core.common.UpdateContext;
import it.cavallium.rockserver.core.common.Utils;
import it.cavallium.rockserver.core.config.ConfigParser; import it.cavallium.rockserver.core.config.ConfigParser;
import it.cavallium.rockserver.core.config.ConfigPrinter; import it.cavallium.rockserver.core.config.ConfigPrinter;
import it.cavallium.rockserver.core.config.DatabaseConfig; import it.cavallium.rockserver.core.config.DatabaseConfig;
@ -34,6 +26,7 @@ import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment; import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -43,11 +36,12 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import org.cliffc.high_scale_lib.NonBlockingHashMapLong; import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.rocksdb.*; import org.rocksdb.*;
import org.rocksdb.RocksDBException;
import org.rocksdb.Status.Code; import org.rocksdb.Status.Code;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -420,28 +414,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
if (requestType instanceof RequestType.RequestNothing<?> if (requestType instanceof RequestType.RequestNothing<?>
&& !getColumn(columnId).hasBuckets() && !getColumn(columnId).hasBuckets()
&& transactionOrUpdateId == 0L) { && transactionOrUpdateId == 0L) {
ops.beginOp(); putBatch(arena, columnId, keys, values, PutBatchMode.WRITE_BATCH);
try { return List.of();
// Column id
var col = getColumn(columnId);
try (var wb = new WB(new WriteBatch())) {
var keyIt = keys.iterator();
var valusIt = values.iterator();
while (keyIt.hasNext()) {
var key = keyIt.next();
var value = valusIt.next();
put(arena, wb, col, 0, key, value, requestType);
}
wb.write(db.get());
}
return List.of();
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
throw ex;
} catch (Exception ex) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
} finally {
ops.endOp();
}
} else { } else {
List<T> responses = requestType instanceof RequestType.RequestNothing<?> ? null : new ArrayList<>(keys.size()); List<T> responses = requestType instanceof RequestType.RequestNothing<?> ? null : new ArrayList<>(keys.size());
for (int i = 0; i < keys.size(); i++) { for (int i = 0; i < keys.size(); i++) {
@ -454,11 +428,82 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
} }
} }
/**
 * Writes a batch of key-value pairs into the given column, either through a
 * {@link WriteBatch} or by building and ingesting an SST file, depending on {@code mode}.
 *
 * @param arena    arena used for native memory involved in the call
 * @param columnId id of the destination column
 * @param keys     one {@link Keys} entry per value, parallel to {@code values}
 * @param values   values to store, parallel to {@code keys}
 * @param mode     how the batch is applied
 * @throws IllegalArgumentException if the two lists differ in size
 * @throws it.cavallium.rockserver.core.common.RocksDBException if the write fails
 */
@Override
public void putBatch(Arena arena,
		long columnId,
		@NotNull List<Keys> keys,
		@NotNull List<@NotNull MemorySegment> values,
		@NotNull PutBatchMode mode) throws it.cavallium.rockserver.core.common.RocksDBException {
	if (keys.size() != values.size()) {
		throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size());
	}
	if (keys.isEmpty()) {
		// Nothing to write. Returning early also avoids creating an empty SST file,
		// which SstFileWriter#finish() would reject.
		return;
	}
	ops.beginOp();
	try {
		// Column id
		var col = getColumn(columnId);
		// Resources registered here are closed in reverse order once the batch is done.
		List<AutoCloseable> refs = new ArrayList<>();
		try {
			DBWriter writer = switch (mode) {
				case WRITE_BATCH, WRITE_BATCH_NO_WAL -> {
					var wb = new WB(db.get(), new WriteBatch(), mode == PutBatchMode.WRITE_BATCH_NO_WAL);
					refs.add(wb);
					yield wb;
				}
				case SST_INGESTION, SST_INGEST_BEHIND -> createSstWriter(col, mode, refs);
			};
			var keyIt = keys.iterator();
			var valueIt = values.iterator();
			while (keyIt.hasNext()) {
				var key = keyIt.next();
				var value = valueIt.next();
				put(arena, writer, col, 0, key, value, RequestType.none());
			}
			writer.writePending();
		} finally {
			for (int i = refs.size() - 1; i >= 0; i--) {
				try {
					refs.get(i).close();
				} catch (Exception ex) {
					logger.error("Failed to close reference during batch write", ex);
				}
			}
		}
	} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
		throw ex;
	} catch (Exception ex) {
		throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
	} finally {
		ops.endOp();
	}
}

/**
 * Creates an SST-file writer for batch ingestion, registering every native
 * resource (and a temp-file cleanup hook) into {@code refs} so the caller's
 * reverse-order close loop releases everything.
 *
 * @param col  destination column
 * @param mode either {@code SST_INGESTION} or {@code SST_INGEST_BEHIND}
 * @param refs resource list owned by the caller; closed in reverse order
 */
private SSTWriter createSstWriter(ColumnInstance col, PutBatchMode mode, List<AutoCloseable> refs)
		throws IOException, RocksDBException {
	var envOptions = new EnvOptions();
	refs.add(envOptions);
	var compressionOptions = new CompressionOptions()
			.setEnabled(true)
			.setMaxDictBytes(32768)
			.setZStdMaxTrainBytes(32768 * 4);
	refs.add(compressionOptions);
	var options = new Options()
			.setCompressionOptions(compressionOptions)
			.setCompressionType(CompressionType.ZSTD_COMPRESSION)
			.setUnorderedWrite(true)
			.setAllowIngestBehind(mode == PutBatchMode.SST_INGEST_BEHIND)
			.setAllowConcurrentMemtableWrite(true);
	refs.add(options);
	var tempFile = Files.createTempFile("temp", ".sst");
	// Registered before the writer so it runs after the writer is closed. On a
	// successful ingestion the file is moved away, making this delete a no-op;
	// on failure it removes the leftover temp file instead of leaking it.
	refs.add(() -> Files.deleteIfExists(tempFile));
	var sstFileWriter = new SstFileWriter(envOptions, options);
	var sstWriter = new SSTWriter(db.get(), col, tempFile, sstFileWriter, mode == PutBatchMode.SST_INGEST_BEHIND);
	refs.add(sstWriter);
	sstFileWriter.open(tempFile.toString());
	return sstWriter;
}
/** /**
* @param txConsumer this can be called multiple times, if the optimistic transaction failed * @param txConsumer this can be called multiple times, if the optimistic transaction failed
*/ */
public <T extends TxOrWb, R> R wrapWithTransactionIfNeeded(@Nullable T tx, boolean needTransaction, public <T extends DBWriter, R> R wrapWithTransactionIfNeeded(@Nullable T tx, boolean needTransaction,
ExFunction<@Nullable T, R> txConsumer) throws Exception { ExFunction<@Nullable T, R> txConsumer) throws Exception {
if (needTransaction) { if (needTransaction) {
return ensureWrapWithTransaction(tx, txConsumer); return ensureWrapWithTransaction(tx, txConsumer);
} else { } else {
@ -470,8 +515,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
/** /**
* @param txConsumer this can be called multiple times, if the optimistic transaction failed * @param txConsumer this can be called multiple times, if the optimistic transaction failed
*/ */
public <T extends TxOrWb, R> R ensureWrapWithTransaction(@Nullable T tx, public <T extends DBWriter, R> R ensureWrapWithTransaction(@Nullable T tx,
ExFunction<@NotNull T, R> txConsumer) throws Exception { ExFunction<@NotNull T, R> txConsumer) throws Exception {
R result; R result;
if (tx == null) { if (tx == null) {
// Retry using a transaction: transactions are required to handle this kind of data // Retry using a transaction: transactions are required to handle this kind of data
@ -496,7 +541,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
} }
private <U> U put(Arena arena, private <U> U put(Arena arena,
@Nullable TxOrWb optionalTxOrUpdate, @Nullable DBWriter optionalDbWriter,
ColumnInstance col, ColumnInstance col,
long updateId, long updateId,
@NotNull Keys keys, @NotNull Keys keys,
@ -510,55 +555,55 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
boolean needsTx = col.hasBuckets() boolean needsTx = col.hasBuckets()
|| requirePreviousValue || requirePreviousValue
|| requirePreviousPresence; || requirePreviousPresence;
if (optionalTxOrUpdate instanceof Tx tx && tx.isFromGetForUpdate() && (requirePreviousValue || requirePreviousPresence)) { if (optionalDbWriter instanceof Tx tx && tx.isFromGetForUpdate() && (requirePreviousValue || requirePreviousPresence)) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST, throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST,
"You can't get the previous value or delta, when you are already updating that value"); "You can't get the previous value or delta, when you are already updating that value");
} }
if (updateId != 0L && !(optionalTxOrUpdate instanceof Tx)) { if (updateId != 0L && !(optionalDbWriter instanceof Tx)) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST, throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST,
"Update id must be accompanied with a valid transaction"); "Update id must be accompanied with a valid transaction");
} }
if (col.hasBuckets() && (optionalTxOrUpdate instanceof WB)) { if (col.hasBuckets() && (optionalDbWriter != null && !(optionalDbWriter instanceof Tx))) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST, throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST,
"Column with buckets don't support write batches"); "Column with buckets don't support write batches");
} }
return wrapWithTransactionIfNeeded(optionalTxOrUpdate, needsTx, tx -> { return wrapWithTransactionIfNeeded(optionalDbWriter, needsTx, dbWriter -> {
MemorySegment previousValue; MemorySegment previousValue;
MemorySegment calculatedKey = col.calculateKey(arena, keys.keys()); MemorySegment calculatedKey = col.calculateKey(arena, keys.keys());
if (updateId != 0L) { if (updateId != 0L) {
assert tx instanceof Tx; assert dbWriter instanceof Tx;
((Tx) tx).val().setSavePoint(); ((Tx) dbWriter).val().setSavePoint();
} }
if (col.hasBuckets()) { if (col.hasBuckets()) {
assert tx instanceof Tx; assert dbWriter instanceof Tx;
var bucketElementKeys = col.getBucketElementKeys(keys.keys()); var bucketElementKeys = col.getBucketElementKeys(keys.keys());
try (var readOptions = new ReadOptions()) { try (var readOptions = new ReadOptions()) {
var previousRawBucketByteArray = ((Tx) tx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); var previousRawBucketByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
MemorySegment previousRawBucket = toMemorySegment(arena, previousRawBucketByteArray); MemorySegment previousRawBucket = toMemorySegment(arena, previousRawBucketByteArray);
var bucket = previousRawBucket != null ? new Bucket(col, previousRawBucket) : new Bucket(col); var bucket = previousRawBucket != null ? new Bucket(col, previousRawBucket) : new Bucket(col);
previousValue = transformResultValue(col, bucket.addElement(bucketElementKeys, value)); previousValue = transformResultValue(col, bucket.addElement(bucketElementKeys, value));
var k = Utils.toByteArray(calculatedKey); var k = Utils.toByteArray(calculatedKey);
var v = Utils.toByteArray(bucket.toSegment(arena)); var v = Utils.toByteArray(bucket.toSegment(arena));
((Tx) tx).val().put(col.cfh(), k, v); ((Tx) dbWriter).val().put(col.cfh(), k, v);
} catch (RocksDBException e) { } catch (RocksDBException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_1, e); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_1, e);
} }
} else { } else {
if (RequestType.requiresGettingPreviousValue(callback)) { if (RequestType.requiresGettingPreviousValue(callback)) {
assert tx instanceof Tx; assert dbWriter instanceof Tx;
try (var readOptions = new ReadOptions()) { try (var readOptions = new ReadOptions()) {
byte[] previousValueByteArray; byte[] previousValueByteArray;
previousValueByteArray = ((Tx) tx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
previousValue = transformResultValue(col, toMemorySegment(arena, previousValueByteArray)); previousValue = transformResultValue(col, toMemorySegment(arena, previousValueByteArray));
} catch (RocksDBException e) { } catch (RocksDBException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e);
} }
} else if (RequestType.requiresGettingPreviousPresence(callback)) { } else if (RequestType.requiresGettingPreviousPresence(callback)) {
// todo: in the future this should be replaced with just keyExists // todo: in the future this should be replaced with just keyExists
assert tx instanceof Tx; assert dbWriter instanceof Tx;
try (var readOptions = new ReadOptions()) { try (var readOptions = new ReadOptions()) {
byte[] previousValueByteArray; byte[] previousValueByteArray;
previousValueByteArray = ((Tx) tx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousValueByteArray = ((Tx) dbWriter).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
previousValue = previousValueByteArray != null ? MemorySegment.NULL : null; previousValue = previousValueByteArray != null ? MemorySegment.NULL : null;
} catch (RocksDBException e) { } catch (RocksDBException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e);
@ -566,8 +611,9 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
} else { } else {
previousValue = null; previousValue = null;
} }
switch (tx) { switch (dbWriter) {
case WB wb -> wb.wb().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); case WB wb -> wb.wb().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value));
case SSTWriter sstWriter -> sstWriter.sstFileWriter().put(Utils.toByteArray(calculatedKey), Utils.toByteArray(value));
case Tx t -> t.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); case Tx t -> t.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value));
case null -> { case null -> {
try (var w = new WriteOptions()) { try (var w = new WriteOptions()) {
@ -588,8 +634,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
if (updateId != 0L) { if (updateId != 0L) {
if (!closeTransaction(updateId, true)) { if (!closeTransaction(updateId, true)) {
((Tx) tx).val().rollbackToSavePoint(); ((Tx) dbWriter).val().rollbackToSavePoint();
((Tx) tx).val().undoGetForUpdate(col.cfh(), Utils.toByteArray(calculatedKey)); ((Tx) dbWriter).val().undoGetForUpdate(col.cfh(), Utils.toByteArray(calculatedKey));
throw new RocksDBRetryException(); throw new RocksDBRetryException();
} }
} }

View File

@ -0,0 +1,10 @@
package it.cavallium.rockserver.core.impl.rocksdb;

import it.cavallium.rockserver.core.common.RocksDBException;

/**
 * A destination for key-value writes that may hold them pending
 * (write batch, SST file, or transaction) until flushed.
 */
public sealed interface DBWriter permits SSTWriter, Tx, WB {

	/**
	 * Writes any pending kv pair to the db.
	 *
	 * @throws RocksDBException if the pending writes cannot be applied
	 */
	void writePending() throws RocksDBException;
}

View File

@ -260,6 +260,10 @@ public class RocksDBLoader {
options.setUseDirectIoForFlushAndCompaction(true); options.setUseDirectIoForFlushAndCompaction(true);
} }
options
.setAllowIngestBehind(databaseOptions.global().ingestBehind())
.setUnorderedWrite(databaseOptions.global().unorderedWrite());
return new OptionsWithCache(options, blockCache); return new OptionsWithCache(options, blockCache);
} catch (GestaltException e) { } catch (GestaltException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_CONFIG_ERROR, e); throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_CONFIG_ERROR, e);

View File

@ -0,0 +1,37 @@
package it.cavallium.rockserver.core.impl.rocksdb;
import it.cavallium.rockserver.core.common.RocksDBException;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.SstFileWriter;
import java.io.Closeable;
import java.nio.file.Path;
import java.util.List;
/**
 * A {@link DBWriter} that accumulates key-value pairs in an SST file via
 * {@link SstFileWriter} and, when flushed, ingests the finished file directly
 * into the target column family.
 *
 * @param db            the destination database
 * @param col           the column instance whose handle receives the ingested file
 * @param path          location of the SST file being written
 * @param sstFileWriter the underlying RocksDB SST writer
 * @param ingestBehind  when true, the file is ingested behind existing data
 *                      (requires the database to be opened with allow_ingest_behind)
 */
public record SSTWriter(RocksDB db, it.cavallium.rockserver.core.impl.ColumnInstance col, Path path, SstFileWriter sstFileWriter, boolean ingestBehind) implements Closeable, DBWriter {

	@Override
	public void writePending() throws it.cavallium.rockserver.core.common.RocksDBException {
		try {
			// Seal the SST file first, then hand it over to RocksDB.
			sstFileWriter.finish();
			ingestFinishedFile();
		} catch (org.rocksdb.RocksDBException e) {
			throw RocksDBException.of(RocksDBException.RocksDBErrorType.SST_WRITE_1, e);
		}
	}

	// Ingests the already-finished SST file into the column family, moving (not
	// copying) it into the database directory.
	private void ingestFinishedFile() throws org.rocksdb.RocksDBException {
		try (var options = new IngestExternalFileOptions()) {
			options.setIngestBehind(ingestBehind);
			options.setAllowBlockingFlush(true);
			options.setMoveFiles(true);
			options.setAllowGlobalSeqNo(true);
			options.setWriteGlobalSeqno(false);
			options.setSnapshotConsistency(false);
			db.ingestExternalFile(col.cfh(), List.of(path.toString()), options);
		}
	}

	@Override
	public void close() {
		sstFileWriter.close();
	}
}

View File

@ -1,11 +1,11 @@
package it.cavallium.rockserver.core.impl.rocksdb; package it.cavallium.rockserver.core.impl.rocksdb;
import java.io.Closeable; import java.io.Closeable;
import org.rocksdb.AbstractNativeReference;
import org.rocksdb.Transaction; import org.rocksdb.Transaction;
public record Tx(Transaction val, boolean isFromGetForUpdate, RocksDBObjects objs) public record Tx(Transaction val, boolean isFromGetForUpdate, RocksDBObjects objs)
implements Closeable, TxOrWb { implements Closeable, DBWriter {
@Override @Override
public void close() { public void close() {

View File

@ -1,4 +0,0 @@
package it.cavallium.rockserver.core.impl.rocksdb;
public sealed interface TxOrWb permits Tx, WB {
}

View File

@ -8,16 +8,16 @@ import org.rocksdb.WriteOptions;
import java.io.Closeable; import java.io.Closeable;
public record WB(@NotNull WriteBatch wb) implements Closeable, TxOrWb { public record WB(RocksDB rocksDB, @NotNull WriteBatch wb, boolean disableWal) implements Closeable, DBWriter {
private static final boolean MIGRATE = Boolean.parseBoolean(System.getProperty("rocksdb.migrate", "false")); private static final boolean MIGRATE = Boolean.parseBoolean(System.getProperty("rocksdb.migrate", "false"));
@Override @Override
public void close() { public void close() {
wb.close(); wb.close();
} }
public void write(RocksDB rocksDB) throws RocksDBException { public void writePending() throws RocksDBException {
try (var w = new WriteOptions()) { try (var w = new WriteOptions()) {
if (MIGRATE) { if (disableWal || MIGRATE) {
w.setDisableWAL(true); w.setDisableWAL(true);
} }
rocksDB.write(w, wb); rocksDB.write(w, wb);

View File

@ -17,6 +17,7 @@ import it.cavallium.rockserver.core.client.RocksDBConnection;
import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.ColumnHashType; import it.cavallium.rockserver.core.common.ColumnHashType;
import it.cavallium.rockserver.core.common.ColumnSchema; import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.PutBatchMode;
import it.cavallium.rockserver.core.common.RequestType.RequestChanged; import it.cavallium.rockserver.core.common.RequestType.RequestChanged;
import it.cavallium.rockserver.core.common.RequestType.RequestCurrent; import it.cavallium.rockserver.core.common.RequestType.RequestCurrent;
import it.cavallium.rockserver.core.common.RequestType.RequestDelta; import it.cavallium.rockserver.core.common.RequestType.RequestDelta;
@ -196,12 +197,17 @@ public class GrpcServer extends Server {
executor.execute(() -> { executor.execute(() -> {
try { try {
try (var arena = Arena.ofConfined()) { try (var arena = Arena.ofConfined()) {
api.putMulti(arena, api.putBatch(arena,
request.getTransactionOrUpdateId(),
request.getColumnId(), request.getColumnId(),
mapKeysKV(arena, request.getDataCount(), request::getData), mapKeysKV(arena, request.getDataCount(), request::getData),
mapValuesKV(arena, request.getDataCount(), request::getData), mapValuesKV(arena, request.getDataCount(), request::getData),
new RequestNothing<>() switch (request.getMode()) {
case WRITE_BATCH -> PutBatchMode.WRITE_BATCH;
case WRITE_BATCH_NO_WAL -> PutBatchMode.WRITE_BATCH_NO_WAL;
case SST_INGESTION -> PutBatchMode.SST_INGESTION;
case SST_INGEST_BEHIND -> PutBatchMode.SST_INGEST_BEHIND;
case UNRECOGNIZED -> throw new UnsupportedOperationException("Unrecognized request \"mode\"");
}
); );
} }
responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onNext(Empty.getDefaultInstance());

View File

@ -29,6 +29,13 @@ enum Operation {
PREVIOUS_PRESENCE = 8; PREVIOUS_PRESENCE = 8;
} }
// Strategy the server uses to apply a PutBatch request.
enum PutBatchMode {
// Apply the batch through a RocksDB WriteBatch (WAL enabled).
WRITE_BATCH = 0;
// Apply the batch through a RocksDB WriteBatch with the write-ahead log
// disabled (faster bulk writes; data may be lost on a crash).
WRITE_BATCH_NO_WAL = 1;
// Write the batch to an SST file and ingest it into the column family.
SST_INGESTION = 2;
// Like SST_INGESTION, but ingest behind existing keys; requires the
// database to be configured with ingest-behind support.
SST_INGEST_BEHIND = 3;
}
message Delta { message Delta {
optional bytes previous = 1; optional bytes previous = 1;
optional bytes current = 2; optional bytes current = 2;
@ -65,7 +72,7 @@ message GetColumnIdResponse {int64 columnId = 1;}
message PutRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; KV data = 3;} message PutRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; KV data = 3;}
message PutBatchRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; repeated KV data = 3;} message PutBatchRequest {int64 columnId = 1; repeated KV data = 2; PutBatchMode mode = 3;}
message PutMultiInitialRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2;} message PutMultiInitialRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2;}
message PutMultiRequest {oneof putMultiRequestType {PutMultiInitialRequest initialRequest = 1;KV data = 2;}} message PutMultiRequest {oneof putMultiRequestType {PutMultiInitialRequest initialRequest = 1;KV data = 2;}}

View File

@ -6,6 +6,40 @@ database: {
spinning: false spinning: false
# Enable to require absolute consistency after a crash. False to use the PointInTime recovery strategy # Enable to require absolute consistency after a crash. False to use the PointInTime recovery strategy
absolute-consistency: true absolute-consistency: true
# Set this option to true during creation of database if you want to be able
# to ingest behind (call IngestExternalFile() skipping keys that already
# exist, rather than overwriting matching keys).
# Setting this option to true will affect 3 things:
# 1) Disable some internal optimizations around SST file compression
# 2) Reserve the bottom-most level for ingested files only.
# 3) Note that num_levels should be >= 3 if this option is turned on.
# DEFAULT: false
ingest-behind: false
# ENABLE THIS ONLY WHEN DOING BULK WRITES, THIS IS UNSAFE TO USE IN NORMAL SCENARIOS
# Setting unorderedWrite() to true trades higher write throughput
# with relaxing the immutability guarantee of snapshots.
# This violates the repeatability one expects from ::Get from a snapshot,
# as well as ::MultiGet and Iterator's consistent-point-in-time view property.
# If the application cannot tolerate the relaxed guarantees,
# it can implement its own mechanisms to work around
# that and yet benefit from the higher throughput.
# Using TransactionDB with WRITE_PREPARED write policy and twoWriteQueues() true
# is one way to achieve immutable snapshots despite unordered_write.
# By default, i.e., when it is false, rocksdb does not advance the sequence
# number for new snapshots unless all the writes with
# lower sequence numbers are already finished.
# This provides the immutability that we expect from snapshots.
# Moreover, since Iterator and MultiGet internally depend on snapshots,
# the snapshot immutability results in Iterator
# and MultiGet offering a consistent-point-in-time view.
# If set to true, although Read-Your-Own-Write property is still provided,
# the snapshot immutability property is relaxed: the writes issued after
# the snapshot is obtained (with larger sequence numbers) will be still not
# visible to the reads from that snapshot, however, there still might be pending
# writes (with lower sequence number) that will change the state visible
# to the snapshot after they are landed to the memtable.
# DEFAULT: false
unordered-write: false
# Error checking # Error checking
checksum: true checksum: true
# Use direct I/O in RocksDB databases (Higher I/O read throughput but OS cache is not used, less swapping, less memory pressure) # Use direct I/O in RocksDB databases (Higher I/O read throughput but OS cache is not used, less swapping, less memory pressure)