Handle discards

This commit is contained in:
Andrea Cavalli 2022-07-02 12:28:59 +02:00
parent 5774bd1a74
commit b4a26efe5b
2 changed files with 10 additions and 5 deletions

View File

@ -12,6 +12,7 @@ import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.BinarySerializationFunction; import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.database.remote.QuicUtils; import it.cavallium.dbengine.database.remote.QuicUtils;
@ -174,9 +175,9 @@ public class QuicRPCServer {
.maxStreamsUnidirectional(100) .maxStreamsUnidirectional(100)
); );
QuicRPCServer server = new QuicRPCServer(rocksDBManager, localDb, qs); QuicRPCServer server = new QuicRPCServer(rocksDBManager, localDb, qs);
server.bind().block(); server.bind().transform(LLUtils::handleDiscard).block();
server.onDispose().block(); server.onDispose().transform(LLUtils::handleDiscard).block();
localDb.disconnect().block(); localDb.disconnect().transform(LLUtils::handleDiscard).block();
rocksDBManager.closeAll(); rocksDBManager.closeAll();
} }
@ -193,7 +194,10 @@ public class QuicRPCServer {
prev != null ? toByteList(prev) : ByteList.of() prev != null ? toByteList(prev) : ByteList.of()
)) ))
.orThrow(); .orThrow();
SingletonUpdateEnd newValue = (SingletonUpdateEnd) otherRequests.singleOrEmpty().block(); SingletonUpdateEnd newValue = (SingletonUpdateEnd) otherRequests
.singleOrEmpty()
.transform(LLUtils::handleDiscard)
.block();
Objects.requireNonNull(newValue); Objects.requireNonNull(newValue);
if (!newValue.exist()) { if (!newValue.exist()) {
return null; return null;

View File

@ -14,6 +14,7 @@ import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities; import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.ColumnUtils;
import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory; import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory;
import it.cavallium.dbengine.rpc.current.data.LuceneOptions; import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
@ -36,7 +37,7 @@ public class LLQuicConnectionTest {
this.allocator = DefaultBufferAllocators.preferredAllocator(); this.allocator = DefaultBufferAllocators.preferredAllocator();
this.meterRegistry = new CompositeMeterRegistry(); this.meterRegistry = new CompositeMeterRegistry();
this.server = TestDbServer.create(allocator, meterRegistry); this.server = TestDbServer.create(allocator, meterRegistry);
server.bind().block(); server.bind().transform(LLUtils::handleDiscard).block();
this.client = TestDbClient.create(allocator, meterRegistry, server.address()).connect().block(); this.client = TestDbClient.create(allocator, meterRegistry, server.address()).connect().block();
} }