From 5774bd1a740636bd759887aff25551e493da4483 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 30 Jun 2022 15:06:28 +0200 Subject: [PATCH] update reactor --- pom.xml | 22 ++++++++++++++++--- .../database/server/QuicRPCServer.java | 8 +++++-- .../database/server/LLQuicConnectionTest.java | 2 +- 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index f850c18..ca190a8 100644 --- a/pom.xml +++ b/pom.xml @@ -72,7 +72,7 @@ io.projectreactor reactor-bom - 2020.0.19 + 2020.0.20 pom import @@ -97,6 +97,10 @@ io.projectreactor.netty reactor-netty-core + + org.mockito + mockito-core + @@ -142,12 +146,24 @@ junit-jupiter-params 5.8.2 test + + + org.mockito + mockito-core + + org.assertj assertj-core - 3.22.0 + 3.23.1 test + + + org.mockito + mockito-core + + @@ -210,7 +226,7 @@ - --add-modules jdk.incubator.foreign -Dforeign.restricted=permit + -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --enable-native-access=ALL-UNNAMED diff --git a/src/main/java/it/cavallium/dbengine/database/server/QuicRPCServer.java b/src/main/java/it/cavallium/dbengine/database/server/QuicRPCServer.java index 638351d..3fd7863 100644 --- a/src/main/java/it/cavallium/dbengine/database/server/QuicRPCServer.java +++ b/src/main/java/it/cavallium/dbengine/database/server/QuicRPCServer.java @@ -53,6 +53,7 @@ import reactor.core.publisher.Signal; import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.Many; +import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; import reactor.netty.DisposableChannel; import reactor.netty.incubator.quic.QuicServer; @@ -67,7 +68,9 @@ public class QuicRPCServer { private final ReferencedResources dbs = new ReferencedResources<>(this::obtainDatabase, LLKeyValueDatabase::close); private final ReferencedResources singletons = new ReferencedResources<>(this::obtainSingleton, s -> Mono.empty()); - private final ReferencedResources indices = new ReferencedResources<>(this::obtainLuceneIndex, LLLuceneIndex::close); + private final ReferencedResources indices = new ReferencedResources<>(this::obtainLuceneIndex, + s -> Mono.fromRunnable(s::close).subscribeOn(Schedulers.boundedElastic()) + ); public QuicRPCServer(LuceneRocksDBManager rocksDBManager, LLDatabaseConnection localDb, QuicServer quicServer) { this.rocksDBManager = rocksDBManager; @@ -275,7 +278,8 @@ public class QuicRPCServer { private Mono handleCloseLuceneIndex(CloseLuceneIndex closeLuceneIndex) { return this.indices .getResource(closeLuceneIndex.luceneIndexId()) - .flatMap(LLLuceneIndex::close) + .publishOn(Schedulers.boundedElastic()) + .doOnNext(LLLuceneIndex::close) .thenReturn(Empty.of()); } diff --git a/src/test/java/it/cavallium/dbengine/database/server/LLQuicConnectionTest.java b/src/test/java/it/cavallium/dbengine/database/server/LLQuicConnectionTest.java index 4e961dc..a4d4f25 100644 --- a/src/test/java/it/cavallium/dbengine/database/server/LLQuicConnectionTest.java +++ b/src/test/java/it/cavallium/dbengine/database/server/LLQuicConnectionTest.java @@ -97,6 +97,6 @@ public class LLQuicConnectionTest { ), null).blockOptional().orElseThrow(); assertEquals(shardName, index.getLuceneIndexName()); - assertDoesNotThrow(() -> index.close().block()); + assertDoesNotThrow(index::close); } } \ No newline at end of file