Add write batch

This commit is contained in:
Andrea Cavalli 2024-09-20 15:18:45 +02:00
parent e7a201aab3
commit 3594cb6b76
8 changed files with 175 additions and 106 deletions

View File

@ -33,36 +33,10 @@ import it.cavallium.rockserver.core.common.RocksDBException;
import it.cavallium.rockserver.core.common.RocksDBSyncAPI; import it.cavallium.rockserver.core.common.RocksDBSyncAPI;
import it.cavallium.rockserver.core.common.UpdateContext; import it.cavallium.rockserver.core.common.UpdateContext;
import it.cavallium.rockserver.core.common.Utils.HostAndPort; import it.cavallium.rockserver.core.common.Utils.HostAndPort;
import it.cavallium.rockserver.core.common.api.proto.Changed; import it.cavallium.rockserver.core.common.api.proto.*;
import it.cavallium.rockserver.core.common.api.proto.CloseFailedUpdateRequest;
import it.cavallium.rockserver.core.common.api.proto.CloseIteratorRequest;
import it.cavallium.rockserver.core.common.api.proto.CloseTransactionRequest;
import it.cavallium.rockserver.core.common.api.proto.CloseTransactionResponse;
import it.cavallium.rockserver.core.common.api.proto.ColumnHashType;
import it.cavallium.rockserver.core.common.api.proto.CreateColumnRequest;
import it.cavallium.rockserver.core.common.api.proto.CreateColumnResponse;
import it.cavallium.rockserver.core.common.api.proto.DeleteColumnRequest;
import it.cavallium.rockserver.core.common.api.proto.Delta;
import it.cavallium.rockserver.core.common.api.proto.GetColumnIdRequest;
import it.cavallium.rockserver.core.common.api.proto.GetColumnIdResponse;
import it.cavallium.rockserver.core.common.api.proto.GetRequest;
import it.cavallium.rockserver.core.common.api.proto.GetResponse;
import it.cavallium.rockserver.core.common.api.proto.KV;
import it.cavallium.rockserver.core.common.api.proto.OpenIteratorRequest;
import it.cavallium.rockserver.core.common.api.proto.OpenIteratorResponse;
import it.cavallium.rockserver.core.common.api.proto.OpenTransactionRequest;
import it.cavallium.rockserver.core.common.api.proto.OpenTransactionResponse;
import it.cavallium.rockserver.core.common.api.proto.Previous;
import it.cavallium.rockserver.core.common.api.proto.PreviousPresence;
import it.cavallium.rockserver.core.common.api.proto.PutMultiInitialRequest;
import it.cavallium.rockserver.core.common.api.proto.PutMultiRequest;
import it.cavallium.rockserver.core.common.api.proto.PutRequest;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceBlockingStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceBlockingStub;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub;
import it.cavallium.rockserver.core.common.api.proto.SeekToRequest;
import it.cavallium.rockserver.core.common.api.proto.SubsequentRequest;
import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrayList;
import java.io.IOException; import java.io.IOException;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
@ -214,6 +188,25 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
+ count + " != " + allValues.size()); + count + " != " + allValues.size());
} }
CompletableFuture<List<T>> responseObserver;
if (requestType instanceof RequestType.RequestNothing<?>) {
var putBatchRequestBuilder = PutBatchRequest.newBuilder()
.setTransactionOrUpdateId(transactionOrUpdateId)
.setColumnId(columnId);
var it1 = allKeys.iterator();
var it2 = allValues.iterator();
while (it1.hasNext()) {
var k = it1.next();
var v = it2.next();
putBatchRequestBuilder.addData(mapKV(k, v));
}
return toResponse(futureStub.putBatch(putBatchRequestBuilder.build()), _ -> null);
}
var initialRequest = PutMultiRequest.newBuilder() var initialRequest = PutMultiRequest.newBuilder()
.setInitialRequest(PutMultiInitialRequest.newBuilder() .setInitialRequest(PutMultiInitialRequest.newBuilder()
.setTransactionOrUpdateId(transactionOrUpdateId) .setTransactionOrUpdateId(transactionOrUpdateId)
@ -221,15 +214,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI {
.build()) .build())
.build(); .build();
CompletableFuture<List<T>> responseObserver;
StreamObserver<PutMultiRequest> requestPublisher = switch (requestType) { StreamObserver<PutMultiRequest> requestPublisher = switch (requestType) {
case RequestNothing<?> _ -> {
var thisResponseObserver = new CollectListStreamObserver<Empty>(0);
//noinspection unchecked
responseObserver = (CompletableFuture<List<T>>) (CompletableFuture<?>) thisResponseObserver;
yield this.asyncStub.putMulti(thisResponseObserver);
}
case RequestPrevious<?> _ -> { case RequestPrevious<?> _ -> {
var thisResponseObserver = new CollectListMappedStreamObserver<Previous, @Nullable MemorySegment>( var thisResponseObserver = new CollectListMappedStreamObserver<Previous, @Nullable MemorySegment>(
GrpcConnection::mapPrevious, count); GrpcConnection::mapPrevious, count);

View File

@ -25,7 +25,8 @@ public class RocksDBException extends RuntimeException {
COMMIT_FAILED_TRY_AGAIN, COMMIT_FAILED_TRY_AGAIN,
COMMIT_FAILED, COMMIT_FAILED,
TX_NOT_FOUND, TX_NOT_FOUND,
KEY_HASH_SIZE_MISMATCH, RESTRICTED_TRANSACTION, PUT_INVALID_REQUEST, UPDATE_RETRY, ROCKSDB_LOAD_ERROR KEY_HASH_SIZE_MISMATCH, RESTRICTED_TRANSACTION, PUT_INVALID_REQUEST, UPDATE_RETRY, ROCKSDB_LOAD_ERROR,
WRITE_BATCH_1
} }
public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) { public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) {

View File

@ -20,12 +20,8 @@ import it.cavallium.rockserver.core.common.Utils;
import it.cavallium.rockserver.core.config.ConfigParser; import it.cavallium.rockserver.core.config.ConfigParser;
import it.cavallium.rockserver.core.config.ConfigPrinter; import it.cavallium.rockserver.core.config.ConfigPrinter;
import it.cavallium.rockserver.core.config.DatabaseConfig; import it.cavallium.rockserver.core.config.DatabaseConfig;
import it.cavallium.rockserver.core.impl.rocksdb.REntry; import it.cavallium.rockserver.core.impl.rocksdb.*;
import it.cavallium.rockserver.core.impl.rocksdb.RocksDBLoader;
import it.cavallium.rockserver.core.impl.rocksdb.RocksDBObjects;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions; import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions;
import it.cavallium.rockserver.core.impl.rocksdb.Tx;
import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@ -51,14 +47,8 @@ import java.util.logging.Level;
import org.cliffc.high_scale_lib.NonBlockingHashMapLong; import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.*;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Status.Code; import org.rocksdb.Status.Code;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -427,21 +417,48 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
if (keys.size() != values.size()) { if (keys.size() != values.size()) {
throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size()); throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size());
} }
List<T> responses = requestType instanceof RequestType.RequestNothing<?> ? null : new ArrayList<>(keys.size()); if (requestType instanceof RequestType.RequestNothing<?>
for (int i = 0; i < keys.size(); i++) { && !getColumn(columnId).hasBuckets()
var result = put(arena, transactionOrUpdateId, columnId, keys.get(i), values.get(i), requestType); && transactionOrUpdateId == 0L) {
if (responses != null) { ops.beginOp();
responses.add(result); try {
// Column id
var col = getColumn(columnId);
try (var wb = new WB(new WriteBatch())) {
var keyIt = keys.iterator();
var valusIt = values.iterator();
while (keyIt.hasNext()) {
var key = keyIt.next();
var value = valusIt.next();
put(arena, wb, col, 0, key, value, requestType);
wb.write(db.get());
}
}
return List.of();
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
throw ex;
} catch (Exception ex) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex);
} finally {
ops.endOp();
} }
} else {
List<T> responses = requestType instanceof RequestType.RequestNothing<?> ? null : new ArrayList<>(keys.size());
for (int i = 0; i < keys.size(); i++) {
var result = put(arena, transactionOrUpdateId, columnId, keys.get(i), values.get(i), requestType);
if (responses != null) {
responses.add(result);
}
}
return responses != null ? responses : List.of();
} }
return responses != null ? responses : List.of();
} }
/** /**
* @param txConsumer this can be called multiple times, if the optimistic transaction failed * @param txConsumer this can be called multiple times, if the optimistic transaction failed
*/ */
public <R> R wrapWithTransactionIfNeeded(@Nullable Tx tx, boolean needTransaction, public <T extends TxOrWb, R> R wrapWithTransactionIfNeeded(@Nullable T tx, boolean needTransaction,
ExFunction<@Nullable Tx, R> txConsumer) throws Exception { ExFunction<@Nullable T, R> txConsumer) throws Exception {
if (needTransaction) { if (needTransaction) {
return ensureWrapWithTransaction(tx, txConsumer); return ensureWrapWithTransaction(tx, txConsumer);
} else { } else {
@ -453,8 +470,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
/** /**
* @param txConsumer this can be called multiple times, if the optimistic transaction failed * @param txConsumer this can be called multiple times, if the optimistic transaction failed
*/ */
public <R> R ensureWrapWithTransaction(@Nullable Tx tx, public <T extends TxOrWb, R> R ensureWrapWithTransaction(@Nullable T tx,
ExFunction<@NotNull Tx, R> txConsumer) throws Exception { ExFunction<@NotNull T, R> txConsumer) throws Exception {
R result; R result;
if (tx == null) { if (tx == null) {
// Retry using a transaction: transactions are required to handle this kind of data // Retry using a transaction: transactions are required to handle this kind of data
@ -462,7 +479,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
try { try {
boolean committed; boolean committed;
do { do {
result = txConsumer.apply(newTx); //noinspection unchecked
result = txConsumer.apply((T) newTx);
committed = this.closeTransaction(newTx, true); committed = this.closeTransaction(newTx, true);
if (!committed) { if (!committed) {
Thread.yield(); Thread.yield();
@ -478,7 +496,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
} }
private <U> U put(Arena arena, private <U> U put(Arena arena,
@Nullable Tx optionalTxOrUpdate, @Nullable TxOrWb optionalTxOrUpdate,
ColumnInstance col, ColumnInstance col,
long updateId, long updateId,
@NotNull Keys keys, @NotNull Keys keys,
@ -492,49 +510,55 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
boolean needsTx = col.hasBuckets() boolean needsTx = col.hasBuckets()
|| requirePreviousValue || requirePreviousValue
|| requirePreviousPresence; || requirePreviousPresence;
if (optionalTxOrUpdate != null && optionalTxOrUpdate.isFromGetForUpdate() && (requirePreviousValue || requirePreviousPresence)) { if (optionalTxOrUpdate instanceof Tx tx && tx.isFromGetForUpdate() && (requirePreviousValue || requirePreviousPresence)) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST, throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST,
"You can't get the previous value or delta, when you are already updating that value"); "You can't get the previous value or delta, when you are already updating that value");
} }
if (updateId != 0L && optionalTxOrUpdate == null) { if (updateId != 0L && !(optionalTxOrUpdate instanceof Tx)) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST, throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST,
"Update id must be accompanied with a valid transaction"); "Update id must be accompanied with a valid transaction");
} }
if (col.hasBuckets() && (optionalTxOrUpdate instanceof WB)) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST,
"Column with buckets don't support write batches");
}
return wrapWithTransactionIfNeeded(optionalTxOrUpdate, needsTx, tx -> { return wrapWithTransactionIfNeeded(optionalTxOrUpdate, needsTx, tx -> {
MemorySegment previousValue; MemorySegment previousValue;
MemorySegment calculatedKey = col.calculateKey(arena, keys.keys()); MemorySegment calculatedKey = col.calculateKey(arena, keys.keys());
if (updateId != 0L) { if (updateId != 0L) {
assert tx != null; assert tx instanceof Tx;
tx.val().setSavePoint(); ((Tx) tx).val().setSavePoint();
} }
if (col.hasBuckets()) { if (col.hasBuckets()) {
assert tx != null; assert tx instanceof Tx;
var bucketElementKeys = col.getBucketElementKeys(keys.keys()); var bucketElementKeys = col.getBucketElementKeys(keys.keys());
try (var readOptions = new ReadOptions()) { try (var readOptions = new ReadOptions()) {
var previousRawBucketByteArray = tx.val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); var previousRawBucketByteArray = ((Tx) tx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
MemorySegment previousRawBucket = toMemorySegment(arena, previousRawBucketByteArray); MemorySegment previousRawBucket = toMemorySegment(arena, previousRawBucketByteArray);
var bucket = previousRawBucket != null ? new Bucket(col, previousRawBucket) : new Bucket(col); var bucket = previousRawBucket != null ? new Bucket(col, previousRawBucket) : new Bucket(col);
previousValue = transformResultValue(col, bucket.addElement(bucketElementKeys, value)); previousValue = transformResultValue(col, bucket.addElement(bucketElementKeys, value));
tx.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(bucket.toSegment(arena))); var k = Utils.toByteArray(calculatedKey);
var v = Utils.toByteArray(bucket.toSegment(arena));
((Tx) tx).val().put(col.cfh(), k, v);
} catch (RocksDBException e) { } catch (RocksDBException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_1, e); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_1, e);
} }
} else { } else {
if (RequestType.requiresGettingPreviousValue(callback)) { if (RequestType.requiresGettingPreviousValue(callback)) {
assert tx != null; assert tx instanceof Tx;
try (var readOptions = new ReadOptions()) { try (var readOptions = new ReadOptions()) {
byte[] previousValueByteArray; byte[] previousValueByteArray;
previousValueByteArray = tx.val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousValueByteArray = ((Tx) tx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
previousValue = transformResultValue(col, toMemorySegment(arena, previousValueByteArray)); previousValue = transformResultValue(col, toMemorySegment(arena, previousValueByteArray));
} catch (RocksDBException e) { } catch (RocksDBException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e);
} }
} else if (RequestType.requiresGettingPreviousPresence(callback)) { } else if (RequestType.requiresGettingPreviousPresence(callback)) {
// todo: in the future this should be replaced with just keyExists // todo: in the future this should be replaced with just keyExists
assert tx != null; assert tx instanceof Tx;
try (var readOptions = new ReadOptions()) { try (var readOptions = new ReadOptions()) {
byte[] previousValueByteArray; byte[] previousValueByteArray;
previousValueByteArray = tx.val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousValueByteArray = ((Tx) tx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true);
previousValue = previousValueByteArray != null ? MemorySegment.NULL : null; previousValue = previousValueByteArray != null ? MemorySegment.NULL : null;
} catch (RocksDBException e) { } catch (RocksDBException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e); throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e);
@ -542,13 +566,15 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
} else { } else {
previousValue = null; previousValue = null;
} }
if (tx != null) { switch (tx) {
tx.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); case WB wb -> wb.wb().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value));
} else { case Tx t -> t.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value));
try (var w = new WriteOptions()) { case null -> {
var keyBB = calculatedKey.asByteBuffer(); try (var w = new WriteOptions()) {
ByteBuffer valueBB = (col.schema().hasValue() ? value : Utils.dummyEmptyValue()).asByteBuffer(); var keyBB = calculatedKey.asByteBuffer();
db.get().put(col.cfh(), w, keyBB, valueBB); ByteBuffer valueBB = (col.schema().hasValue() ? value : Utils.dummyEmptyValue()).asByteBuffer();
db.get().put(col.cfh(), w, keyBB, valueBB);
}
} }
} }
} }
@ -562,8 +588,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
if (updateId != 0L) { if (updateId != 0L) {
if (!closeTransaction(updateId, true)) { if (!closeTransaction(updateId, true)) {
tx.val().rollbackToSavePoint(); ((Tx) tx).val().rollbackToSavePoint();
tx.val().undoGetForUpdate(col.cfh(), Utils.toByteArray(calculatedKey)); ((Tx) tx).val().undoGetForUpdate(col.cfh(), Utils.toByteArray(calculatedKey));
throw new RocksDBRetryException(); throw new RocksDBRetryException();
} }
} }

View File

@ -5,7 +5,7 @@ import org.rocksdb.AbstractNativeReference;
import org.rocksdb.Transaction; import org.rocksdb.Transaction;
public record Tx(Transaction val, boolean isFromGetForUpdate, RocksDBObjects objs) public record Tx(Transaction val, boolean isFromGetForUpdate, RocksDBObjects objs)
implements Closeable { implements Closeable, TxOrWb {
@Override @Override
public void close() { public void close() {

View File

@ -0,0 +1,4 @@
package it.cavallium.rockserver.core.impl.rocksdb;
public sealed interface TxOrWb permits Tx, WB {
}

View File

@ -0,0 +1,25 @@
package it.cavallium.rockserver.core.impl.rocksdb;
import it.cavallium.rockserver.core.common.RocksDBException;
import org.jetbrains.annotations.NotNull;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import java.io.Closeable;
import java.io.IOException;
public record WB(@NotNull WriteBatch wb) implements Closeable, TxOrWb {
@Override
public void close() {
wb.close();
}
public void write(RocksDB rocksDB) throws RocksDBException {
try (var w = new WriteOptions()) {
rocksDB.write(w, wb);
} catch (org.rocksdb.RocksDBException e) {
throw RocksDBException.of(RocksDBException.RocksDBErrorType.WRITE_BATCH_1, e);
}
}
}

View File

@ -15,6 +15,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import it.cavallium.rockserver.core.client.RocksDBConnection; import it.cavallium.rockserver.core.client.RocksDBConnection;
import it.cavallium.rockserver.core.common.*; import it.cavallium.rockserver.core.common.*;
import it.cavallium.rockserver.core.common.ColumnHashType;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.RequestType.RequestChanged; import it.cavallium.rockserver.core.common.RequestType.RequestChanged;
import it.cavallium.rockserver.core.common.RequestType.RequestCurrent; import it.cavallium.rockserver.core.common.RequestType.RequestCurrent;
import it.cavallium.rockserver.core.common.RequestType.RequestDelta; import it.cavallium.rockserver.core.common.RequestType.RequestDelta;
@ -24,33 +26,9 @@ import it.cavallium.rockserver.core.common.RequestType.RequestMulti;
import it.cavallium.rockserver.core.common.RequestType.RequestNothing; import it.cavallium.rockserver.core.common.RequestType.RequestNothing;
import it.cavallium.rockserver.core.common.RequestType.RequestPrevious; import it.cavallium.rockserver.core.common.RequestType.RequestPrevious;
import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence; import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence;
import it.cavallium.rockserver.core.common.api.proto.Changed; import it.cavallium.rockserver.core.common.api.proto.*;
import it.cavallium.rockserver.core.common.api.proto.CloseFailedUpdateRequest;
import it.cavallium.rockserver.core.common.api.proto.CloseIteratorRequest;
import it.cavallium.rockserver.core.common.api.proto.CloseTransactionRequest;
import it.cavallium.rockserver.core.common.api.proto.CloseTransactionResponse;
import it.cavallium.rockserver.core.common.api.proto.CreateColumnRequest;
import it.cavallium.rockserver.core.common.api.proto.CreateColumnResponse;
import it.cavallium.rockserver.core.common.api.proto.DeleteColumnRequest;
import it.cavallium.rockserver.core.common.api.proto.Delta; import it.cavallium.rockserver.core.common.api.proto.Delta;
import it.cavallium.rockserver.core.common.api.proto.GetColumnIdRequest;
import it.cavallium.rockserver.core.common.api.proto.GetColumnIdResponse;
import it.cavallium.rockserver.core.common.api.proto.GetRequest;
import it.cavallium.rockserver.core.common.api.proto.GetResponse;
import it.cavallium.rockserver.core.common.api.proto.KV;
import it.cavallium.rockserver.core.common.api.proto.OpenIteratorRequest;
import it.cavallium.rockserver.core.common.api.proto.OpenIteratorResponse;
import it.cavallium.rockserver.core.common.api.proto.OpenTransactionRequest;
import it.cavallium.rockserver.core.common.api.proto.OpenTransactionResponse;
import it.cavallium.rockserver.core.common.api.proto.Previous;
import it.cavallium.rockserver.core.common.api.proto.PreviousPresence;
import it.cavallium.rockserver.core.common.api.proto.PutMultiInitialRequest;
import it.cavallium.rockserver.core.common.api.proto.PutMultiRequest;
import it.cavallium.rockserver.core.common.api.proto.PutRequest;
import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceImplBase; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceImplBase;
import it.cavallium.rockserver.core.common.api.proto.SeekToRequest;
import it.cavallium.rockserver.core.common.api.proto.SubsequentRequest;
import it.cavallium.rockserver.core.common.api.proto.UpdateBegin;
import it.unimi.dsi.fastutil.ints.Int2IntFunction; import it.unimi.dsi.fastutil.ints.Int2IntFunction;
import it.unimi.dsi.fastutil.ints.Int2ObjectFunction; import it.unimi.dsi.fastutil.ints.Int2ObjectFunction;
import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntArrayList;
@ -61,6 +39,8 @@ import java.io.IOException;
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment; import java.lang.foreign.MemorySegment;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -147,6 +127,7 @@ public class GrpcServer extends Server {
executor.execute(() -> { executor.execute(() -> {
try { try {
api.closeFailedUpdate(request.getUpdateId()); api.closeFailedUpdate(request.getUpdateId());
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); responseObserver.onError(ex);
@ -168,6 +149,7 @@ public class GrpcServer extends Server {
public void deleteColumn(DeleteColumnRequest request, StreamObserver<Empty> responseObserver) { public void deleteColumn(DeleteColumnRequest request, StreamObserver<Empty> responseObserver) {
executor.execute(() -> { executor.execute(() -> {
api.deleteColumn(request.getColumnId()); api.deleteColumn(request.getColumnId());
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
}); });
} }
@ -199,6 +181,7 @@ public class GrpcServer extends Server {
new RequestNothing<>() new RequestNothing<>()
); );
} }
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); responseObserver.onError(ex);
@ -206,6 +189,27 @@ public class GrpcServer extends Server {
}); });
} }
@Override
public void putBatch(PutBatchRequest request, StreamObserver<Empty> responseObserver) {
executor.execute(() -> {
try {
try (var arena = Arena.ofConfined()) {
api.putMulti(arena,
request.getTransactionOrUpdateId(),
request.getColumnId(),
mapKeysKV(arena, request.getDataCount(), request::getData),
mapValuesKV(arena, request.getDataCount(), request::getData),
new RequestNothing<>()
);
}
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
} catch (Throwable ex) {
responseObserver.onError(ex);
}
});
}
@Override @Override
public StreamObserver<PutMultiRequest> putMulti(StreamObserver<Empty> responseObserver) { public StreamObserver<PutMultiRequest> putMulti(StreamObserver<Empty> responseObserver) {
return new StreamObserver<>() { return new StreamObserver<>() {
@ -248,6 +252,7 @@ public class GrpcServer extends Server {
var newProcessedRequestCount = processedRequestsCount.incrementAndGet(); var newProcessedRequestCount = processedRequestsCount.incrementAndGet();
if (requestsCountFinalized) { if (requestsCountFinalized) {
if (newProcessedRequestCount == requestsCount) { if (newProcessedRequestCount == requestsCount) {
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
} }
} }
@ -471,6 +476,7 @@ public class GrpcServer extends Server {
executor.execute(() -> { executor.execute(() -> {
try { try {
api.closeIterator(request.getIteratorId()); api.closeIterator(request.getIteratorId());
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); responseObserver.onError(ex);
@ -485,6 +491,7 @@ public class GrpcServer extends Server {
try (var arena = Arena.ofConfined()) { try (var arena = Arena.ofConfined()) {
api.seekTo(arena, request.getIterationId(), mapKeys(arena, request.getKeysCount(), request::getKeys)); api.seekTo(arena, request.getIterationId(), mapKeys(arena, request.getKeysCount(), request::getKeys));
} }
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); responseObserver.onError(ex);
@ -502,6 +509,7 @@ public class GrpcServer extends Server {
request.getTakeCount(), request.getTakeCount(),
new RequestNothing<>()); new RequestNothing<>());
} }
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted(); responseObserver.onCompleted();
} catch (Throwable ex) { } catch (Throwable ex) {
responseObserver.onError(ex); responseObserver.onError(ex);
@ -608,6 +616,23 @@ public class GrpcServer extends Server {
} }
return new Keys(segments); return new Keys(segments);
} }
private static List<Keys> mapKeysKV(Arena arena, int count, Int2ObjectFunction<KV> keyGetterAt) {
var keys = new ArrayList<Keys>(count);
for (int i = 0; i < count; i++) {
var k = keyGetterAt.apply(i);
keys.add(mapKeys(arena, k.getKeysCount(), k::getKeys));
}
return keys;
}
private static List<MemorySegment> mapValuesKV(Arena arena, int count, Int2ObjectFunction<KV> keyGetterAt) {
var keys = new ArrayList<MemorySegment>(count);
for (int i = 0; i < count; i++) {
keys.add(toMemorySegment(arena, keyGetterAt.get(i).getValue()));
}
return keys;
}
} }
@Override @Override

View File

@ -65,6 +65,8 @@ message GetColumnIdResponse {int64 columnId = 1;}
message PutRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; KV data = 3;} message PutRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; KV data = 3;}
message PutBatchRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; repeated KV data = 3;}
message PutMultiInitialRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2;} message PutMultiInitialRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2;}
message PutMultiRequest {oneof putMultiRequestType {PutMultiInitialRequest initialRequest = 1;KV data = 2;}} message PutMultiRequest {oneof putMultiRequestType {PutMultiInitialRequest initialRequest = 1;KV data = 2;}}
@ -88,6 +90,7 @@ service RocksDBService {
rpc deleteColumn(DeleteColumnRequest) returns (google.protobuf.Empty); rpc deleteColumn(DeleteColumnRequest) returns (google.protobuf.Empty);
rpc getColumnId(GetColumnIdRequest) returns (GetColumnIdResponse); rpc getColumnId(GetColumnIdRequest) returns (GetColumnIdResponse);
rpc put(PutRequest) returns (google.protobuf.Empty); rpc put(PutRequest) returns (google.protobuf.Empty);
rpc putBatch(PutBatchRequest) returns (google.protobuf.Empty);
rpc putMulti(stream PutMultiRequest) returns (google.protobuf.Empty); rpc putMulti(stream PutMultiRequest) returns (google.protobuf.Empty);
rpc putGetPrevious(PutRequest) returns (Previous); rpc putGetPrevious(PutRequest) returns (Previous);
rpc putMultiGetPrevious(stream PutMultiRequest) returns (stream Previous); rpc putMultiGetPrevious(stream PutMultiRequest) returns (stream Previous);