update reactor

This commit is contained in:
Andrea Cavalli 2022-06-30 15:06:28 +02:00
parent 245adc3847
commit 5774bd1a74
3 changed files with 26 additions and 6 deletions

22
pom.xml
View File

@ -72,7 +72,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2020.0.19</version>
<version>2020.0.20</version>
<type>pom</type>
<scope>import</scope>
</dependency>
@ -97,6 +97,10 @@
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@ -142,12 +146,24 @@
<artifactId>junit-jupiter-params</artifactId>
<version>5.8.2</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.22.0</version>
<version>3.23.1</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- This will get hamcrest-core automatically -->
<dependency>
@ -210,7 +226,7 @@
</dependency>
</dependencies>
<configuration>
<argLine>--add-modules jdk.incubator.foreign -Dforeign.restricted=permit
<argLine>-Dforeign.restricted=permit
--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED --enable-native-access=ALL-UNNAMED
</argLine>

View File

@ -53,6 +53,7 @@ import reactor.core.publisher.Signal;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.DisposableChannel;
import reactor.netty.incubator.quic.QuicServer;
@ -67,7 +68,9 @@ public class QuicRPCServer {
private final ReferencedResources<String, GetDatabase, LLKeyValueDatabase> dbs = new ReferencedResources<>(this::obtainDatabase, LLKeyValueDatabase::close);
private final ReferencedResources<DatabasePartName, GetSingleton, LLSingleton> singletons = new ReferencedResources<>(this::obtainSingleton, s -> Mono.empty());
private final ReferencedResources<String, GetLuceneIndex, LLLuceneIndex> indices = new ReferencedResources<>(this::obtainLuceneIndex, LLLuceneIndex::close);
private final ReferencedResources<String, GetLuceneIndex, LLLuceneIndex> indices = new ReferencedResources<>(this::obtainLuceneIndex,
s -> Mono.<Void>fromRunnable(s::close).subscribeOn(Schedulers.boundedElastic())
);
public QuicRPCServer(LuceneRocksDBManager rocksDBManager, LLDatabaseConnection localDb, QuicServer quicServer) {
this.rocksDBManager = rocksDBManager;
@ -275,7 +278,8 @@ public class QuicRPCServer {
private Mono<RPCEvent> handleCloseLuceneIndex(CloseLuceneIndex closeLuceneIndex) {
return this.indices
.getResource(closeLuceneIndex.luceneIndexId())
.flatMap(LLLuceneIndex::close)
.publishOn(Schedulers.boundedElastic())
.doOnNext(LLLuceneIndex::close)
.thenReturn(Empty.of());
}

View File

@ -97,6 +97,6 @@ public class LLQuicConnectionTest {
),
null).blockOptional().orElseThrow();
assertEquals(shardName, index.getLuceneIndexName());
assertDoesNotThrow(() -> index.close().block());
assertDoesNotThrow(index::close);
}
}