From 3594cb6b76972c574db6194a07da3ce6a3a107c0 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 20 Sep 2024 15:18:45 +0200 Subject: [PATCH] Add write batch --- .../core/client/GrpcConnection.java | 55 +++------ .../core/common/RocksDBException.java | 3 +- .../rockserver/core/impl/EmbeddedDB.java | 114 +++++++++++------- .../rockserver/core/impl/rocksdb/Tx.java | 2 +- .../rockserver/core/impl/rocksdb/TxOrWb.java | 4 + .../rockserver/core/impl/rocksdb/WB.java | 25 ++++ .../rockserver/core/server/GrpcServer.java | 75 ++++++++---- src/main/proto/rocksdb.proto | 3 + 8 files changed, 175 insertions(+), 106 deletions(-) create mode 100644 src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TxOrWb.java create mode 100644 src/main/java/it/cavallium/rockserver/core/impl/rocksdb/WB.java diff --git a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java index 04fcf73..fc6cafc 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/GrpcConnection.java @@ -33,36 +33,10 @@ import it.cavallium.rockserver.core.common.RocksDBException; import it.cavallium.rockserver.core.common.RocksDBSyncAPI; import it.cavallium.rockserver.core.common.UpdateContext; import it.cavallium.rockserver.core.common.Utils.HostAndPort; -import it.cavallium.rockserver.core.common.api.proto.Changed; -import it.cavallium.rockserver.core.common.api.proto.CloseFailedUpdateRequest; -import it.cavallium.rockserver.core.common.api.proto.CloseIteratorRequest; -import it.cavallium.rockserver.core.common.api.proto.CloseTransactionRequest; -import it.cavallium.rockserver.core.common.api.proto.CloseTransactionResponse; -import it.cavallium.rockserver.core.common.api.proto.ColumnHashType; -import it.cavallium.rockserver.core.common.api.proto.CreateColumnRequest; -import it.cavallium.rockserver.core.common.api.proto.CreateColumnResponse; -import it.cavallium.rockserver.core.common.api.proto.DeleteColumnRequest; -import it.cavallium.rockserver.core.common.api.proto.Delta; -import it.cavallium.rockserver.core.common.api.proto.GetColumnIdRequest; -import it.cavallium.rockserver.core.common.api.proto.GetColumnIdResponse; -import it.cavallium.rockserver.core.common.api.proto.GetRequest; -import it.cavallium.rockserver.core.common.api.proto.GetResponse; -import it.cavallium.rockserver.core.common.api.proto.KV; -import it.cavallium.rockserver.core.common.api.proto.OpenIteratorRequest; -import it.cavallium.rockserver.core.common.api.proto.OpenIteratorResponse; -import it.cavallium.rockserver.core.common.api.proto.OpenTransactionRequest; -import it.cavallium.rockserver.core.common.api.proto.OpenTransactionResponse; -import it.cavallium.rockserver.core.common.api.proto.Previous; -import it.cavallium.rockserver.core.common.api.proto.PreviousPresence; -import it.cavallium.rockserver.core.common.api.proto.PutMultiInitialRequest; -import it.cavallium.rockserver.core.common.api.proto.PutMultiRequest; -import it.cavallium.rockserver.core.common.api.proto.PutRequest; -import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc; +import it.cavallium.rockserver.core.common.api.proto.*; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceBlockingStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceFutureStub; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceStub; -import it.cavallium.rockserver.core.common.api.proto.SeekToRequest; -import it.cavallium.rockserver.core.common.api.proto.SubsequentRequest; import it.unimi.dsi.fastutil.ints.IntArrayList; import java.io.IOException; import java.lang.foreign.Arena; @@ -214,6 +188,25 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { + count + " != " + allValues.size()); } + CompletableFuture> responseObserver; + + if (requestType instanceof RequestType.RequestNothing) { + var putBatchRequestBuilder = PutBatchRequest.newBuilder() + .setTransactionOrUpdateId(transactionOrUpdateId) + .setColumnId(columnId); + + var it1 = allKeys.iterator(); + var it2 = allValues.iterator(); + + while (it1.hasNext()) { + var k = it1.next(); + var v = it2.next(); + putBatchRequestBuilder.addData(mapKV(k, v)); + } + + return toResponse(futureStub.putBatch(putBatchRequestBuilder.build()), _ -> null); + } + var initialRequest = PutMultiRequest.newBuilder() .setInitialRequest(PutMultiInitialRequest.newBuilder() .setTransactionOrUpdateId(transactionOrUpdateId) @@ -221,15 +214,7 @@ public class GrpcConnection extends BaseConnection implements RocksDBAPI { .build()) .build(); - CompletableFuture> responseObserver; - StreamObserver requestPublisher = switch (requestType) { - case RequestNothing _ -> { - var thisResponseObserver = new CollectListStreamObserver(0); - //noinspection unchecked - responseObserver = (CompletableFuture>) (CompletableFuture) thisResponseObserver; - yield this.asyncStub.putMulti(thisResponseObserver); - } case RequestPrevious _ -> { var thisResponseObserver = new CollectListMappedStreamObserver( GrpcConnection::mapPrevious, count); diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java index 937ebf6..7fe0e4c 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java @@ -25,7 +25,8 @@ public class RocksDBException extends RuntimeException { COMMIT_FAILED_TRY_AGAIN, COMMIT_FAILED, TX_NOT_FOUND, - KEY_HASH_SIZE_MISMATCH, RESTRICTED_TRANSACTION, PUT_INVALID_REQUEST, UPDATE_RETRY, ROCKSDB_LOAD_ERROR + KEY_HASH_SIZE_MISMATCH, RESTRICTED_TRANSACTION, PUT_INVALID_REQUEST, UPDATE_RETRY, ROCKSDB_LOAD_ERROR, + WRITE_BATCH_1 } public static RocksDBException of(RocksDBErrorType errorUniqueId, String message) { diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java index c2c008a..348dd00 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -20,12 +20,8 @@ import it.cavallium.rockserver.core.common.Utils; import it.cavallium.rockserver.core.config.ConfigParser; import it.cavallium.rockserver.core.config.ConfigPrinter; import it.cavallium.rockserver.core.config.DatabaseConfig; -import it.cavallium.rockserver.core.impl.rocksdb.REntry; -import it.cavallium.rockserver.core.impl.rocksdb.RocksDBLoader; -import it.cavallium.rockserver.core.impl.rocksdb.RocksDBObjects; -import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB; +import it.cavallium.rockserver.core.impl.rocksdb.*; import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions; -import it.cavallium.rockserver.core.impl.rocksdb.Tx; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.io.ByteArrayInputStream; @@ -51,14 +47,8 @@ import java.util.logging.Level; import org.cliffc.high_scale_lib.NonBlockingHashMapLong; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.rocksdb.ColumnFamilyDescriptor; -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.ReadOptions; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; +import org.rocksdb.*; import org.rocksdb.Status.Code; -import org.rocksdb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -427,21 +417,48 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { if (keys.size() != values.size()) { throw new IllegalArgumentException("keys length is different than values length: " + keys.size() + " != " + values.size()); } - List responses = requestType instanceof RequestType.RequestNothing ? null : new ArrayList<>(keys.size()); - for (int i = 0; i < keys.size(); i++) { - var result = put(arena, transactionOrUpdateId, columnId, keys.get(i), values.get(i), requestType); - if (responses != null) { - responses.add(result); + if (requestType instanceof RequestType.RequestNothing + && !getColumn(columnId).hasBuckets() + && transactionOrUpdateId == 0L) { + ops.beginOp(); + try { + // Column id + var col = getColumn(columnId); + try (var wb = new WB(new WriteBatch())) { + var keyIt = keys.iterator(); + var valusIt = values.iterator(); + while (keyIt.hasNext()) { + var key = keyIt.next(); + var value = valusIt.next(); + put(arena, wb, col, 0, key, value, requestType); + wb.write(db.get()); + } + } + return List.of(); + } catch (it.cavallium.rockserver.core.common.RocksDBException ex) { + throw ex; + } catch (Exception ex) { + throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_UNKNOWN_ERROR, ex); + } finally { + ops.endOp(); } + } else { + List responses = requestType instanceof RequestType.RequestNothing ? null : new ArrayList<>(keys.size()); + for (int i = 0; i < keys.size(); i++) { + var result = put(arena, transactionOrUpdateId, columnId, keys.get(i), values.get(i), requestType); + if (responses != null) { + responses.add(result); + } + } + return responses != null ? responses : List.of(); } - return responses != null ? responses : List.of(); } /** * @param txConsumer this can be called multiple times, if the optimistic transaction failed */ - public R wrapWithTransactionIfNeeded(@Nullable Tx tx, boolean needTransaction, - ExFunction<@Nullable Tx, R> txConsumer) throws Exception { + public R wrapWithTransactionIfNeeded(@Nullable T tx, boolean needTransaction, + ExFunction<@Nullable T, R> txConsumer) throws Exception { if (needTransaction) { return ensureWrapWithTransaction(tx, txConsumer); } else { @@ -453,8 +470,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { /** * @param txConsumer this can be called multiple times, if the optimistic transaction failed */ - public R ensureWrapWithTransaction(@Nullable Tx tx, - ExFunction<@NotNull Tx, R> txConsumer) throws Exception { + public R ensureWrapWithTransaction(@Nullable T tx, + ExFunction<@NotNull T, R> txConsumer) throws Exception { R result; if (tx == null) { // Retry using a transaction: transactions are required to handle this kind of data @@ -462,7 +479,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { try { boolean committed; do { - result = txConsumer.apply(newTx); + //noinspection unchecked + result = txConsumer.apply((T) newTx); committed = this.closeTransaction(newTx, true); if (!committed) { Thread.yield(); @@ -478,7 +496,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } private U put(Arena arena, - @Nullable Tx optionalTxOrUpdate, + @Nullable TxOrWb optionalTxOrUpdate, ColumnInstance col, long updateId, @NotNull Keys keys, @@ -492,49 +510,55 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { boolean needsTx = col.hasBuckets() || requirePreviousValue || requirePreviousPresence; - if (optionalTxOrUpdate != null && optionalTxOrUpdate.isFromGetForUpdate() && (requirePreviousValue || requirePreviousPresence)) { + if (optionalTxOrUpdate instanceof Tx tx && tx.isFromGetForUpdate() && (requirePreviousValue || requirePreviousPresence)) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST, "You can't get the previous value or delta, when you are already updating that value"); } - if (updateId != 0L && optionalTxOrUpdate == null) { + if (updateId != 0L && !(optionalTxOrUpdate instanceof Tx)) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST, "Update id must be accompanied with a valid transaction"); } + if (col.hasBuckets() && (optionalTxOrUpdate instanceof WB)) { + throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_INVALID_REQUEST, + "Column with buckets don't support write batches"); + } return wrapWithTransactionIfNeeded(optionalTxOrUpdate, needsTx, tx -> { MemorySegment previousValue; MemorySegment calculatedKey = col.calculateKey(arena, keys.keys()); if (updateId != 0L) { - assert tx != null; - tx.val().setSavePoint(); + assert tx instanceof Tx; + ((Tx) tx).val().setSavePoint(); } if (col.hasBuckets()) { - assert tx != null; + assert tx instanceof Tx; var bucketElementKeys = col.getBucketElementKeys(keys.keys()); try (var readOptions = new ReadOptions()) { - var previousRawBucketByteArray = tx.val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); + var previousRawBucketByteArray = ((Tx) tx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); MemorySegment previousRawBucket = toMemorySegment(arena, previousRawBucketByteArray); var bucket = previousRawBucket != null ? new Bucket(col, previousRawBucket) : new Bucket(col); previousValue = transformResultValue(col, bucket.addElement(bucketElementKeys, value)); - tx.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(bucket.toSegment(arena))); + var k = Utils.toByteArray(calculatedKey); + var v = Utils.toByteArray(bucket.toSegment(arena)); + ((Tx) tx).val().put(col.cfh(), k, v); } catch (RocksDBException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_1, e); } } else { if (RequestType.requiresGettingPreviousValue(callback)) { - assert tx != null; + assert tx instanceof Tx; try (var readOptions = new ReadOptions()) { byte[] previousValueByteArray; - previousValueByteArray = tx.val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); + previousValueByteArray = ((Tx) tx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousValue = transformResultValue(col, toMemorySegment(arena, previousValueByteArray)); } catch (RocksDBException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e); } } else if (RequestType.requiresGettingPreviousPresence(callback)) { // todo: in the future this should be replaced with just keyExists - assert tx != null; + assert tx instanceof Tx; try (var readOptions = new ReadOptions()) { byte[] previousValueByteArray; - previousValueByteArray = tx.val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); + previousValueByteArray = ((Tx) tx).val().getForUpdate(readOptions, col.cfh(), calculatedKey.toArray(BIG_ENDIAN_BYTES), true); previousValue = previousValueByteArray != null ? MemorySegment.NULL : null; } catch (RocksDBException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.PUT_2, e); @@ -542,13 +566,15 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } else { previousValue = null; } - if (tx != null) { - tx.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); - } else { - try (var w = new WriteOptions()) { - var keyBB = calculatedKey.asByteBuffer(); - ByteBuffer valueBB = (col.schema().hasValue() ? value : Utils.dummyEmptyValue()).asByteBuffer(); - db.get().put(col.cfh(), w, keyBB, valueBB); + switch (tx) { + case WB wb -> wb.wb().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); + case Tx t -> t.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(value)); + case null -> { + try (var w = new WriteOptions()) { + var keyBB = calculatedKey.asByteBuffer(); + ByteBuffer valueBB = (col.schema().hasValue() ? value : Utils.dummyEmptyValue()).asByteBuffer(); + db.get().put(col.cfh(), w, keyBB, valueBB); + } } } } @@ -562,8 +588,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { if (updateId != 0L) { if (!closeTransaction(updateId, true)) { - tx.val().rollbackToSavePoint(); - tx.val().undoGetForUpdate(col.cfh(), Utils.toByteArray(calculatedKey)); + ((Tx) tx).val().rollbackToSavePoint(); + ((Tx) tx).val().undoGetForUpdate(col.cfh(), Utils.toByteArray(calculatedKey)); throw new RocksDBRetryException(); } } diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/Tx.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/Tx.java index 20027b2..b1b54b3 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/Tx.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/Tx.java @@ -5,7 +5,7 @@ import org.rocksdb.AbstractNativeReference; import org.rocksdb.Transaction; public record Tx(Transaction val, boolean isFromGetForUpdate, RocksDBObjects objs) - implements Closeable { + implements Closeable, TxOrWb { @Override public void close() { diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TxOrWb.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TxOrWb.java new file mode 100644 index 0000000..3bc5c41 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TxOrWb.java @@ -0,0 +1,4 @@ +package it.cavallium.rockserver.core.impl.rocksdb; + +public sealed interface TxOrWb permits Tx, WB { +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/WB.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/WB.java new file mode 100644 index 0000000..3a6cbfc --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/WB.java @@ -0,0 +1,25 @@ +package it.cavallium.rockserver.core.impl.rocksdb; + +import it.cavallium.rockserver.core.common.RocksDBException; +import org.jetbrains.annotations.NotNull; +import org.rocksdb.RocksDB; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import java.io.Closeable; +import java.io.IOException; + +public record WB(@NotNull WriteBatch wb) implements Closeable, TxOrWb { + @Override + public void close() { + wb.close(); + } + + public void write(RocksDB rocksDB) throws RocksDBException { + try (var w = new WriteOptions()) { + rocksDB.write(w, wb); + } catch (org.rocksdb.RocksDBException e) { + throw RocksDBException.of(RocksDBException.RocksDBErrorType.WRITE_BATCH_1, e); + } + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java index f4677d3..44229f0 100644 --- a/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java +++ b/src/main/java/it/cavallium/rockserver/core/server/GrpcServer.java @@ -15,6 +15,8 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import it.cavallium.rockserver.core.client.RocksDBConnection; import it.cavallium.rockserver.core.common.*; +import it.cavallium.rockserver.core.common.ColumnHashType; +import it.cavallium.rockserver.core.common.ColumnSchema; import it.cavallium.rockserver.core.common.RequestType.RequestChanged; import it.cavallium.rockserver.core.common.RequestType.RequestCurrent; import it.cavallium.rockserver.core.common.RequestType.RequestDelta; @@ -24,33 +26,9 @@ import it.cavallium.rockserver.core.common.RequestType.RequestMulti; import it.cavallium.rockserver.core.common.RequestType.RequestNothing; import it.cavallium.rockserver.core.common.RequestType.RequestPrevious; import it.cavallium.rockserver.core.common.RequestType.RequestPreviousPresence; -import it.cavallium.rockserver.core.common.api.proto.Changed; -import it.cavallium.rockserver.core.common.api.proto.CloseFailedUpdateRequest; -import it.cavallium.rockserver.core.common.api.proto.CloseIteratorRequest; -import it.cavallium.rockserver.core.common.api.proto.CloseTransactionRequest; -import it.cavallium.rockserver.core.common.api.proto.CloseTransactionResponse; -import it.cavallium.rockserver.core.common.api.proto.CreateColumnRequest; -import it.cavallium.rockserver.core.common.api.proto.CreateColumnResponse; -import it.cavallium.rockserver.core.common.api.proto.DeleteColumnRequest; +import it.cavallium.rockserver.core.common.api.proto.*; import it.cavallium.rockserver.core.common.api.proto.Delta; -import it.cavallium.rockserver.core.common.api.proto.GetColumnIdRequest; -import it.cavallium.rockserver.core.common.api.proto.GetColumnIdResponse; -import it.cavallium.rockserver.core.common.api.proto.GetRequest; -import it.cavallium.rockserver.core.common.api.proto.GetResponse; -import it.cavallium.rockserver.core.common.api.proto.KV; -import it.cavallium.rockserver.core.common.api.proto.OpenIteratorRequest; -import it.cavallium.rockserver.core.common.api.proto.OpenIteratorResponse; -import it.cavallium.rockserver.core.common.api.proto.OpenTransactionRequest; -import it.cavallium.rockserver.core.common.api.proto.OpenTransactionResponse; -import it.cavallium.rockserver.core.common.api.proto.Previous; -import it.cavallium.rockserver.core.common.api.proto.PreviousPresence; -import it.cavallium.rockserver.core.common.api.proto.PutMultiInitialRequest; -import it.cavallium.rockserver.core.common.api.proto.PutMultiRequest; -import it.cavallium.rockserver.core.common.api.proto.PutRequest; import it.cavallium.rockserver.core.common.api.proto.RocksDBServiceGrpc.RocksDBServiceImplBase; -import it.cavallium.rockserver.core.common.api.proto.SeekToRequest; -import it.cavallium.rockserver.core.common.api.proto.SubsequentRequest; -import it.cavallium.rockserver.core.common.api.proto.UpdateBegin; import it.unimi.dsi.fastutil.ints.Int2IntFunction; import it.unimi.dsi.fastutil.ints.Int2ObjectFunction; import it.unimi.dsi.fastutil.ints.IntArrayList; @@ -61,6 +39,8 @@ import java.io.IOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; @@ -147,6 +127,7 @@ public class GrpcServer extends Server { executor.execute(() -> { try { api.closeFailedUpdate(request.getUpdateId()); + responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); } catch (Throwable ex) { responseObserver.onError(ex); @@ -168,6 +149,7 @@ public class GrpcServer extends Server { public void deleteColumn(DeleteColumnRequest request, StreamObserver responseObserver) { executor.execute(() -> { api.deleteColumn(request.getColumnId()); + responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); }); } @@ -199,6 +181,7 @@ public class GrpcServer extends Server { new RequestNothing<>() ); } + responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); } catch (Throwable ex) { responseObserver.onError(ex); @@ -206,6 +189,27 @@ public class GrpcServer extends Server { }); } + @Override + public void putBatch(PutBatchRequest request, StreamObserver responseObserver) { + executor.execute(() -> { + try { + try (var arena = Arena.ofConfined()) { + api.putMulti(arena, + request.getTransactionOrUpdateId(), + request.getColumnId(), + mapKeysKV(arena, request.getDataCount(), request::getData), + mapValuesKV(arena, request.getDataCount(), request::getData), + new RequestNothing<>() + ); + } + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } catch (Throwable ex) { + responseObserver.onError(ex); + } + }); + } + @Override public StreamObserver putMulti(StreamObserver responseObserver) { return new StreamObserver<>() { @@ -248,6 +252,7 @@ public class GrpcServer extends Server { var newProcessedRequestCount = processedRequestsCount.incrementAndGet(); if (requestsCountFinalized) { if (newProcessedRequestCount == requestsCount) { + responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); } } @@ -471,6 +476,7 @@ public class GrpcServer extends Server { executor.execute(() -> { try { api.closeIterator(request.getIteratorId()); + responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); } catch (Throwable ex) { responseObserver.onError(ex); @@ -485,6 +491,7 @@ public class GrpcServer extends Server { try (var arena = Arena.ofConfined()) { api.seekTo(arena, request.getIterationId(), mapKeys(arena, request.getKeysCount(), request::getKeys)); } + responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); } catch (Throwable ex) { responseObserver.onError(ex); @@ -502,6 +509,7 @@ public class GrpcServer extends Server { request.getTakeCount(), new RequestNothing<>()); } + responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted(); } catch (Throwable ex) { responseObserver.onError(ex); @@ -608,6 +616,23 @@ public class GrpcServer extends Server { } return new Keys(segments); } + + private static List mapKeysKV(Arena arena, int count, Int2ObjectFunction keyGetterAt) { + var keys = new ArrayList(count); + for (int i = 0; i < count; i++) { + var k = keyGetterAt.apply(i); + keys.add(mapKeys(arena, k.getKeysCount(), k::getKeys)); + } + return keys; + } + + private static List mapValuesKV(Arena arena, int count, Int2ObjectFunction keyGetterAt) { + var keys = new ArrayList(count); + for (int i = 0; i < count; i++) { + keys.add(toMemorySegment(arena, keyGetterAt.get(i).getValue())); + } + return keys; + } } @Override diff --git a/src/main/proto/rocksdb.proto b/src/main/proto/rocksdb.proto index 8ec8830..41a0ab8 100644 --- a/src/main/proto/rocksdb.proto +++ b/src/main/proto/rocksdb.proto @@ -65,6 +65,8 @@ message GetColumnIdResponse {int64 columnId = 1;} message PutRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; KV data = 3;} +message PutBatchRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2; repeated KV data = 3;} + message PutMultiInitialRequest {int64 transactionOrUpdateId = 1; int64 columnId = 2;} message PutMultiRequest {oneof putMultiRequestType {PutMultiInitialRequest initialRequest = 1;KV data = 2;}} @@ -88,6 +90,7 @@ service RocksDBService { rpc deleteColumn(DeleteColumnRequest) returns (google.protobuf.Empty); rpc getColumnId(GetColumnIdRequest) returns (GetColumnIdResponse); rpc put(PutRequest) returns (google.protobuf.Empty); + rpc putBatch(PutBatchRequest) returns (google.protobuf.Empty); rpc putMulti(stream PutMultiRequest) returns (google.protobuf.Empty); rpc putGetPrevious(PutRequest) returns (Previous); rpc putMultiGetPrevious(stream PutMultiRequest) returns (stream Previous);