diff --git a/pom.xml b/pom.xml
index f850c18..ca190a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,7 +72,7 @@
io.projectreactor
reactor-bom
- 2020.0.19
+ 2020.0.20
pom
import
@@ -97,6 +97,10 @@
io.projectreactor.netty
reactor-netty-core
+
+ org.mockito
+ mockito-core
+
@@ -142,12 +146,24 @@
junit-jupiter-params
5.8.2
test
+
+
+ org.mockito
+ mockito-core
+
+
org.assertj
assertj-core
- 3.22.0
+ 3.23.1
test
+
+
+ org.mockito
+ mockito-core
+
+
@@ -210,7 +226,7 @@
- --add-modules jdk.incubator.foreign -Dforeign.restricted=permit
+ -Dforeign.restricted=permit
--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED --enable-native-access=ALL-UNNAMED
diff --git a/src/main/java/it/cavallium/dbengine/database/server/QuicRPCServer.java b/src/main/java/it/cavallium/dbengine/database/server/QuicRPCServer.java
index 638351d..3fd7863 100644
--- a/src/main/java/it/cavallium/dbengine/database/server/QuicRPCServer.java
+++ b/src/main/java/it/cavallium/dbengine/database/server/QuicRPCServer.java
@@ -53,6 +53,7 @@ import reactor.core.publisher.Signal;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
+import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.DisposableChannel;
import reactor.netty.incubator.quic.QuicServer;
@@ -67,7 +68,9 @@ public class QuicRPCServer {
private final ReferencedResources dbs = new ReferencedResources<>(this::obtainDatabase, LLKeyValueDatabase::close);
private final ReferencedResources singletons = new ReferencedResources<>(this::obtainSingleton, s -> Mono.empty());
- private final ReferencedResources indices = new ReferencedResources<>(this::obtainLuceneIndex, LLLuceneIndex::close);
+ private final ReferencedResources indices = new ReferencedResources<>(this::obtainLuceneIndex,
+ s -> Mono.fromRunnable(s::close).subscribeOn(Schedulers.boundedElastic())
+ );
public QuicRPCServer(LuceneRocksDBManager rocksDBManager, LLDatabaseConnection localDb, QuicServer quicServer) {
this.rocksDBManager = rocksDBManager;
@@ -275,7 +278,8 @@ public class QuicRPCServer {
private Mono handleCloseLuceneIndex(CloseLuceneIndex closeLuceneIndex) {
return this.indices
.getResource(closeLuceneIndex.luceneIndexId())
- .flatMap(LLLuceneIndex::close)
+ .publishOn(Schedulers.boundedElastic())
+ .doOnNext(LLLuceneIndex::close)
.thenReturn(Empty.of());
}
diff --git a/src/test/java/it/cavallium/dbengine/database/server/LLQuicConnectionTest.java b/src/test/java/it/cavallium/dbengine/database/server/LLQuicConnectionTest.java
index 4e961dc..a4d4f25 100644
--- a/src/test/java/it/cavallium/dbengine/database/server/LLQuicConnectionTest.java
+++ b/src/test/java/it/cavallium/dbengine/database/server/LLQuicConnectionTest.java
@@ -97,6 +97,6 @@ public class LLQuicConnectionTest {
),
null).blockOptional().orElseThrow();
assertEquals(shardName, index.getLuceneIndexName());
- assertDoesNotThrow(() -> index.close().block());
+ assertDoesNotThrow(index::close);
}
}
\ No newline at end of file