Implement reactive sst batching

This commit is contained in:
Andrea Cavalli 2024-09-25 11:11:09 +02:00
parent bcd18e72a9
commit a11ae8cebc
21 changed files with 1203 additions and 441 deletions

View File

@ -135,6 +135,11 @@
<artifactId>grpc-stub</artifactId> <artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version> <version>${grpc.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.4</version>
</dependency>
<dependency> <dependency>
<groupId>org.lz4</groupId> <groupId>org.lz4</groupId>

View File

@ -28,6 +28,7 @@ module rockserver.core {
requires io.netty.codec.http2; requires io.netty.codec.http2;
requires jdk.unsupported; requires jdk.unsupported;
requires io.netty.transport.classes.epoll; requires io.netty.transport.classes.epoll;
requires org.reactivestreams;
exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common; exports it.cavallium.rockserver.core.common;

View File

@ -26,6 +26,7 @@ module rockserver.core {
requires io.netty.codec.http2; requires io.netty.codec.http2;
requires jdk.unsupported; requires jdk.unsupported;
requires io.netty.transport.classes.epoll; requires io.netty.transport.classes.epoll;
requires org.reactivestreams;
exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common; exports it.cavallium.rockserver.core.common;

View File

@ -17,6 +17,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
@ -88,6 +89,10 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
@Override @Override
public <R> CompletableFuture<R> requestAsync(RocksDBAPICommand<R> req) { public <R> CompletableFuture<R> requestAsync(RocksDBAPICommand<R> req) {
if (req instanceof RocksDBAPICommand.PutBatch putBatch) {
//noinspection unchecked
return (CompletableFuture<R>) this.putBatchAsync(putBatch.columnId(), putBatch.batchPublisher(), putBatch.mode());
}
return CompletableFuture.supplyAsync(() -> req.handleSync(this), exeuctor); return CompletableFuture.supplyAsync(() -> req.handleSync(this), exeuctor);
} }
@ -112,12 +117,17 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
} }
@Override @Override
public void putBatch(Arena arena, public CompletableFuture<Void> putBatchAsync(long columnId,
long columnId, @NotNull Publisher<@NotNull KVBatch> batchPublisher,
@NotNull List<Keys> keys, @NotNull PutBatchMode mode) throws RocksDBException {
@NotNull List<@NotNull MemorySegment> values, return db.putBatchInternal(columnId, batchPublisher, mode);
}
@Override
public void putBatch(long columnId,
@NotNull Publisher<@NotNull KVBatch> batchPublisher,
@NotNull PutBatchMode mode) throws RocksDBException { @NotNull PutBatchMode mode) throws RocksDBException {
db.putBatch(arena, columnId, keys, values, mode); db.putBatch(columnId, batchPublisher, mode);
} }
@Override @Override

View File

@ -11,9 +11,10 @@ import com.google.protobuf.UnsafeByteOperations;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.stub.StreamObserver; import io.grpc.stub.*;
import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.ColumnSchema; import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.KVBatch;
import it.cavallium.rockserver.core.common.PutBatchMode; import it.cavallium.rockserver.core.common.PutBatchMode;
import it.cavallium.rockserver.core.common.RequestType.RequestChanged; import it.cavallium.rockserver.core.common.RequestType.RequestChanged;
import it.cavallium.rockserver.core.common.RequestType.RequestCurrent; import it.cavallium.rockserver.core.common.RequestType.RequestCurrent;
@ -40,13 +41,18 @@ import java.lang.foreign.MemorySegment;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function; import java.util.function.Function;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -187,20 +193,20 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
CompletableFuture<List<T>> responseObserver; CompletableFuture<List<T>> responseObserver;
if (requestType instanceof RequestType.RequestNothing<?> && transactionOrUpdateId == 0L) { if (requestType instanceof RequestType.RequestNothing<?> && transactionOrUpdateId == 0L) {
var putBatchRequestBuilder = PutBatchRequest.newBuilder() return putBatchAsync(columnId, subscriber -> {
.setColumnId(columnId) var sub = new Subscription() {
.setMode(it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH); @Override
public void request(long l) {
}
var it1 = allKeys.iterator(); @Override
var it2 = allValues.iterator(); public void cancel() {
while (it1.hasNext()) { }
var k = it1.next(); };
var v = it2.next(); subscriber.onSubscribe(sub);
putBatchRequestBuilder.addData(mapKV(k, v)); subscriber.onNext(new KVBatch(allKeys, allValues));
} }, PutBatchMode.WRITE_BATCH).thenApply(_ -> List.of());
return toResponse(futureStub.putBatch(putBatchRequestBuilder.build()), _ -> null);
} }
var initialRequest = PutMultiRequest.newBuilder() var initialRequest = PutMultiRequest.newBuilder()
@ -261,36 +267,105 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
} }
@Override @Override
public CompletableFuture<Void> putBatchAsync(Arena arena, public CompletableFuture<Void> putBatchAsync(long columnId,
long columnId, @NotNull Publisher<@NotNull KVBatch> batchPublisher,
@NotNull List<@NotNull Keys> allKeys,
@NotNull List<@NotNull MemorySegment> allValues,
@NotNull PutBatchMode mode) throws RocksDBException { @NotNull PutBatchMode mode) throws RocksDBException {
var count = allKeys.size(); var cf = new CompletableFuture<Void>();
if (count != allValues.size()) { var responseobserver = new ClientResponseObserver<PutBatchRequest, Empty>() {
throw new IllegalArgumentException("Keys length is different than values length! " private ClientCallStreamObserver<PutBatchRequest> requestStream;
+ count + " != " + allValues.size()); private Subscription subscription;
}
var putBatchRequestBuilder = PutBatchRequest.newBuilder() @Override
.setColumnId(columnId) public void beforeStart(ClientCallStreamObserver<PutBatchRequest> requestStream) {
.setMode(switch (mode) { this.requestStream = requestStream;
case WRITE_BATCH -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH; // Set up manual flow control for the response stream. It feels backwards to configure the response
case WRITE_BATCH_NO_WAL -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH_NO_WAL; // stream's flow control using the request stream's observer, but this is the way it is.
case SST_INGESTION -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.SST_INGESTION; requestStream.disableAutoRequestWithInitial(1);
case SST_INGEST_BEHIND -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.SST_INGEST_BEHIND;
});
var it1 = allKeys.iterator(); var subscriber = new Subscriber<KVBatch>() {
var it2 = allValues.iterator(); private volatile boolean finalized;
while (it1.hasNext()) { @Override
var k = it1.next(); public void onSubscribe(Subscription subscription2) {
var v = it2.next(); subscription = subscription2;
putBatchRequestBuilder.addData(mapKV(k, v)); }
}
return toResponse(futureStub.putBatch(putBatchRequestBuilder.build()), _ -> null); @Override
public void onNext(KVBatch batch) {
var request = PutBatchRequest.newBuilder();
request.setData(mapKVBatch(batch));
requestStream.onNext(request.build());
if (requestStream.isReady()) {
subscription.request(1);
}
}
@Override
public void onError(Throwable throwable) {
this.finalized = true;
requestStream.onError(throwable);
}
@Override
public void onComplete() {
this.finalized = true;
requestStream.onCompleted();
}
};
batchPublisher.subscribe(subscriber);
// Set up a back-pressure-aware producer for the request stream. The onReadyHandler will be invoked
// when the consuming side has enough buffer space to receive more messages.
//
// Messages are serialized into a transport-specific transmit buffer. Depending on the size of this buffer,
// MANY messages may be buffered, however, they haven't yet been sent to the server. The server must call
// request() to pull a buffered message from the client.
//
// Note: the onReadyHandler's invocation is serialized on the same thread pool as the incoming
// StreamObserver's onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent
// additional messages from being processed by the incoming StreamObserver. The onReadyHandler must return
// in a timely manner or else message processing throughput will suffer.
requestStream.setOnReadyHandler(new Runnable() {
@Override
public void run() {
// Start generating values from where we left off on a non-gRPC thread.
subscription.request(1);
}
});
}
@Override
public void onNext(Empty empty) {}
@Override
public void onError(Throwable throwable) {
cf.completeExceptionally(throwable);
}
@Override
public void onCompleted() {
cf.complete(null);
}
};
var requestStream = asyncStub.putBatch(responseobserver);
requestStream.onNext(PutBatchRequest.newBuilder()
.setInitialRequest(PutBatchInitialRequest.newBuilder()
.setColumnId(columnId)
.setMode(switch (mode) {
case WRITE_BATCH -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH;
case WRITE_BATCH_NO_WAL -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.WRITE_BATCH_NO_WAL;
case SST_INGESTION -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.SST_INGESTION;
case SST_INGEST_BEHIND -> it.cavallium.rockserver.core.common.api.proto.PutBatchMode.SST_INGEST_BEHIND;
})
.build())
.build());
return cf;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -397,6 +472,33 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
return data != null ? MemorySegment.ofBuffer(data.asReadOnlyByteBuffer()) : null; return data != null ? MemorySegment.ofBuffer(data.asReadOnlyByteBuffer()) : null;
} }
/**
 * Converts a {@link KVBatch} into its protobuf message.
 * <p>
 * Every key is paired with the value found at the same position, see {@code mapKVList}.
 *
 * @param kvBatch batch to convert
 * @return protobuf batch containing one entry per key
 */
private static it.cavallium.rockserver.core.common.api.proto.KVBatch mapKVBatch(@NotNull KVBatch kvBatch) {
	var protoEntries = mapKVList(kvBatch.keys(), kvBatch.values());
	var protoBatch = it.cavallium.rockserver.core.common.api.proto.KVBatch.newBuilder();
	protoBatch.addAllEntries(protoEntries);
	return protoBatch.build();
}
/**
 * Lazily zips two parallel lists into protobuf {@code KV} entries.
 * <p>
 * Nothing is converted until the returned iterable is traversed. Iteration is driven by {@code keys} only:
 * it stops when the keys are exhausted, and one value is pulled for every key.
 *
 * @param keys keys, one per entry
 * @param values values, expected to be positionally aligned with {@code keys}
 * @return a re-iterable view producing one {@code KV} per key
 */
private static Iterable<KV> mapKVList(@NotNull List<Keys> keys, @NotNull List<MemorySegment> values) {
	return () -> {
		Iterator<Keys> keyCursor = keys.iterator();
		Iterator<MemorySegment> valueCursor = values.iterator();
		return new Iterator<KV>() {
			@Override
			public boolean hasNext() {
				// Only the keys decide when the iteration ends
				return keyCursor.hasNext();
			}

			@Override
			public KV next() {
				Keys currentKeys = keyCursor.next();
				MemorySegment currentValue = valueCursor.next();
				return mapKV(currentKeys, currentValue);
			}
		};
	};
}
private static KV mapKV(@NotNull Keys keys, @NotNull MemorySegment value) { private static KV mapKV(@NotNull Keys keys, @NotNull MemorySegment value) {
return KV.newBuilder() return KV.newBuilder()
.addAllKeys(mapKeys(keys)) .addAllKeys(mapKeys(keys))

View File

@ -0,0 +1,10 @@
package it.cavallium.rockserver.core.common;
import org.jetbrains.annotations.NotNull;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.List;
/**
 * A batch of key/value pairs that are written together.
 * <p>
 * {@code keys} and {@code values} are parallel lists: the value at index {@code i} belongs to the keys at index
 * {@code i}.
 *
 * @param keys keys of the batch, one element per entry
 * @param values values of the batch, one element per entry
 */
public record KVBatch(@NotNull List<@NotNull Keys> keys, @NotNull List<@NotNull MemorySegment> values) {

	/**
	 * Rejects batches whose lists are not aligned.
	 * <p>
	 * Consumers walk both lists in lockstep and stop when the keys are exhausted, so a mismatch would either
	 * drop the surplus values silently or fail late, in the middle of a write.
	 *
	 * @throws IllegalArgumentException if {@code keys} and {@code values} have different sizes
	 */
	public KVBatch {
		if (keys.size() != values.size()) {
			throw new IllegalArgumentException("Keys length is different than values length! "
					+ keys.size() + " != " + values.size());
		}
	}
}

View File

@ -17,8 +17,10 @@ public sealed interface RocksDBAPICommand<R> {
/** /**
* Open a transaction * Open a transaction
* <p>
* Returns the transaction id
*
* @param timeoutMs timeout in milliseconds * @param timeoutMs timeout in milliseconds
* @return transaction id
*/ */
record OpenTransaction(long timeoutMs) implements RocksDBAPICommand<Long> { record OpenTransaction(long timeoutMs) implements RocksDBAPICommand<Long> {
@ -35,10 +37,11 @@ public sealed interface RocksDBAPICommand<R> {
} }
/** /**
* Close a transaction * Close a transaction
* <p>
* Returns true if committed, if false, you should try again
* *
* @param transactionId transaction id to close * @param transactionId transaction id to close
* @param commit true to commit the transaction, false to rollback it * @param commit true to commit the transaction, false to rollback it
* @return true if committed, if false, you should try again
*/ */
record CloseTransaction(long transactionId, boolean commit) implements RocksDBAPICommand<Boolean> { record CloseTransaction(long transactionId, boolean commit) implements RocksDBAPICommand<Boolean> {
@ -74,9 +77,11 @@ public sealed interface RocksDBAPICommand<R> {
} }
/** /**
* Create a column * Create a column
* <p>
* Returns the column id
*
* @param name column name * @param name column name
* @param schema column key-value schema * @param schema column key-value schema
* @return column id
*/ */
record CreateColumn(String name, @NotNull ColumnSchema schema) implements RocksDBAPICommand<Long> { record CreateColumn(String name, @NotNull ColumnSchema schema) implements RocksDBAPICommand<Long> {
@ -111,8 +116,10 @@ public sealed interface RocksDBAPICommand<R> {
} }
/** /**
* Get column id by name * Get column id by name
* <p>
* Returns the column id
*
* @param name column name * @param name column name
* @return column id
*/ */
record GetColumnId(@NotNull String name) implements RocksDBAPICommand<Long> { record GetColumnId(@NotNull String name) implements RocksDBAPICommand<Long> {
@ -210,26 +217,23 @@ public sealed interface RocksDBAPICommand<R> {
} }
/** /**
* Put multiple elements into the specified positions * Put multiple elements into the specified positions
* @param arena arena
* @param columnId column id * @param columnId column id
* @param keys multiple lists of column keys * @param batchPublisher publisher of batches of keys and values
* @param values multiple values, or null if not needed
* @param mode put batch mode * @param mode put batch mode
*/ */
record PutBatch(Arena arena, long columnId, record PutBatch(long columnId,
@NotNull List<Keys> keys, @NotNull org.reactivestreams.Publisher<@NotNull KVBatch> batchPublisher,
@NotNull List<@NotNull MemorySegment> values,
@NotNull PutBatchMode mode) implements RocksDBAPICommand<Void> { @NotNull PutBatchMode mode) implements RocksDBAPICommand<Void> {
@Override @Override
public Void handleSync(RocksDBSyncAPI api) { public Void handleSync(RocksDBSyncAPI api) {
api.putBatch(arena, columnId, keys, values, mode); api.putBatch(columnId, batchPublisher, mode);
return null; return null;
} }
@Override @Override
public CompletionStage<Void> handleAsync(RocksDBAsyncAPI api) { public CompletionStage<Void> handleAsync(RocksDBAsyncAPI api) {
return api.putBatchAsync(arena, columnId, keys, values, mode); return api.putBatchAsync(columnId, batchPublisher, mode);
} }
@Override @Override
@ -237,13 +241,7 @@ public sealed interface RocksDBAPICommand<R> {
var sb = new StringBuilder("PUT_BATCH"); var sb = new StringBuilder("PUT_BATCH");
sb.append(" column:").append(columnId); sb.append(" column:").append(columnId);
sb.append(" mode:").append(mode); sb.append(" mode:").append(mode);
sb.append(" batch:["); sb.append(" batch:[...]");
for (int i = 0; i < keys.size(); i++) {
if (i > 0) sb.append(",");
sb.append(" keys:").append(keys.get(i));
sb.append(" value:").append(Utils.toPrettyString(values.get(i)));
}
sb.append("]");
return sb.toString(); return sb.toString();
} }
} }
@ -285,6 +283,9 @@ public sealed interface RocksDBAPICommand<R> {
} }
/** /**
* Open an iterator * Open an iterator
* <p>
* Returns the iterator id
*
* @param arena arena * @param arena arena
* @param transactionId transaction id, or 0 * @param transactionId transaction id, or 0
* @param columnId column id * @param columnId column id
@ -292,7 +293,6 @@ public sealed interface RocksDBAPICommand<R> {
* @param endKeysExclusive end keys, exclusive. Null means "the end" * @param endKeysExclusive end keys, exclusive. Null means "the end"
* @param reverse if true, seek in reverse direction * @param reverse if true, seek in reverse direction
* @param timeoutMs timeout in milliseconds * @param timeoutMs timeout in milliseconds
* @return iterator id
*/ */
record OpenIterator(Arena arena, record OpenIterator(Arena arena,
long transactionId, long transactionId,

View File

@ -76,12 +76,10 @@ public interface RocksDBAsyncAPI extends RocksDBAsyncAPIRequestHandler {
} }
/** See: {@link PutBatch}. */ /** See: {@link PutBatch}. */
default CompletableFuture<Void> putBatchAsync(Arena arena, default CompletableFuture<Void> putBatchAsync(long columnId,
long columnId, @NotNull org.reactivestreams.Publisher<@NotNull KVBatch> batchPublisher,
@NotNull List<@NotNull Keys> keys,
@NotNull List<@NotNull MemorySegment> values,
@NotNull PutBatchMode mode) throws RocksDBException { @NotNull PutBatchMode mode) throws RocksDBException {
return requestAsync(new PutBatch(arena, columnId, keys, values, mode)); return requestAsync(new PutBatch(columnId, batchPublisher, mode));
} }
/** See: {@link Get}. */ /** See: {@link Get}. */

View File

@ -27,7 +27,11 @@ public class RocksDBException extends RuntimeException {
TX_NOT_FOUND, TX_NOT_FOUND,
KEY_HASH_SIZE_MISMATCH, RESTRICTED_TRANSACTION, PUT_INVALID_REQUEST, UPDATE_RETRY, ROCKSDB_LOAD_ERROR, KEY_HASH_SIZE_MISMATCH, RESTRICTED_TRANSACTION, PUT_INVALID_REQUEST, UPDATE_RETRY, ROCKSDB_LOAD_ERROR,
WRITE_BATCH_1, WRITE_BATCH_1,
SST_WRITE_1 SST_WRITE_1,
SST_WRITE_2,
SST_WRITE_3,
SST_WRITE_4,
SST_GET_SIZE_FAILED
} }
public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) { public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) {
@ -35,7 +39,7 @@ public class RocksDBException extends RuntimeException {
} }
public static RocksDBException of(RocksDBErrorType errorUniqueId, Throwable ex) { public static RocksDBException of(RocksDBErrorType errorUniqueId, Throwable ex) {
if (ex instanceof RocksDBException e) { if (ex instanceof org.rocksdb.RocksDBException e) {
return new RocksDBException(errorUniqueId, e); return new RocksDBException(errorUniqueId, e);
} else { } else {
return new RocksDBException(errorUniqueId, ex); return new RocksDBException(errorUniqueId, ex);
@ -43,7 +47,7 @@ public class RocksDBException extends RuntimeException {
} }
public static RocksDBException of(RocksDBErrorType errorUniqueId, String message, Throwable ex) { public static RocksDBException of(RocksDBErrorType errorUniqueId, String message, Throwable ex) {
if (ex instanceof RocksDBException e) { if (ex instanceof org.rocksdb.RocksDBException e) {
return new RocksDBException(errorUniqueId, message, e); return new RocksDBException(errorUniqueId, message, e);
} else { } else {
return new RocksDBException(errorUniqueId, message, ex); return new RocksDBException(errorUniqueId, message, ex);
@ -67,6 +71,7 @@ public class RocksDBException extends RuntimeException {
protected RocksDBException(RocksDBErrorType errorUniqueId, org.rocksdb.RocksDBException ex) { protected RocksDBException(RocksDBErrorType errorUniqueId, org.rocksdb.RocksDBException ex) {
this(errorUniqueId, ex.getMessage()); this(errorUniqueId, ex.getMessage());
super.initCause(ex);
} }
protected RocksDBException(RocksDBErrorType errorUniqueId, String message, org.rocksdb.RocksDBException ex) { protected RocksDBException(RocksDBErrorType errorUniqueId, String message, org.rocksdb.RocksDBException ex) {

View File

@ -75,12 +75,10 @@ public interface RocksDBSyncAPI extends RocksDBSyncAPIRequestHandler {
} }
/** See: {@link PutBatch}. */ /** See: {@link PutBatch}. */
default void putBatch(Arena arena, default void putBatch(long columnId,
long columnId, @NotNull org.reactivestreams.Publisher<@NotNull KVBatch> batchPublisher,
@NotNull List<Keys> keys,
@NotNull List<@NotNull MemorySegment> values,
@NotNull PutBatchMode mode) throws RocksDBException { @NotNull PutBatchMode mode) throws RocksDBException {
requestSync(new PutBatch(arena, columnId, keys, values, mode)); requestSync(new PutBatch(columnId, batchPublisher, mode));
} }
/** See: {@link Get}. */ /** See: {@link Get}. */

View File

@ -9,9 +9,7 @@ import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestPut; import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType;
import it.cavallium.rockserver.core.config.ConfigParser; import it.cavallium.rockserver.core.config.*;
import it.cavallium.rockserver.core.config.ConfigPrinter;
import it.cavallium.rockserver.core.config.DatabaseConfig;
import it.cavallium.rockserver.core.impl.rocksdb.*; import it.cavallium.rockserver.core.impl.rocksdb.*;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions; import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions;
import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrayList;
@ -26,20 +24,21 @@ import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment; import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.*;
import java.util.Arrays; import java.util.concurrent.CompletableFuture;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.cliffc.high_scale_lib.NonBlockingHashMapLong; import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
import org.github.gestalt.config.exceptions.GestaltException;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.VisibleForTesting;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.rocksdb.*; import org.rocksdb.*;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.rocksdb.Status.Code; import org.rocksdb.Status.Code;
@ -55,13 +54,16 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
private final Logger logger; private final Logger logger;
private final @Nullable Path path; private final @Nullable Path path;
private final TransactionalDB db; private final TransactionalDB db;
private final DBOptions dbOptions;
private final ColumnFamilyHandle columnSchemasColumnDescriptorHandle; private final ColumnFamilyHandle columnSchemasColumnDescriptorHandle;
private final NonBlockingHashMapLong<ColumnInstance> columns; private final NonBlockingHashMapLong<ColumnInstance> columns;
private final Map<String, ColumnFamilyOptions> columnsConifg;
private final ConcurrentMap<String, Long> columnNamesIndex; private final ConcurrentMap<String, Long> columnNamesIndex;
private final NonBlockingHashMapLong<Tx> txs; private final NonBlockingHashMapLong<Tx> txs;
private final NonBlockingHashMapLong<REntry<RocksIterator>> its; private final NonBlockingHashMapLong<REntry<RocksIterator>> its;
private final SafeShutdown ops; private final SafeShutdown ops;
private final Object columnEditLock = new Object(); private final Object columnEditLock = new Object();
private final DatabaseConfig config;
public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) throws IOException { public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) throws IOException {
this.path = path; this.path = path;
@ -72,7 +74,11 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
this.columnNamesIndex = new ConcurrentHashMap<>(); this.columnNamesIndex = new ConcurrentHashMap<>();
this.ops = new SafeShutdown(); this.ops = new SafeShutdown();
DatabaseConfig config = ConfigParser.parse(embeddedConfigPath); DatabaseConfig config = ConfigParser.parse(embeddedConfigPath);
this.db = RocksDBLoader.load(path, config, logger); this.config = config;
var loadedDb = RocksDBLoader.load(path, config, logger);
this.db = loadedDb.db();
this.dbOptions = loadedDb.dbOptions();
this.columnsConifg = loadedDb.definitiveColumnFamilyOptionsMap();
var existingColumnSchemasColumnDescriptorOptional = db var existingColumnSchemasColumnDescriptorOptional = db
.getStartupColumns() .getStartupColumns()
.entrySet() .entrySet()
@ -411,91 +417,163 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
if (keys.size() != values.size()) { if (keys.size() != values.size()) {
throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size()); throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size());
} }
if (requestType instanceof RequestType.RequestNothing<?> List<T> responses = requestType instanceof RequestType.RequestNothing<?> ? null : new ArrayList<>(keys.size());
&& !getColumn(columnId).hasBuckets() for (int i = 0; i < keys.size(); i++) {
&& transactionOrUpdateId == 0L) { var result = put(arena, transactionOrUpdateId, columnId, keys.get(i), values.get(i), requestType);
putBatch(arena, columnId, keys, values, PutBatchMode.WRITE_BATCH); if (responses != null) {
return List.of(); responses.add(result);
} else {
List<T> responses = requestType instanceof RequestType.RequestNothing<?> ? null : new ArrayList<>(keys.size());
for (int i = 0; i < keys.size(); i++) {
var result = put(arena, transactionOrUpdateId, columnId, keys.get(i), values.get(i), requestType);
if (responses != null) {
responses.add(result);
}
} }
return responses != null ? responses : List.of(); }
return responses != null ? responses : List.of();
}
public CompletableFuture<Void> putBatchInternal(long columnId,
@NotNull Publisher<@NotNull KVBatch> batchPublisher,
@NotNull PutBatchMode mode) throws it.cavallium.rockserver.core.common.RocksDBException {
try {
var cf = new CompletableFuture<Void>();
batchPublisher.subscribe(new Subscriber<>() {
private Subscription subscription;
private ColumnInstance col;
private ArrayList<AutoCloseable> refs;
private DBWriter writer;
/**
 * Registers the operation, resolves the column and creates the writer for the requested mode,
 * then asks the publisher for the first batch.
 * <p>
 * On failure the subscription is cancelled, the returned future is completed exceptionally and the
 * resources are released before the error is rethrown.
 */
@Override
public void onSubscribe(Subscription subscription) {
	ops.beginOp();
	// Allocate the list before anything can fail: doFinally() iterates it unconditionally
	refs = new ArrayList<>();
	try {
		// Column id
		col = getColumn(columnId);

		writer = switch (mode) {
			case WRITE_BATCH, WRITE_BATCH_NO_WAL -> {
				var wb = new WB(db.get(), new WriteBatch(), mode == PutBatchMode.WRITE_BATCH_NO_WAL);
				refs.add(wb);
				yield wb;
			}
			case SST_INGESTION, SST_INGEST_BEHIND -> {
				var sstWriter = getSSTWriter(columnId, null, null, true, mode == PutBatchMode.SST_INGEST_BEHIND);
				refs.add(sstWriter);
				yield sstWriter;
			}
		};
	} catch (Throwable ex) {
		// No batch can be processed: stop the publisher and wake up whoever waits on the future
		subscription.cancel();
		cf.completeExceptionally(ex);
		doFinally();
		throw ex;
	}
	this.subscription = subscription;
	subscription.request(1);
}
/**
 * Writes every entry of the batch through the current writer, then requests the next batch.
 * <p>
 * Keys and values are consumed in lockstep. On failure the subscription is cancelled, the returned future
 * is completed exceptionally and the resources are released before the error is rethrown; no further batch
 * is requested.
 *
 * @param kvBatch batch to write
 */
@Override
public void onNext(KVBatch kvBatch) {
	var keyIt = kvBatch.keys().iterator();
	var valueIt = kvBatch.values().iterator();
	try (var arena = Arena.ofConfined()) {
		while (keyIt.hasNext()) {
			var key = keyIt.next();
			var value = valueIt.next();
			put(arena, writer, col, 0, key, value, RequestType.none());
		}
	} catch (Exception ex) {
		// Keep database errors as they are, wrap everything else
		var failure = ex instanceof it.cavallium.rockserver.core.common.RocksDBException known
				? known
				: it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
		// Without these two calls the publisher keeps emitting and the future never completes
		subscription.cancel();
		cf.completeExceptionally(failure);
		doFinally();
		throw failure;
	}
	subscription.request(1);
}
/**
 * Called when the publisher fails: propagates the failure to the returned future, then releases the resources.
 *
 * @param throwable failure signalled by the publisher
 */
@Override
public void onError(Throwable throwable) {
// Complete the future first, so the waiter is released even if the cleanup below misbehaves
cf.completeExceptionally(throwable);
// Closes the collected references and ends the operation started in onSubscribe
doFinally();
}
/**
 * Called when the publisher has no more batches: flushes the pending writes and completes the returned
 * future, successfully or with the flush failure. Resources are always released afterwards.
 */
@Override
public void onComplete() {
	try {
		Throwable flushFailure = null;
		try {
			writer.writePending();
		} catch (Throwable ex) {
			flushFailure = ex;
		}
		if (flushFailure != null) {
			cf.completeExceptionally(flushFailure);
		} else {
			cf.complete(null);
		}
	} finally {
		doFinally();
	}
}
/**
 * Releases everything acquired for this batch write and ends the operation started in onSubscribe.
 * <p>
 * References are closed in reverse order of registration. Native references are closed only while they
 * still own their handle. Close failures are logged and do not stop the cleanup.
 */
private void doFinally() {
	try {
		// refs is still null if onSubscribe failed before creating the list
		if (refs != null) {
			for (int i = refs.size() - 1; i >= 0; i--) {
				try {
					var c = refs.get(i);
					if (c instanceof AbstractImmutableNativeReference fr) {
						if (fr.isOwningHandle()) {
							c.close();
						}
					} else {
						c.close();
					}
				} catch (Exception ex) {
					logger.error("Failed to close reference during batch write", ex);
				}
			}
		}
	} finally {
		// Always balance the beginOp() done in onSubscribe, whatever happened above
		ops.endOp();
	}
}
});
return cf;
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
throw ex;
} catch (Exception ex) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
} }
} }
/**
 * Opens an {@link SSTWriter} for the given column.
 * <p>
 * Column options are resolved as follows:
 * <ul>
 *   <li>{@code forceNoOptions == true}: no options are resolved, {@code null} is passed to the writer;</li>
 *   <li>{@code columnConfigOverride != null}: options are built from the overrides, and the objects created
 *   while building them are handed to the writer;</li>
 *   <li>otherwise: the options loaded at startup for this column family are used.</li>
 * </ul>
 *
 * @param colId column id
 * @param globalDatabaseConfigOverride global configuration used only together with {@code columnConfigOverride}
 * @param columnConfigOverride column configuration override, or null to use the loaded column options
 * @param forceNoOptions if true, skip option resolution entirely
 * @param ingestBehind forwarded to {@code SSTWriter.open}
 * @return the opened writer
 * @throws it.cavallium.rockserver.core.common.RocksDBException with {@code SST_WRITE_2} on I/O errors,
 *         or {@code SST_WRITE_3} on RocksDB errors
 */
@VisibleForTesting
public SSTWriter getSSTWriter(long colId,
		@Nullable GlobalDatabaseConfig globalDatabaseConfigOverride,
		@Nullable FallbackColumnConfig columnConfigOverride,
		boolean forceNoOptions,
		boolean ingestBehind) throws it.cavallium.rockserver.core.common.RocksDBException {
	try {
		var column = getColumn(colId);
		ColumnFamilyOptions columnOptions = null;
		RocksDBObjects optionRefs = null;
		if (!forceNoOptions) {
			if (columnConfigOverride == null) {
				// Reuse the options this column family was loaded with
				var columnFamilyName = new String(column.cfh().getName(), StandardCharsets.UTF_8);
				columnOptions = columnsConifg.get(columnFamilyName);
			} else {
				optionRefs = new RocksDBObjects();
				columnOptions = RocksDBLoader.getColumnOptions(globalDatabaseConfigOverride,
						columnConfigOverride,
						logger,
						optionRefs,
						path == null,
						null);
			}
		}
		return SSTWriter.open(db, column, columnOptions, forceNoOptions, ingestBehind, optionRefs);
	} catch (IOException ex) {
		throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_2, ex);
	} catch (RocksDBException ex) {
		throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.SST_WRITE_3, ex);
	}
}
@Override @Override
public void putBatch(Arena arena, public void putBatch(long columnId,
long columnId, @NotNull Publisher<@NotNull KVBatch> batchPublisher,
@NotNull List<Keys> keys,
@NotNull List<@NotNull MemorySegment> values,
@NotNull PutBatchMode mode) throws it.cavallium.rockserver.core.common.RocksDBException { @NotNull PutBatchMode mode) throws it.cavallium.rockserver.core.common.RocksDBException {
if (keys.size() != values.size()) {
throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size());
}
ops.beginOp();
try { try {
// Column id putBatchInternal(columnId, batchPublisher, mode).get();
var col = getColumn(columnId);
List<AutoCloseable> refs = new ArrayList<>();
try {
DBWriter writer = switch (mode) {
case WRITE_BATCH, WRITE_BATCH_NO_WAL -> {
var wb = new WB(db.get(), new WriteBatch(), mode == PutBatchMode.WRITE_BATCH_NO_WAL);
refs.add(wb);
yield wb;
}
case SST_INGESTION, SST_INGEST_BEHIND -> {
var envOptions = new EnvOptions();
refs.add(envOptions);
var compressionOptions = new CompressionOptions()
.setEnabled(true)
.setMaxDictBytes(32768)
.setZStdMaxTrainBytes(32768 * 4);
refs.add(compressionOptions);
var options = new Options()
.setCompressionOptions(compressionOptions)
.setCompressionType(CompressionType.ZSTD_COMPRESSION)
.setUnorderedWrite(true)
.setAllowIngestBehind(mode == PutBatchMode.SST_INGEST_BEHIND)
.setAllowConcurrentMemtableWrite(true);
refs.add(options);
var tempFile = Files.createTempFile("temp", ".sst");
var sstFileWriter = new SstFileWriter(envOptions, options);
var sstWriter = new SSTWriter(db.get(), col, tempFile, sstFileWriter, mode == PutBatchMode.SST_INGEST_BEHIND);
refs.add(sstWriter);
sstFileWriter.open(tempFile.toString());
yield sstWriter;
}
};
var keyIt = keys.iterator();
var valusIt = values.iterator();
while (keyIt.hasNext()) {
var key = keyIt.next();
var value = valusIt.next();
put(arena, writer, col, 0, key, value, RequestType.none());
}
writer.writePending();
} finally {
for (int i = refs.size() - 1; i >= 0; i--) {
try {
refs.get(i).close();
} catch (Exception ex) {
logger.error("Failed to close reference during batch write", ex);
}
}
}
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) { } catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
throw ex; throw ex;
} catch (Exception ex) { } catch (Exception ex) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
} finally {
ops.endOp();
} }
} }
@ -613,7 +691,11 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
} }
switch (dbWriter) { switch (dbWriter) {
case WB wb -> wb.wb().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); case WB wb -> wb.wb().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value));
case SSTWriter sstWriter -> sstWriter.sstFileWriter().put(Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); case SSTWriter sstWriter -> {
var keyBB = calculatedKey.asByteBuffer();
ByteBuffer valueBB = (col.schema().hasValue() ? value : Utils.dummyEmptyValue()).asByteBuffer();
sstWriter.put(keyBB, valueBB);
}
case Tx t -> t.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); case Tx t -> t.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value));
case null -> { case null -> {
try (var w = new WriteOptions()) { try (var w = new WriteOptions()) {

View File

@ -5,14 +5,8 @@ import it.cavallium.rockserver.core.config.*;
import java.io.InputStream; import java.io.InputStream;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.*;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SequencedMap;
import java.util.logging.Level;
import org.github.gestalt.config.exceptions.GestaltException; import org.github.gestalt.config.exceptions.GestaltException;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
@ -90,8 +84,207 @@ public class RocksDBLoader {
throw new RuntimeException("rocksdb was not found inside JAR."); throw new RuntimeException("rocksdb was not found inside JAR.");
} }
/**
 * Result of loading a database: the opened database plus the option objects it was opened with.
 *
 * @param db the opened transactional database
 * @param dbOptions the database-wide options used to open {@code db}
 * @param definitiveColumnFamilyOptionsMap the column family options used to open each column, keyed by column name
 */
public record LoadedDb(TransactionalDB db, DBOptions dbOptions,
Map<String, ColumnFamilyOptions> definitiveColumnFamilyOptionsMap) {}
public static TransactionalDB load(@Nullable Path path, DatabaseConfig config, Logger logger) { public static ColumnFamilyOptions getColumnOptions(
GlobalDatabaseConfig globalDatabaseConfig,
FallbackColumnConfig columnOptions, Logger logger,
RocksDBObjects refs,
boolean inMemory,
@Nullable Cache cache) {
try {
var columnFamilyOptions = new ColumnFamilyOptions();
refs.add(columnFamilyOptions);
//noinspection ConstantConditions
if (columnOptions.memtableMemoryBudgetBytes() != null) {
// about 512MB of ram will be used for level style compaction
columnFamilyOptions.optimizeLevelStyleCompaction(Optional.ofNullable(columnOptions.memtableMemoryBudgetBytes())
.map(DataSize::longValue)
.orElse(DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET));
}
if (isDisableAutoCompactions()) {
columnFamilyOptions.setDisableAutoCompactions(true);
}
try {
columnFamilyOptions.setPrepopulateBlobCache(PrepopulateBlobCache.PREPOPULATE_BLOB_FLUSH_ONLY);
} catch (Throwable ex) {
logger.error("Failed to set prepopulate blob cache", ex);
}
// This option is not supported with multiple db paths
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
boolean dynamicLevelBytes = (globalDatabaseConfig.volumes() == null || globalDatabaseConfig.volumes().length <= 1)
&& !globalDatabaseConfig.ingestBehind();
if (dynamicLevelBytes) {
columnFamilyOptions.setLevelCompactionDynamicLevelBytes(true);
}
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions
.setTargetFileSizeBase(256 * SizeUnit.MB)
.setMaxBytesForLevelBase(SizeUnit.GB);
if (isDisableAutoCompactions()) {
columnFamilyOptions.setLevel0FileNumCompactionTrigger(-1);
} else if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
// ArangoDB uses a value of 2: https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// Higher values speed up writes, but slow down reads
columnFamilyOptions.setLevel0FileNumCompactionTrigger(2);
}
if (isDisableSlowdown()) {
columnFamilyOptions.setLevel0SlowdownWritesTrigger(-1);
columnFamilyOptions.setLevel0StopWritesTrigger(Integer.MAX_VALUE);
columnFamilyOptions.setHardPendingCompactionBytesLimit(Long.MAX_VALUE);
columnFamilyOptions.setSoftPendingCompactionBytesLimit(Long.MAX_VALUE);
}
{
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setLevel0SlowdownWritesTrigger(20);
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setLevel0StopWritesTrigger(36);
}
if (columnOptions.levels().length > 0) {
columnFamilyOptions.setNumLevels(columnOptions.levels().length);
var firstLevelOptions = getRocksLevelOptions(columnOptions.levels()[0], refs);
columnFamilyOptions.setCompressionType(firstLevelOptions.compressionType);
columnFamilyOptions.setCompressionOptions(firstLevelOptions.compressionOptions);
var lastLevelOptions = getRocksLevelOptions(columnOptions
.levels()[columnOptions.levels().length - 1], refs);
columnFamilyOptions.setBottommostCompressionType(lastLevelOptions.compressionType);
columnFamilyOptions.setBottommostCompressionOptions(lastLevelOptions.compressionOptions);
List<CompressionType> compressionPerLevel = new ArrayList<>();
for (ColumnLevelConfig columnLevelConfig : columnOptions.levels()) {
CompressionType compression = columnLevelConfig.compression();
compressionPerLevel.add(compression);
}
columnFamilyOptions.setCompressionPerLevel(compressionPerLevel);
} else {
columnFamilyOptions.setNumLevels(7);
List<CompressionType> compressionTypes = new ArrayList<>(7);
for (int i = 0; i < 7; i++) {
if (i < 2) {
compressionTypes.add(CompressionType.NO_COMPRESSION);
} else {
compressionTypes.add(CompressionType.LZ4_COMPRESSION);
}
}
columnFamilyOptions.setBottommostCompressionType(CompressionType.LZ4HC_COMPRESSION);
var compressionOptions = new CompressionOptions()
.setEnabled(true)
.setMaxDictBytes(Math.toIntExact(32 * SizeUnit.KB));
refs.add(compressionOptions);
setZstdCompressionOptions(compressionOptions);
columnFamilyOptions.setBottommostCompressionOptions(compressionOptions);
columnFamilyOptions.setCompressionPerLevel(compressionTypes);
}
final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
columnFamilyOptions.setWriteBufferSize(256 * SizeUnit.MB);
}
Optional.ofNullable(columnOptions.writeBufferSize())
.map(DataSize::longValue)
.ifPresent(columnFamilyOptions::setWriteBufferSize);
columnFamilyOptions.setMaxWriteBufferNumberToMaintain(1);
if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig.setVerifyCompression(false);
}
// If OptimizeFiltersForHits == true: memory size = bitsPerKey * (totalKeys * 0.1)
// If OptimizeFiltersForHits == false: memory size = bitsPerKey * totalKeys
BloomFilterConfig filter = null;
BloomFilterConfig bloomFilterConfig = columnOptions.bloomFilter();
if (bloomFilterConfig != null) filter = bloomFilterConfig;
if (filter == null) {
if (inMemory) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Please set a bloom filter. It's required for in-memory databases");
}
if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig.setFilterPolicy(null);
}
} else {
final BloomFilter bloomFilter = new BloomFilter(filter.bitsPerKey());
refs.add(bloomFilter);
if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig.setFilterPolicy(bloomFilter);
}
}
boolean cacheIndexAndFilterBlocks = !inMemory && Optional.ofNullable(columnOptions.cacheIndexAndFilterBlocks())
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.orElse(true);
if (globalDatabaseConfig.spinning()) {
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setMinWriteBufferNumberToMerge(3);
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setMaxWriteBufferNumber(4);
}
if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig
// http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html
.setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash)
// http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html
.setDataBlockHashTableUtilRatio(0.75)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setPinTopLevelIndexAndFilter(true)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setPinL0FilterAndIndexBlocksInCache(!inMemory)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
// Enabling partition filters increase the reads by 2x
.setPartitionFilters(Optional.ofNullable(columnOptions.partitionFilters()).orElse(false))
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setIndexType(inMemory ? IndexType.kHashSearch : Optional.ofNullable(columnOptions.partitionFilters()).orElse(false) ? IndexType.kTwoLevelIndexSearch : IndexType.kBinarySearch)
.setChecksumType(inMemory ? ChecksumType.kNoChecksum : ChecksumType.kXXH3)
// Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
.setBlockSize(inMemory ? 4 * SizeUnit.KB : Optional.ofNullable(columnOptions.blockSize())
.map(DataSize::longValue)
.orElse((globalDatabaseConfig.spinning() ? 128 : 16) * SizeUnit.KB))
.setBlockCache(cache)
.setNoBlockCache(cache == null);
}
if (inMemory) {
columnFamilyOptions.useCappedPrefixExtractor(4);
tableOptions.setBlockRestartInterval(4);
}
columnFamilyOptions.setTableFormatConfig(tableOptions);
columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
// https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions=
BloomFilterConfig bloomFilterOptions = columnOptions.bloomFilter();
if (bloomFilterOptions != null) {
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
// https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions=
boolean optimizeForHits = globalDatabaseConfig.spinning();
Boolean value = bloomFilterOptions.optimizeForHits();
if (value != null) optimizeForHits = value;
columnFamilyOptions.setOptimizeFiltersForHits(optimizeForHits);
}
return columnFamilyOptions;
} catch (GestaltException ex) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_CONFIG_ERROR, ex);
}
}
/**
 * Sets the ZSTD dictionary training budget to 100 times the maximum dictionary size
 * already configured on {@code compressionOptions}.
 *
 * @param compressionOptions options to update in place; its max dictionary size must already be set
 */
private static void setZstdCompressionOptions(CompressionOptions compressionOptions) {
    // https://rocksdb.org/blog/2021/05/31/dictionary-compression.html#:~:text=(zstd%20only,it%20to%20100x
    // Compute in long and saturate: the setter takes an int, and 100x a large dictionary
    // size would otherwise wrap around to a negative training budget.
    long maxTrainBytes = 100L * compressionOptions.maxDictBytes();
    compressionOptions
            .setZStdMaxTrainBytes((int) Math.min(Integer.MAX_VALUE, maxTrainBytes));
}
public static LoadedDb load(@Nullable Path path, DatabaseConfig config, Logger logger) {
var refs = new RocksDBObjects(); var refs = new RocksDBObjects();
// Get databases directory path // Get databases directory path
Path definitiveDbPath; Path definitiveDbPath;
@ -132,7 +325,9 @@ public class RocksDBLoader {
refs.add(options); refs.add(options);
options.setParanoidChecks(PARANOID_CHECKS); options.setParanoidChecks(PARANOID_CHECKS);
options.setSkipCheckingSstFileSizesOnDbOpen(true); options.setSkipCheckingSstFileSizesOnDbOpen(true);
options.setEnablePipelinedWrite(true); if (!databaseOptions.global().unorderedWrite()) {
options.setEnablePipelinedWrite(true);
}
var maxSubCompactions = Integer.parseInt(System.getProperty("it.cavallium.dbengine.compactions.max.sub", "-1")); var maxSubCompactions = Integer.parseInt(System.getProperty("it.cavallium.dbengine.compactions.max.sub", "-1"));
if (maxSubCompactions > 0) { if (maxSubCompactions > 0) {
options.setMaxSubcompactions(maxSubCompactions); options.setMaxSubcompactions(maxSubCompactions);
@ -301,11 +496,12 @@ public class RocksDBLoader {
.toList(); .toList();
} }
private static TransactionalDB loadDb(@Nullable Path path, private static LoadedDb loadDb(@Nullable Path path,
@NotNull Path definitiveDbPath, @NotNull Path definitiveDbPath,
DatabaseConfig databaseOptions, OptionsWithCache optionsWithCache, RocksDBObjects refs, Logger logger) { DatabaseConfig databaseOptions, OptionsWithCache optionsWithCache, RocksDBObjects refs, Logger logger) {
var inMemory = path == null; var inMemory = path == null;
var rocksdbOptions = optionsWithCache.options(); var rocksdbOptions = optionsWithCache.options();
Map<String, ColumnFamilyOptions> definitiveColumnFamilyOptionsMap = new HashMap<>();
try { try {
List<DbPathRecord> volumeConfigs = getVolumeConfigs(definitiveDbPath, databaseOptions); List<DbPathRecord> volumeConfigs = getVolumeConfigs(definitiveDbPath, databaseOptions);
List<ColumnFamilyDescriptor> descriptors = new ArrayList<>(); List<ColumnFamilyDescriptor> descriptors = new ArrayList<>();
@ -359,199 +555,12 @@ public class RocksDBLoader {
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Wrong column config name: " + name); throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Wrong column config name: " + name);
} }
var columnFamilyOptions = new ColumnFamilyOptions(); var columnFamilyOptions = getColumnOptions(databaseOptions.global(), columnOptions,
logger, refs, path == null, optionsWithCache.standardCache());
refs.add(columnFamilyOptions); refs.add(columnFamilyOptions);
//noinspection ConstantConditions
if (columnOptions.memtableMemoryBudgetBytes() != null) {
// about 512MB of ram will be used for level style compaction
columnFamilyOptions.optimizeLevelStyleCompaction(Optional.ofNullable(columnOptions.memtableMemoryBudgetBytes())
.map(DataSize::longValue)
.orElse(DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET));
}
if (isDisableAutoCompactions()) {
columnFamilyOptions.setDisableAutoCompactions(true);
}
try {
columnFamilyOptions.setPrepopulateBlobCache(PrepopulateBlobCache.PREPOPULATE_BLOB_FLUSH_ONLY);
} catch (Throwable ex) {
logger.error("Failed to set prepopulate blob cache", ex);
}
// This option is not supported with multiple db paths
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
boolean dynamicLevelBytes = volumeConfigs.size() <= 1;
if (dynamicLevelBytes) {
columnFamilyOptions.setLevelCompactionDynamicLevelBytes(true);
columnFamilyOptions.setMaxBytesForLevelBase(10 * SizeUnit.GB);
columnFamilyOptions.setMaxBytesForLevelMultiplier(10);
} else {
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setMaxBytesForLevelBase(256 * SizeUnit.MB);
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setMaxBytesForLevelMultiplier(10);
}
if (isDisableAutoCompactions()) {
columnFamilyOptions.setLevel0FileNumCompactionTrigger(-1);
} else if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
// ArangoDB uses a value of 2: https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// Higher values speed up writes, but slow down reads
columnFamilyOptions.setLevel0FileNumCompactionTrigger(2);
}
if (isDisableSlowdown()) {
columnFamilyOptions.setLevel0SlowdownWritesTrigger(-1);
columnFamilyOptions.setLevel0StopWritesTrigger(Integer.MAX_VALUE);
columnFamilyOptions.setHardPendingCompactionBytesLimit(Long.MAX_VALUE);
columnFamilyOptions.setSoftPendingCompactionBytesLimit(Long.MAX_VALUE);
}
{
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setLevel0SlowdownWritesTrigger(20);
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setLevel0StopWritesTrigger(36);
}
if (columnOptions.levels().length > 0) {
columnFamilyOptions.setNumLevels(columnOptions.levels().length);
var firstLevelOptions = getRocksLevelOptions(columnOptions.levels()[0], refs);
columnFamilyOptions.setCompressionType(firstLevelOptions.compressionType);
columnFamilyOptions.setCompressionOptions(firstLevelOptions.compressionOptions);
var lastLevelOptions = getRocksLevelOptions(columnOptions
.levels()[columnOptions.levels().length - 1], refs);
columnFamilyOptions.setBottommostCompressionType(lastLevelOptions.compressionType);
columnFamilyOptions.setBottommostCompressionOptions(lastLevelOptions.compressionOptions);
List<CompressionType> compressionPerLevel = new ArrayList<>();
for (ColumnLevelConfig columnLevelConfig : columnOptions.levels()) {
CompressionType compression = columnLevelConfig.compression();
compressionPerLevel.add(compression);
}
columnFamilyOptions.setCompressionPerLevel(compressionPerLevel);
} else {
columnFamilyOptions.setNumLevels(7);
List<CompressionType> compressionTypes = new ArrayList<>(7);
for (int i = 0; i < 7; i++) {
if (i < 2) {
compressionTypes.add(CompressionType.NO_COMPRESSION);
} else {
compressionTypes.add(CompressionType.LZ4_COMPRESSION);
}
}
columnFamilyOptions.setBottommostCompressionType(CompressionType.LZ4HC_COMPRESSION);
var compressionOptions = new CompressionOptions()
.setEnabled(true)
.setMaxDictBytes(32768);
refs.add(compressionOptions);
columnFamilyOptions.setBottommostCompressionOptions(compressionOptions);
columnFamilyOptions.setCompressionPerLevel(compressionTypes);
}
final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
columnFamilyOptions.setWriteBufferSize(256 * SizeUnit.MB);
}
Optional.ofNullable(columnOptions.writeBufferSize())
.map(DataSize::longValue)
.ifPresent(columnFamilyOptions::setWriteBufferSize);
columnFamilyOptions.setMaxWriteBufferNumberToMaintain(1);
if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig.setVerifyCompression(false);
}
// If OptimizeFiltersForHits == true: memory size = bitsPerKey * (totalKeys * 0.1)
// If OptimizeFiltersForHits == false: memory size = bitsPerKey * totalKeys
BloomFilterConfig filter = null;
BloomFilterConfig bloomFilterConfig = columnOptions.bloomFilter();
if (bloomFilterConfig != null) filter = bloomFilterConfig;
if (filter == null) {
if (path == null) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Please set a bloom filter. It's required for in-memory databases");
}
if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig.setFilterPolicy(null);
}
} else {
final BloomFilter bloomFilter = new BloomFilter(filter.bitsPerKey());
refs.add(bloomFilter);
if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig.setFilterPolicy(bloomFilter);
}
}
boolean cacheIndexAndFilterBlocks = path != null && Optional.ofNullable(columnOptions.cacheIndexAndFilterBlocks())
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.orElse(true);
if (databaseOptions.global().spinning()) {
if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
// cacheIndexAndFilterBlocks = true;
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setMinWriteBufferNumberToMerge(3);
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setMaxWriteBufferNumber(4);
}
}
if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig
// http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html
.setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash)
// http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html
.setDataBlockHashTableUtilRatio(0.75)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setPinTopLevelIndexAndFilter(true)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setPinL0FilterAndIndexBlocksInCache(path != null)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
// Enabling partition filters increase the reads by 2x
.setPartitionFilters(Optional.ofNullable(columnOptions.partitionFilters()).orElse(false))
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setIndexType(path == null ? IndexType.kHashSearch : Optional.ofNullable(columnOptions.partitionFilters()).orElse(false) ? IndexType.kTwoLevelIndexSearch : IndexType.kBinarySearch)
.setChecksumType(path == null ? ChecksumType.kNoChecksum : ChecksumType.kXXH3)
// Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
.setBlockSize(path == null ? 4096 : Optional.ofNullable(columnOptions.blockSize()).map(DataSize::longValue).orElse((databaseOptions.global().spinning() ? 128 : 16) * 1024L))
.setBlockCache(optionsWithCache.standardCache())
.setNoBlockCache(optionsWithCache.standardCache() == null);
}
if (path == null) {
columnFamilyOptions.useCappedPrefixExtractor(4);
tableOptions.setBlockRestartInterval(4);
}
columnFamilyOptions.setTableFormatConfig(tableOptions);
columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
// https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions=
BloomFilterConfig bloomFilterOptions = columnOptions.bloomFilter();
if (bloomFilterOptions != null) {
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
// https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions=
boolean optimizeForHits = databaseOptions.global().spinning();
Boolean value = bloomFilterOptions.optimizeForHits();
if (value != null) optimizeForHits = value;
columnFamilyOptions.setOptimizeFiltersForHits(optimizeForHits);
}
if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
// // Increasing this value can reduce the frequency of compaction and reduce write amplification,
// // but it will also cause old data to be unable to be cleaned up in time, thus increasing read amplification.
// // This parameter is not easy to adjust. It is generally not recommended to set it above 256MB.
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setTargetFileSizeBase(64 * SizeUnit.MB);
// // For each level up, the threshold is multiplied by the factor target_file_size_multiplier
// // (but the default value is 1, which means that the maximum sstable of each level is the same).
columnFamilyOptions.setTargetFileSizeMultiplier(2);
}
descriptors.add(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.US_ASCII), columnFamilyOptions)); descriptors.add(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.US_ASCII), columnFamilyOptions));
definitiveColumnFamilyOptionsMap.put(name, columnFamilyOptions);
} }
var handles = new ArrayList<ColumnFamilyHandle>(); var handles = new ArrayList<ColumnFamilyHandle>();
@ -587,7 +596,7 @@ public class RocksDBLoader {
var delayWalFlushConfig = getWalFlushDelayConfig(databaseOptions); var delayWalFlushConfig = getWalFlushDelayConfig(databaseOptions);
var dbTasks = new DatabaseTasks(db, inMemory, delayWalFlushConfig); var dbTasks = new DatabaseTasks(db, inMemory, delayWalFlushConfig);
return TransactionalDB.create(definitiveDbPath.toString(), db, descriptors, handles, dbTasks); return new LoadedDb(TransactionalDB.create(definitiveDbPath.toString(), db, descriptors, handles, dbTasks), rocksdbOptions, definitiveColumnFamilyOptionsMap);
} catch (IOException | RocksDBException ex) { } catch (IOException | RocksDBException ex) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex); throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex);
} catch (GestaltException e) { } catch (GestaltException e) {
@ -612,8 +621,9 @@ public class RocksDBLoader {
var compressionOptions = new CompressionOptions(); var compressionOptions = new CompressionOptions();
refs.add(compressionOptions); refs.add(compressionOptions);
if (compressionType != CompressionType.NO_COMPRESSION) { if (compressionType != CompressionType.NO_COMPRESSION) {
compressionOptions.setEnabled(true); compressionOptions.setEnabled(true)
compressionOptions.setMaxDictBytes(Math.toIntExact(levelOptions.maxDictBytes().longValue())); .setMaxDictBytes(Math.toIntExact(levelOptions.maxDictBytes().longValue()));
setZstdCompressionOptions(compressionOptions);
} else { } else {
compressionOptions.setEnabled(false); compressionOptions.setEnabled(false);
} }

View File

@ -1,37 +1,151 @@
package it.cavallium.rockserver.core.impl.rocksdb; package it.cavallium.rockserver.core.impl.rocksdb;
import it.cavallium.rockserver.core.common.RocksDBException; import it.cavallium.rockserver.core.common.RocksDBException;
import org.rocksdb.IngestExternalFileOptions; import it.cavallium.rockserver.core.impl.ColumnInstance;
import org.rocksdb.RocksDB; import org.rocksdb.*;
import org.rocksdb.SstFileWriter; import org.rocksdb.util.SizeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.List; import java.util.List;
import java.util.UUID;
public record SSTWriter(RocksDB db, it.cavallium.rockserver.core.impl.ColumnInstance col, Path path, SstFileWriter sstFileWriter, boolean ingestBehind) implements Closeable, DBWriter { public record SSTWriter(RocksDB db, it.cavallium.rockserver.core.impl.ColumnInstance col, Path path, SstFileWriter sstFileWriter, boolean ingestBehind,
RocksDBObjects refs) implements Closeable, DBWriter {
private static final Logger LOG = LoggerFactory.getLogger(SSTWriter.class);
public static SSTWriter open(TransactionalDB db, ColumnInstance col, ColumnFamilyOptions columnConifg, boolean forceNoOptions, boolean ingestBehind, RocksDBObjects refs) throws IOException, org.rocksdb.RocksDBException {
if (refs == null) {
refs = new RocksDBObjects();
}
var envOptions = new EnvOptions();
if (!forceNoOptions) {
envOptions
.setAllowFallocate(true)
.setWritableFileMaxBufferSize(10 * SizeUnit.MB)
.setRandomAccessMaxBufferSize(10 * SizeUnit.MB)
.setCompactionReadaheadSize(2 * SizeUnit.MB)
.setBytesPerSync(10 * SizeUnit.MB);
}
refs.add(envOptions);
var options = new Options();
refs.add(options);
if (!forceNoOptions) {
options
.setDisableAutoCompactions(true)
.setManualWalFlush(true)
.setUseDirectIoForFlushAndCompaction(true)
.setBytesPerSync(5 * SizeUnit.MB)
.setParanoidChecks(false)
.setSkipCheckingSstFileSizesOnDbOpen(true)
.setForceConsistencyChecks(false)
.setParanoidFileChecks(false);
if (columnConifg != null) {
options
.setCompressionType(columnConifg.compressionType())
.setCompressionOptions(columnConifg.compressionOptions())
.setBottommostCompressionType(columnConifg.bottommostCompressionType())
.setBottommostCompressionOptions(columnConifg.bottommostCompressionOptions())
.setCompressionPerLevel(columnConifg.compressionPerLevel())
.setNumLevels(columnConifg.numLevels())
.setTableFormatConfig(columnConifg.tableFormatConfig())
.setMemTableConfig(columnConifg.memTableConfig())
.setTargetFileSizeBase(columnConifg.targetFileSizeBase())
.setTargetFileSizeMultiplier(columnConifg.targetFileSizeMultiplier())
.setMaxOpenFiles(-1);
}
}
Path tempFile;
try {
var tempDir = Path.of(db.getPath()).resolve(".temp_sst");
if (Files.notExists(tempDir)) {
Files.createDirectories(tempDir);
}
tempFile = tempDir.resolve(UUID.randomUUID() + ".sst");
} catch (IOException ex) {
refs.close();
throw ex;
}
var sstFileWriter = new SstFileWriter(envOptions, options);
var sstWriter = new SSTWriter(db.get(), col, tempFile, sstFileWriter, ingestBehind, refs);
sstFileWriter.open(tempFile.toString());
return sstWriter;
}
public void put(byte[] key, byte[] value) throws RocksDBException {
try {
checkOwningHandle();
sstFileWriter.put(key, value);
} catch (org.rocksdb.RocksDBException e) {
throw RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_UNKNOWN_ERROR, e);
}
}
public void put(ByteBuffer key, ByteBuffer value) throws RocksDBException {
try {
checkOwningHandle();
sstFileWriter.put(key, value);
} catch (org.rocksdb.RocksDBException e) {
throw RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_UNKNOWN_ERROR, e);
}
}
@Override @Override
public void writePending() throws it.cavallium.rockserver.core.common.RocksDBException { public void writePending() throws it.cavallium.rockserver.core.common.RocksDBException {
try { try {
sstFileWriter.finish(); try (this) {
try (var ingestOptions = new IngestExternalFileOptions()) { checkOwningHandle();
ingestOptions sstFileWriter.finish();
.setIngestBehind(ingestBehind) try (var ingestOptions = new IngestExternalFileOptions()) {
.setAllowBlockingFlush(true) ingestOptions
.setMoveFiles(true) .setIngestBehind(ingestBehind)
.setAllowGlobalSeqNo(true) .setAllowBlockingFlush(true)
.setWriteGlobalSeqno(false) .setMoveFiles(true)
.setSnapshotConsistency(false); .setAllowGlobalSeqNo(true)
db.ingestExternalFile(col.cfh(), List.of(path.toString()), ingestOptions); .setWriteGlobalSeqno(false)
.setSnapshotConsistency(false);
db.ingestExternalFile(col.cfh(), List.of(path.toString()), ingestOptions);
}
} }
} catch (org.rocksdb.RocksDBException e) { } catch (org.rocksdb.RocksDBException e) {
throw RocksDBException.of(RocksDBException.RocksDBErrorType.SST_WRITE_1, e); throw RocksDBException.of(RocksDBException.RocksDBErrorType.SST_WRITE_1, e);
} }
} }
/**
 * Guard used before touching the underlying SST file writer.
 *
 * @throws RocksDBException {@code SST_WRITE_4} when the underlying writer no longer owns its handle
 */
private void checkOwningHandle() {
    if (sstFileWriter.isOwningHandle()) {
        return;
    }
    throw RocksDBException.of(RocksDBException.RocksDBErrorType.SST_WRITE_4, "SST writer is closed");
}
@Override @Override
public void close() { public void close() {
sstFileWriter.close(); if (sstFileWriter.isOwningHandle()) {
sstFileWriter.close();
try {
Files.deleteIfExists(path);
} catch (IOException e) {
LOG.error("Failed to delete a file: {}", path, e);
}
}
refs.close();
}
public long fileSize() {
if (!sstFileWriter.isOwningHandle()) {
throw RocksDBException.of(RocksDBException.RocksDBErrorType.SST_GET_SIZE_FAILED, "The SSTWriter is closed");
}
try {
return sstFileWriter.fileSize();
} catch (org.rocksdb.RocksDBException e) {
throw RocksDBException.of(RocksDBException.RocksDBErrorType.SST_GET_SIZE_FAILED, e);
}
} }
} }

View File

@ -2,6 +2,7 @@ package it.cavallium.rockserver.core.impl.rocksdb;
import java.io.Closeable; import java.io.Closeable;
import it.cavallium.rockserver.core.common.RocksDBException;
import org.rocksdb.Transaction; import org.rocksdb.Transaction;
public record Tx(Transaction val, boolean isFromGetForUpdate, RocksDBObjects objs) public record Tx(Transaction val, boolean isFromGetForUpdate, RocksDBObjects objs)
@ -12,4 +13,9 @@ public record Tx(Transaction val, boolean isFromGetForUpdate, RocksDBObjects obj
val.close(); val.close();
objs.close(); objs.close();
} }
/** No-op: the body is empty, so calling this method on a {@code Tx} does nothing. */
@Override
public void writePending() throws RocksDBException {
}
} }

View File

@ -4,7 +4,9 @@ import static it.cavallium.rockserver.core.common.Utils.toMemorySegment;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.Empty; import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.netty.NettyServerBuilder; import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
@ -17,6 +19,7 @@ import it.cavallium.rockserver.core.client.RocksDBConnection;
import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.ColumnHashType; import it.cavallium.rockserver.core.common.ColumnHashType;
import it.cavallium.rockserver.core.common.ColumnSchema; import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.KVBatch;
import it.cavallium.rockserver.core.common.PutBatchMode; import it.cavallium.rockserver.core.common.PutBatchMode;
import it.cavallium.rockserver.core.common.RequestType.RequestChanged; import it.cavallium.rockserver.core.common.RequestType.RequestChanged;
import it.cavallium.rockserver.core.common.RequestType.RequestCurrent; import it.cavallium.rockserver.core.common.RequestType.RequestCurrent;
@ -42,11 +45,14 @@ import java.lang.foreign.MemorySegment;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.rocksdb.util.SizeUnit; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -79,7 +85,7 @@ public class GrpcServer extends Server {
.directExecutor() .directExecutor()
.channelType(channelType) .channelType(channelType)
.withChildOption(ChannelOption.SO_KEEPALIVE, false) .withChildOption(ChannelOption.SO_KEEPALIVE, false)
.maxInboundMessageSize(Math.toIntExact(128 * SizeUnit.MB)) .maxInboundMessageSize(128 * 1024 * 1024)
.addService(grpc) .addService(grpc)
.build(); .build();
server.start(); server.start();
@ -88,9 +94,11 @@ public class GrpcServer extends Server {
private final class GrpcServerImpl extends RocksDBServiceImplBase { private final class GrpcServerImpl extends RocksDBServiceImplBase {
private final RocksDBAsyncAPI asyncApi;
private final RocksDBSyncAPI api; private final RocksDBSyncAPI api;
public GrpcServerImpl(RocksDBConnection client) { public GrpcServerImpl(RocksDBConnection client) {
this.asyncApi = client.getAsyncApi();
this.api = client.getSyncApi(); this.api = client.getSyncApi();
} }
@ -105,7 +113,7 @@ public class GrpcServer extends Server {
responseObserver.onNext(OpenTransactionResponse.newBuilder().setTransactionId(txId).build()); responseObserver.onNext(OpenTransactionResponse.newBuilder().setTransactionId(txId).build());
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -120,7 +128,7 @@ public class GrpcServer extends Server {
responseObserver.onNext(response); responseObserver.onNext(response);
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -133,7 +141,7 @@ public class GrpcServer extends Server {
responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -141,19 +149,27 @@ public class GrpcServer extends Server {
@Override @Override
public void createColumn(CreateColumnRequest request, StreamObserver<CreateColumnResponse> responseObserver) { public void createColumn(CreateColumnRequest request, StreamObserver<CreateColumnResponse> responseObserver) {
executor.execute(() -> { executor.execute(() -> {
var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema())); try {
var response = CreateColumnResponse.newBuilder().setColumnId(colId).build(); var colId = api.createColumn(request.getName(), mapColumnSchema(request.getSchema()));
responseObserver.onNext(response); var response = CreateColumnResponse.newBuilder().setColumnId(colId).build();
responseObserver.onCompleted(); responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
}
}); });
} }
@Override @Override
public void deleteColumn(DeleteColumnRequest request, StreamObserver<Empty> responseObserver) { public void deleteColumn(DeleteColumnRequest request, StreamObserver<Empty> responseObserver) {
executor.execute(() -> { executor.execute(() -> {
api.deleteColumn(request.getColumnId()); try {
responseObserver.onNext(Empty.getDefaultInstance()); api.deleteColumn(request.getColumnId());
responseObserver.onCompleted(); responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
} catch (Throwable ex) {
handleError(responseObserver, ex);
}
}); });
} }
@ -166,7 +182,7 @@ public class GrpcServer extends Server {
responseObserver.onNext(response); responseObserver.onNext(response);
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -187,35 +203,127 @@ public class GrpcServer extends Server {
responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@Override @Override
public void putBatch(PutBatchRequest request, StreamObserver<Empty> responseObserver) { public StreamObserver<PutBatchRequest> putBatch(StreamObserver<Empty> responseObserver) {
executor.execute(() -> { final ServerCallStreamObserver<Empty> serverCallStreamObserver =
try { (ServerCallStreamObserver<Empty>) responseObserver;
try (var arena = Arena.ofConfined()) { serverCallStreamObserver.disableAutoRequest();
api.putBatch(arena, serverCallStreamObserver.request(1);
request.getColumnId(), var requestObserver = new StreamObserver<PutBatchRequest>() {
mapKeysKV(arena, request.getDataCount(), request::getData), enum State {
mapValuesKV(arena, request.getDataCount(), request::getData), BEFORE_INITIAL_REQUEST,
switch (request.getMode()) { RECEIVING_DATA,
case WRITE_BATCH -> PutBatchMode.WRITE_BATCH; RECEIVED_ALL
case WRITE_BATCH_NO_WAL -> PutBatchMode.WRITE_BATCH_NO_WAL;
case SST_INGESTION -> PutBatchMode.SST_INGESTION;
case SST_INGEST_BEHIND -> PutBatchMode.SST_INGEST_BEHIND;
case UNRECOGNIZED -> throw new UnsupportedOperationException("Unrecognized request \"mode\"");
}
);
}
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
} }
}); private final ExecutorService sstExecutor = Executors.newSingleThreadExecutor();
final AtomicInteger pendingRequests = new AtomicInteger();
State state = State.BEFORE_INITIAL_REQUEST;
private PutBatchInitialRequest initialRequest;
private Subscriber<? super KVBatch> putBatchInputsSubscriber;
@Override
public void onNext(PutBatchRequest putBatchRequest) {
if (state == State.BEFORE_INITIAL_REQUEST) {
if (!putBatchRequest.hasInitialRequest()) {
serverCallStreamObserver.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Missing initial request"));
}
initialRequest = putBatchRequest.getInitialRequest();
try {
asyncApi.putBatchAsync(initialRequest.getColumnId(),
subscriber2 -> {
putBatchInputsSubscriber = subscriber2;
subscriber2.onSubscribe(new Subscription() {
@Override
public void request(long l) {
serverCallStreamObserver.request(Math.toIntExact(l));
}
@Override
public void cancel() {
serverCallStreamObserver.onError(new IOException("Cancelled"));
}
});
},
switch (initialRequest.getMode()) {
case WRITE_BATCH -> PutBatchMode.WRITE_BATCH;
case WRITE_BATCH_NO_WAL -> PutBatchMode.WRITE_BATCH_NO_WAL;
case SST_INGESTION -> PutBatchMode.SST_INGESTION;
case SST_INGEST_BEHIND -> PutBatchMode.SST_INGEST_BEHIND;
case UNRECOGNIZED -> throw new UnsupportedOperationException("Unrecognized request \"mode\"");
}
).whenComplete((_, ex) -> {
if (ex != null) {
handleError(serverCallStreamObserver, ex);
} else {
serverCallStreamObserver.onNext(Empty.getDefaultInstance());
serverCallStreamObserver.onCompleted();
}
});
} catch (Throwable ex) {
handleError(serverCallStreamObserver, ex);
}
state = State.RECEIVING_DATA;
} else if (state == State.RECEIVING_DATA) {
pendingRequests.incrementAndGet();
var kvBatch = putBatchRequest.getData();
sstExecutor.execute(() -> {
try {
try (var arena = Arena.ofConfined()) {
putBatchInputsSubscriber.onNext(mapKVBatch(arena, kvBatch.getEntriesCount(), kvBatch::getEntries));
}
checkCompleted(true);
} catch (Throwable ex) {
putBatchInputsSubscriber.onError(ex);
}
});
} else {
serverCallStreamObserver.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Invalid request"));
}
}
@Override
public void onError(Throwable throwable) {
state = State.RECEIVED_ALL;
doFinally();
if (putBatchInputsSubscriber != null) {
putBatchInputsSubscriber.onError(throwable);
} else {
serverCallStreamObserver.onError(throwable);
}
}
@Override
public void onCompleted() {
if (state == State.BEFORE_INITIAL_REQUEST) {
serverCallStreamObserver.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_INVALID_REQUEST, "Missing initial request"));
} else if (state == State.RECEIVING_DATA) {
state = State.RECEIVED_ALL;
checkCompleted(false);
} else {
putBatchInputsSubscriber.onError(RocksDBException.of(RocksDBException.RocksDBErrorType.PUT_UNKNOWN_ERROR, "Unknown state during onComplete: " + state));
}
}
private void checkCompleted(boolean requestDone) {
if ((requestDone ? pendingRequests.decrementAndGet() : pendingRequests.get()) == 0
&& state == State.RECEIVED_ALL) {
doFinally();
putBatchInputsSubscriber.onComplete();
}
}
private void doFinally() {
sstExecutor.shutdown();
}
};
return requestObserver;
} }
@Override @Override
@ -253,7 +361,7 @@ public class GrpcServer extends Server {
new RequestNothing<>()); new RequestNothing<>());
} }
} catch (RocksDBException ex) { } catch (RocksDBException ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
return; return;
} }
@ -308,7 +416,7 @@ public class GrpcServer extends Server {
} }
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -337,7 +445,7 @@ public class GrpcServer extends Server {
} }
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -359,7 +467,7 @@ public class GrpcServer extends Server {
} }
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -381,7 +489,7 @@ public class GrpcServer extends Server {
} }
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -406,7 +514,7 @@ public class GrpcServer extends Server {
} }
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -432,7 +540,7 @@ public class GrpcServer extends Server {
} }
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -452,7 +560,7 @@ public class GrpcServer extends Server {
} }
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -474,7 +582,7 @@ public class GrpcServer extends Server {
} }
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -487,7 +595,7 @@ public class GrpcServer extends Server {
responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -502,7 +610,7 @@ public class GrpcServer extends Server {
responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -520,7 +628,7 @@ public class GrpcServer extends Server {
responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -539,7 +647,7 @@ public class GrpcServer extends Server {
} }
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -571,7 +679,7 @@ public class GrpcServer extends Server {
} }
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); handleError(responseObserver, ex);
} }
}); });
} }
@ -641,6 +749,30 @@ public class GrpcServer extends Server {
} }
return keys; return keys;
} }
/**
 * Builds a {@code KVBatch} from {@code count} protobuf {@code KV} entries.
 *
 * @param arena    arena passed on to the key and value mappers
 * @param count    number of entries to read
 * @param getterAt accessor returning the entry at a given index
 * @return a batch made of the mapped keys and the mapped values
 */
private static KVBatch mapKVBatch(Arena arena, int count, Int2ObjectFunction<KV> getterAt) {
    var mappedKeys = mapKeysKV(arena, count, getterAt);
    var mappedValues = mapValuesKV(arena, count, getterAt);
    return new KVBatch(mappedKeys, mappedValues);
}
/**
 * Reports a failure to the client as an {@code INTERNAL} gRPC status.
 *
 * <p>{@link CompletionException} wrappers are peeled off first. The remaining throwable
 * becomes the status cause; when it is a {@code RocksDBException}, its localized message is
 * also used as the status description.
 *
 * @param responseObserver observer that receives the error
 * @param ex               the failure, possibly wrapped in one or more {@code CompletionException}s
 */
private static void handleError(StreamObserver<?> responseObserver, Throwable ex) {
    Throwable rootCause = ex;
    while (rootCause instanceof CompletionException wrapper) {
        rootCause = wrapper.getCause();
    }
    Status status = Status.INTERNAL;
    if (rootCause instanceof RocksDBException dbError) {
        status = status.withDescription(dbError.getLocalizedMessage());
    }
    responseObserver.onError(status.withCause(rootCause).asException());
}
} }
@Override @Override

View File

@ -54,6 +54,10 @@ message KV {
bytes value = 2; bytes value = 2;
} }
// A group of key/value entries; carried as the "data" alternative of PutBatchRequest.
message KVBatch {
repeated KV entries = 1;
}
message OpenTransactionRequest {int64 timeoutMs = 1;} message OpenTransactionRequest {int64 timeoutMs = 1;}
message OpenTransactionResponse {int64 transactionId = 1;} message OpenTransactionResponse {int64 transactionId = 1;}
@ -72,7 +76,8 @@ message GetColumnIdResponse {int64 columnId = 1;}
message PutRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; KV data = 3;} message PutRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; KV data = 3;}
message PutBatchRequest {int64 columnId = 1; repeated KV data = 2; PutBatchMode mode = 3;} message PutBatchInitialRequest {int64 columnId = 1; PutBatchMode mode = 2;}
message PutBatchRequest {oneof putBatchRequestType {PutBatchInitialRequest initialRequest = 1;KVBatch data = 2;}}
message PutMultiInitialRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2;} message PutMultiInitialRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2;}
message PutMultiRequest {oneof putMultiRequestType {PutMultiInitialRequest initialRequest = 1;KV data = 2;}} message PutMultiRequest {oneof putMultiRequestType {PutMultiInitialRequest initialRequest = 1;KV data = 2;}}
@ -97,7 +102,7 @@ service RocksDBService {
rpc deleteColumn(DeleteColumnRequest) returns (google.protobuf.Empty); rpc deleteColumn(DeleteColumnRequest) returns (google.protobuf.Empty);
rpc getColumnId(GetColumnIdRequest) returns (GetColumnIdResponse); rpc getColumnId(GetColumnIdRequest) returns (GetColumnIdResponse);
rpc put(PutRequest) returns (google.protobuf.Empty); rpc put(PutRequest) returns (google.protobuf.Empty);
rpc putBatch(PutBatchRequest) returns (google.protobuf.Empty); rpc putBatch(stream PutBatchRequest) returns (google.protobuf.Empty);
rpc putMulti(stream PutMultiRequest) returns (google.protobuf.Empty); rpc putMulti(stream PutMultiRequest) returns (google.protobuf.Empty);
rpc putGetPrevious(PutRequest) returns (Previous); rpc putGetPrevious(PutRequest) returns (Previous);
rpc putMultiGetPrevious(stream PutMultiRequest) returns (stream Previous); rpc putMultiGetPrevious(stream PutMultiRequest) returns (stream Previous);

View File

@ -65,7 +65,7 @@ database: {
block-cache: 512MiB block-cache: 512MiB
# Database write buffer manager size # Database write buffer manager size
# You should enable this option if you are using direct I/O or spinning disks # You should enable this option if you are using direct I/O or spinning disks
write-buffer-manager: 64MiB write-buffer-manager: 128MiB
# Log data path # Log data path
log-path: ./logs log-path: ./logs
# Write-Ahead-Log data path # Write-Ahead-Log data path
@ -132,7 +132,7 @@ database: {
# This should be kept to null if write-buffer-manager is set, # This should be kept to null if write-buffer-manager is set,
# or if you want to use the "memtable-memory-budget-size" logic. # or if you want to use the "memtable-memory-budget-size" logic.
# Remember that there are "max-write-buffer-number" in memory, 2 by default # Remember that there are "max-write-buffer-number" in memory, 2 by default
write-buffer-size: 200MiB write-buffer-size: 64MiB
# Enable blob files # Enable blob files
blob-files: false blob-files: false
} }

View File

@ -28,6 +28,7 @@ module rockserver.core {
requires io.netty.codec; requires io.netty.codec;
requires io.netty.codec.http2; requires io.netty.codec.http2;
requires io.netty.transport.classes.epoll; requires io.netty.transport.classes.epoll;
requires org.reactivestreams;
exports it.cavallium.rockserver.core.client; exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common; exports it.cavallium.rockserver.core.common;

View File

@ -3,24 +3,25 @@ package it.cavallium.rockserver.core.impl.test;
import static it.cavallium.rockserver.core.common.Utils.toMemorySegmentSimple; import static it.cavallium.rockserver.core.common.Utils.toMemorySegmentSimple;
import it.cavallium.rockserver.core.client.EmbeddedConnection; import it.cavallium.rockserver.core.client.EmbeddedConnection;
import it.cavallium.rockserver.core.common.Keys; import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.RequestType;
import it.cavallium.rockserver.core.common.ColumnHashType;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.Delta;
import it.cavallium.rockserver.core.common.RocksDBException;
import it.cavallium.rockserver.core.common.RocksDBRetryException;
import it.cavallium.rockserver.core.common.Utils;
import it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.objects.ObjectList; import it.unimi.dsi.fastutil.objects.ObjectList;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import java.io.IOException; import java.io.IOException;
import java.lang.foreign.MemorySegment; import java.lang.foreign.MemorySegment;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
abstract class EmbeddedDBTest { abstract class EmbeddedDBTest {
@ -362,6 +363,73 @@ abstract class EmbeddedDBTest {
} }
} }
/**
 * Streams two batches of two entries each (indices 0-1, then 2-3) through
 * {@code putBatch} in {@code SST_INGESTION} mode.
 *
 * <p>When the schema has no variable-length keys, all four entries must be readable
 * afterwards; otherwise the call is expected to fail with a {@code RocksDBException}.
 */
@Test
void putBatchSST() {
    @NotNull Publisher<@NotNull KVBatch> batchPublisher = new Publisher<KVBatch>() {
        @Override
        public void subscribe(Subscriber<? super KVBatch> subscriber) {
            subscriber.onSubscribe(new Subscription() {
                private final Iterator<KVBatch> it;
                // Set once onComplete has been signalled or the subscriber cancelled;
                // no further signal may be emitted after that.
                private boolean terminated;

                {
                    ArrayList<KVBatch> items = new ArrayList<>();
                    for (int batchStart = 0; batchStart < 4; batchStart += 2) {
                        ArrayList<Keys> keys = new ArrayList<>();
                        ArrayList<MemorySegment> values = new ArrayList<>();
                        for (int i = batchStart; i < batchStart + 2; i++) {
                            var keyI = getKeyI(i);
                            var valueI = getValueI(i);
                            keys.add(keyI);
                            values.add(valueI);
                        }
                        items.add(new KVBatch(keys, values));
                    }
                    it = items.iterator();
                }

                @Override
                public void request(long l) {
                    while (!terminated && l-- > 0) {
                        if (it.hasNext()) {
                            subscriber.onNext(it.next());
                        } else {
                            // Signal completion once only, even if request() is called again.
                            terminated = true;
                            subscriber.onComplete();
                            return;
                        }
                    }
                }

                @Override
                public void cancel() {
                    terminated = true;
                }
            });
        }
    };
    if (this.getSchema().variableLengthKeysCount() <= 0) {
        db.putBatch(colId, batchPublisher, PutBatchMode.SST_INGESTION);

        if (getHasValues()) {
            for (int i = 0; i < 4; i++) {
                assertSegmentEquals(getValueI(i), db.get(arena, 0, colId, getKeyI(i), RequestType.current()));
            }
        }
        for (int i = 0; i < 4; i++) {
            Assertions.assertTrue(db.get(arena, 0, colId, getKeyI(i), RequestType.exists()));
        }
    } else {
        Assertions.assertThrows(RocksDBException.class, () -> {
            db.putBatch(colId, batchPublisher, PutBatchMode.SST_INGESTION);
        });
    }
}
@Test @Test
void concurrentUpdate() { void concurrentUpdate() {
if (getHasValues()) { if (getHasValues()) {

View File

@ -0,0 +1,208 @@
package it.cavallium.rockserver.core.impl.test;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.config.*;
import it.cavallium.rockserver.core.impl.EmbeddedDB;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.objects.ObjectList;
import org.github.gestalt.config.exceptions.GestaltException;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.rocksdb.CompressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
public class TestSSTWriter {
private static final Logger LOG = LoggerFactory.getLogger(TestSSTWriter.class);
private EmbeddedDB db;
private long colId;
@BeforeEach
public void setUp() throws IOException {
db = new EmbeddedDB(null, "test", null);
this.colId = db.createColumn("test", ColumnSchema.of(IntList.of(Long.BYTES), ObjectList.of(), true));
}
@Test
public void test() throws IOException {
LOG.info("Obtaining sst writer");
var globalDatabaseConfigOverride = new GlobalDatabaseConfig() {
@Override
public boolean spinning() {
return false;
}
@Override
public boolean checksum() {
return false;
}
@Override
public boolean useDirectIo() {
return false;
}
@Override
public boolean allowRocksdbMemoryMapping() {
return true;
}
@Override
public @Nullable Integer maximumOpenFiles() {
return -1;
}
@Override
public boolean optimistic() {
return true;
}
@Override
public @Nullable DataSize blockCache() {
return new DataSize("10MiB");
}
@Override
public @Nullable DataSize writeBufferManager() {
return new DataSize("1MiB");
}
@Override
public @Nullable Path logPath() {
return null;
}
@Override
public @Nullable Path walPath() {
return null;
}
@Override
public @Nullable Duration delayWalFlushDuration() {
return null;
}
@Override
public boolean absoluteConsistency() {
return false;
}
@Override
public boolean ingestBehind() {
return true;
}
@Override
public boolean unorderedWrite() {
return false;
}
@Override
public VolumeConfig[] volumes() {
return new VolumeConfig[0];
}
@Override
public FallbackColumnConfig fallbackColumnOptions() {
return null;
}
@Override
public NamedColumnConfig[] columnOptions() {
return new NamedColumnConfig[0];
}
};
var fallbackColumnConfig = new FallbackColumnConfig() {
@Override
public ColumnLevelConfig[] levels() {
return new ColumnLevelConfig[] {
new ColumnLevelConfig() {
@Override
public CompressionType compression() {
return CompressionType.NO_COMPRESSION;
}
@Override
public DataSize maxDictBytes() {
return DataSize.ZERO;
}
}
};
}
@Override
public @Nullable DataSize memtableMemoryBudgetBytes() {
return new DataSize("1MiB");
}
@Override
public @Nullable Boolean cacheIndexAndFilterBlocks() {
return true;
}
@Override
public @Nullable Boolean partitionFilters() {
return false;
}
@Override
public @Nullable BloomFilterConfig bloomFilter() {
return new BloomFilterConfig() {
@Override
public int bitsPerKey() {
return 10;
}
@Override
public @Nullable Boolean optimizeForHits() {
return true;
}
};
}
@Override
public @Nullable DataSize blockSize() {
return new DataSize("128KiB");
}
@Override
public @Nullable DataSize writeBufferSize() {
return new DataSize("1MiB");
}
};
try (var sstWriter = db.getSSTWriter(colId, globalDatabaseConfigOverride, fallbackColumnConfig, false, true)) {
LOG.info("Creating sst");
var tl = ThreadLocalRandom.current();
var bytes = new byte[1024];
long i = 0;
while (i < 10_000) {
var ib = Longs.toByteArray(i++);
tl.nextBytes(bytes);
sstWriter.put(ib, bytes);
}
LOG.info("Writing pending sst data");
sstWriter.writePending();
LOG.info("Done, closing");
}
LOG.info("Done");
}
@AfterEach
public void tearDown() throws IOException {
db.close();
}
}

View File

@ -3,6 +3,12 @@ module rockserver.core.test {
requires rockserver.core; requires rockserver.core;
requires org.junit.jupiter.api; requires org.junit.jupiter.api;
requires it.unimi.dsi.fastutil; requires it.unimi.dsi.fastutil;
requires com.google.common;
requires org.slf4j;
requires org.github.gestalt.core;
requires org.jetbrains.annotations;
requires rocksdbjni;
requires org.reactivestreams;
opens it.cavallium.rockserver.core.test; opens it.cavallium.rockserver.core.test;
opens it.cavallium.rockserver.core.impl.test; opens it.cavallium.rockserver.core.impl.test;
} }