From b4a26efe5b5fbb54020414efddcee70b8df53c10 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 2 Jul 2022 12:28:59 +0200 Subject: [PATCH] Handle discards --- .../dbengine/database/server/QuicRPCServer.java | 12 ++++++++---- .../database/server/LLQuicConnectionTest.java | 3 ++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/server/QuicRPCServer.java b/src/main/java/it/cavallium/dbengine/database/server/QuicRPCServer.java index 3fd7863..8aaefb7 100644 --- a/src/main/java/it/cavallium/dbengine/database/server/QuicRPCServer.java +++ b/src/main/java/it/cavallium/dbengine/database/server/QuicRPCServer.java @@ -12,6 +12,7 @@ import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLSingleton; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.BinarySerializationFunction; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import it.cavallium.dbengine.database.remote.QuicUtils; @@ -174,9 +175,9 @@ public class QuicRPCServer { .maxStreamsUnidirectional(100) ); QuicRPCServer server = new QuicRPCServer(rocksDBManager, localDb, qs); - server.bind().block(); - server.onDispose().block(); - localDb.disconnect().block(); + server.bind().transform(LLUtils::handleDiscard).block(); + server.onDispose().transform(LLUtils::handleDiscard).block(); + localDb.disconnect().transform(LLUtils::handleDiscard).block(); rocksDBManager.closeAll(); } @@ -193,7 +194,10 @@ public class QuicRPCServer { prev != null ? toByteList(prev) : ByteList.of() )) .orThrow(); - SingletonUpdateEnd newValue = (SingletonUpdateEnd) otherRequests.singleOrEmpty().block(); + SingletonUpdateEnd newValue = (SingletonUpdateEnd) otherRequests + .singleOrEmpty() + .transform(LLUtils::handleDiscard) + .block(); Objects.requireNonNull(newValue); if (!newValue.exist()) { return null; diff --git a/src/test/java/it/cavallium/dbengine/database/server/LLQuicConnectionTest.java b/src/test/java/it/cavallium/dbengine/database/server/LLQuicConnectionTest.java index a4d4f25..de2e3ae 100644 --- a/src/test/java/it/cavallium/dbengine/database/server/LLQuicConnectionTest.java +++ b/src/test/java/it/cavallium/dbengine/database/server/LLQuicConnectionTest.java @@ -14,6 +14,7 @@ import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerSimilarities; import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.LLDatabaseConnection; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory; import it.cavallium.dbengine.rpc.current.data.LuceneOptions; @@ -36,7 +37,7 @@ public class LLQuicConnectionTest { this.allocator = DefaultBufferAllocators.preferredAllocator(); this.meterRegistry = new CompositeMeterRegistry(); this.server = TestDbServer.create(allocator, meterRegistry); - server.bind().block(); + server.bind().transform(LLUtils::handleDiscard).block(); this.client = TestDbClient.create(allocator, meterRegistry, server.address()).connect().block(); }