From 24b4387b089ab29a9a19726a4b0e980c50d1489d Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 11 Oct 2022 20:08:40 +0200 Subject: [PATCH] Accept multiple tdlib clients --- .../rsocket/ConsumerConnection.java | 131 +++++++++++------- .../reactiveapi/rsocket/MyRSocketServer.java | 27 +++- .../rsocket/ProducerConnection.java | 79 ++++++----- .../tdlight/reactiveapi/test/TestChannel.java | 26 ++-- .../tdlight/reactiveapi/test/TestRSocket.java | 6 +- 5 files changed, 163 insertions(+), 106 deletions(-) diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java b/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java index ad1a21e..5eec093 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java @@ -13,14 +13,18 @@ import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.Empty; +import reactor.core.publisher.Sinks.Many; import reactor.core.scheduler.Schedulers; +import reactor.util.concurrent.Queues; public class ConsumerConnection { private static final Logger LOG = LogManager.getLogger(ConsumerConnection.class); private final String channel; - private Flux remote; + private final int bufferSize; + private Many> remotes = Sinks.many().replay().all(); + private int remoteCount = 0; private Deserializer local; @@ -29,8 +33,9 @@ public class ConsumerConnection { private Optional localTerminationState = null; private Empty localTerminationSink = Sinks.empty(); - public ConsumerConnection(String channel) { + public ConsumerConnection(String channel, int bufferSize) { this.channel = channel; + this.bufferSize = bufferSize; if (LOG.isDebugEnabled()) LOG.debug("{} Create new blank connection", this.printStatus()); } @@ -38,7 +43,7 @@ public class ConsumerConnection { return "[\"%s\" (%d)%s%s%s]".formatted(channel, System.identityHashCode(this), local != null ? ", local" : "", - remote != null ? ", remote" : "", + remoteCount > 0 ? (remoteCount > 1 ? ", " + remoteCount + " remotes" : ", 1 remote") : "", connectedState ? ((localTerminationState != null) ? (localTerminationState.isPresent() ? ", done with error" : ", done") : ", connected") : ", waiting" ); } @@ -52,38 +57,68 @@ public class ConsumerConnection { }).publishOn(Schedulers.parallel()).thenMany(Flux.defer(() -> { synchronized (ConsumerConnection.this) { if (LOG.isDebugEnabled()) LOG.debug("{} Local is connected", this.printStatus()); - return RSocketUtils.deserialize(remote, local) + return Flux.merge(remotes.asFlux().map(remote -> { + return remote.doOnError(ex -> { + synchronized (ConsumerConnection.this) { + if (remoteCount <= 1) { + if (remoteCount > 0 && localTerminationState == null) { + localTerminationState = Optional.of(ex); + if (LOG.isDebugEnabled()) { + LOG.debug("%s Local connection ended with failure".formatted(this.printStatus()), ex); + } + if (remoteCount <= 1) { + var sink = localTerminationSink; + reset(); + sink.emitError(ex, EmitFailureHandler.FAIL_FAST); + if (LOG.isDebugEnabled()) { + LOG.debug("%s Local connection ended with failure, emitted termination failure".formatted(this.printStatus())); + } + } + } + } else { + remoteCount--; + if (LOG.isDebugEnabled()) { + LOG.debug("%s Local connection ended with failure, but at least one remote is still online".formatted( + this.printStatus())); + } + } + } + }).doFinally(s -> { + if (s != SignalType.ON_ERROR) { + synchronized (ConsumerConnection.this) { + if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ending with status {}", this.printStatus(), s); + if (remoteCount <= 1) { + if (remoteCount > 0 && localTerminationState == null) { + assert connectedState; + localTerminationState = Optional.empty(); + if (s == SignalType.CANCEL) { + localTerminationSink.emitError(new CancelledChannelException(), EmitFailureHandler.FAIL_FAST); + } else { + localTerminationSink.emitEmpty(EmitFailureHandler.FAIL_FAST); + } + } + reset(); + if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ended with status {}, emitted termination complete", this.printStatus(), s); + } else { + remoteCount--; + if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ended with status {}, but at least one remote is still online", this.printStatus(), s); + } + } + } + }).onErrorResume(ex -> { + synchronized (ConsumerConnection.this) { + if (remoteCount <= 1) { + return Flux.error(ex); + } else { + return Flux.empty(); + } + } + }); + }), Integer.MAX_VALUE, bufferSize) + .transform(remote -> RSocketUtils.deserialize(remote, local)) .map(element -> new Timestamped<>(System.currentTimeMillis(), element)); } - })).doOnError(ex -> { - synchronized (ConsumerConnection.this) { - if (remote != null && localTerminationState == null) { - localTerminationState = Optional.of(ex); - if (LOG.isDebugEnabled()) LOG.debug("%s Local connection ended with failure, emitting termination failure".formatted(this.printStatus()), ex); - var sink = localTerminationSink; - reset(true); - sink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); - if (LOG.isDebugEnabled()) LOG.debug("%s Local connection ended with failure, emitted termination failure".formatted(this.printStatus())); - } - } - }).doFinally(s -> { - if (s != SignalType.ON_ERROR) { - synchronized (ConsumerConnection.this) { - if (remote != null && localTerminationState == null) { - assert connectedState; - localTerminationState = Optional.empty(); - LOG.debug("{} Remote connection ended with status {}, emitting termination complete", this::printStatus, () -> s); - if (s == SignalType.CANCEL) { - localTerminationSink.emitError(new CancelledChannelException(), EmitFailureHandler.busyLooping(Duration.ofMillis(100))); - } else { - localTerminationSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); - } - LOG.debug("{} Remote connection ended with status {}, emitted termination complete", this::printStatus, () -> s); - } - reset(false); - } - } - }); + })); } public synchronized Mono connectRemote() { @@ -97,16 +132,10 @@ public class ConsumerConnection { if (LOG.isDebugEnabled()) LOG.debug("{} Remote is connected", this.printStatus()); return localTerminationSink.asMono().publishOn(Schedulers.parallel()); } - })).doFinally(s -> { - if (s != SignalType.ON_ERROR) { - synchronized (ConsumerConnection.this) { - //reset(true); - } - } - }); + })); } - public synchronized void reset(boolean resettingFromRemote) { + public synchronized void reset() { if (LOG.isDebugEnabled()) LOG.debug("{} Reset started", this.printStatus()); if (connectedState) { if (localTerminationState == null) { @@ -123,7 +152,9 @@ public class ConsumerConnection { if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection has been interrupted", this.printStatus()); } local = null; - remote = null; + remoteCount = 0; + remotes.emitComplete(EmitFailureHandler.FAIL_FAST); + remotes = Sinks.many().replay().all(); connectedState = false; connectedSink = Sinks.empty(); localTerminationState = null; @@ -133,32 +164,30 @@ public class ConsumerConnection { public synchronized void registerRemote(Flux remote) { if (LOG.isDebugEnabled()) LOG.debug("{} Remote is trying to register", this.printStatus()); - if (this.remote != null) { - if (LOG.isDebugEnabled()) LOG.debug("{} Remote was already registered", this.printStatus()); - throw new IllegalStateException("Remote is already registered"); - } - this.remote = remote; + this.remoteCount++; + this.remotes.emitNext(remote, EmitFailureHandler.FAIL_FAST); if (LOG.isDebugEnabled()) LOG.debug("{} Remote registered", this.printStatus()); onChanged(); } - public synchronized void registerLocal(Deserializer local) { + public synchronized Throwable registerLocal(Deserializer local) { if (LOG.isDebugEnabled()) LOG.debug("{} Local is trying to register", this.printStatus()); if (this.local != null) { if (LOG.isDebugEnabled()) LOG.debug("{} Local was already registered", this.printStatus()); - throw new IllegalStateException("Local is already registered"); + return new IllegalStateException("Local is already registered"); } this.local = local; if (LOG.isDebugEnabled()) LOG.debug("{} Local registered", this.printStatus()); onChanged(); + return null; } private synchronized void onChanged() { if (LOG.isDebugEnabled()) LOG.debug("{} Checking connection changes", this.printStatus()); - if (local != null && remote != null) { + if (local != null && remoteCount > 0) { connectedState = true; if (LOG.isDebugEnabled()) LOG.debug("{} Connected successfully! Emitting connected event", this.printStatus()); - connectedSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + connectedSink.emitEmpty(EmitFailureHandler.FAIL_FAST); if (LOG.isDebugEnabled()) LOG.debug("{} Connected successfully! Emitted connected event", this.printStatus()); } else { if (LOG.isDebugEnabled()) LOG.debug("{} Still not connected", this.printStatus()); diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java index 1a9e727..439cff7 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java @@ -1,5 +1,7 @@ package it.tdlight.reactiveapi.rsocket; +import static reactor.util.concurrent.Queues.XS_BUFFER_SIZE; + import com.google.common.net.HostAndPort; import io.rsocket.ConnectionSetupPayload; import io.rsocket.Payload; @@ -32,6 +34,7 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket { private final Logger logger = LogManager.getLogger(this.getClass()); + private final int bufferSize; private final Mono serverCloseable; protected final Map> consumerRegistry = new ConcurrentHashMap<>(); @@ -39,6 +42,11 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket { protected final Map> producerRegistry = new ConcurrentHashMap<>(); public MyRSocketServer(HostAndPort baseHost) { + this(baseHost, XS_BUFFER_SIZE); + } + + public MyRSocketServer(HostAndPort baseHost, int bufferSize) { + this.bufferSize = bufferSize; var serverMono = RSocketServer .create(new SocketAcceptor() { @Override @@ -71,7 +79,8 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket { return Mono.error(new CancelledChannelException("Metadata is wrong")); } var channel = firstValue.getDataUtf8(); - var conn = MyRSocketServer.this.consumerRegistry.computeIfAbsent(channel, ConsumerConnection::new); + var conn = MyRSocketServer.this.consumerRegistry.computeIfAbsent(channel, + ch -> new ConsumerConnection<>(ch, bufferSize)); conn.registerRemote(flux.skip(1)); return conn.connectRemote().then(Mono.fromSupplier(() -> DefaultPayload.create("ok", "result"))); } else { @@ -84,7 +93,8 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket { public @NotNull Flux requestStream(@NotNull Payload payload) { var channel = payload.getDataUtf8(); return Flux.defer(() -> { - var conn = MyRSocketServer.this.producerRegistry.computeIfAbsent(channel, ProducerConnection::new); + var conn = MyRSocketServer.this.producerRegistry.computeIfAbsent(channel, + ch -> new ProducerConnection<>(ch, bufferSize)); conn.registerRemote(); return conn.connectRemote(); }); @@ -105,8 +115,12 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket { public Flux> consumeMessages() { return serverCloseable.flatMapMany(x -> { //noinspection unchecked - var conn = (ConsumerConnection) consumerRegistry.computeIfAbsent(channelName, ConsumerConnection::new); - conn.registerLocal(deserializer); + var conn = (ConsumerConnection) consumerRegistry.computeIfAbsent(channelName, + ch -> new ConsumerConnection<>(ch, bufferSize)); + Throwable ex = conn.registerLocal(deserializer); + if (ex != null) { + return Flux.error(ex); + } return conn.connectLocal(); }); } @@ -123,12 +137,13 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket { logger.error("Failed to create codec for channel \"{}\"", channelName, ex); throw new IllegalStateException("Failed to create codec for channel " + channelName); } - return new EventProducer() { + return new EventProducer<>() { @Override public Mono sendMessages(Flux eventsFlux) { return serverCloseable.flatMap(x -> { //noinspection unchecked - var conn = (ProducerConnection) producerRegistry.computeIfAbsent(channelName, ProducerConnection::new); + var conn = (ProducerConnection) producerRegistry.computeIfAbsent(channelName, + ch -> new ProducerConnection<>(ch, bufferSize)); conn.registerLocal(eventsFlux.transform(flux -> RSocketUtils.serialize(flux, serializer))); return conn.connectLocal(); }); diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java b/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java index fee637c..70391f7 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java @@ -7,12 +7,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Signal; import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; -import reactor.core.publisher.Sinks.EmitResult; import reactor.core.publisher.Sinks.Empty; +import reactor.core.publisher.Sinks.Many; import reactor.core.scheduler.Schedulers; public class ProducerConnection { @@ -20,8 +19,8 @@ public class ProducerConnection { private static final Logger LOG = LogManager.getLogger(ProducerConnection.class); private final String channel; - - private Object remote; + private final int bufferSize; + private int remoteCount = 0; private Flux local; @@ -30,8 +29,9 @@ public class ProducerConnection { private Optional remoteTerminationState = null; private Empty remoteTerminationSink = Sinks.empty(); - public ProducerConnection(String channel) { + public ProducerConnection(String channel, int bufferSize) { this.channel = channel; + this.bufferSize = bufferSize; if (LOG.isDebugEnabled()) LOG.debug("{} Create new blank connection", this.printStatus()); } @@ -39,7 +39,7 @@ public class ProducerConnection { return "[\"%s\" (%d)%s%s%s]".formatted(channel, System.identityHashCode(this), local != null ? ", local" : "", - remote != null ? ", remote" : "", + remoteCount > 0 ? (remoteCount > 1 ? ", " + remoteCount + " remotes" : ", 1 remote") : "", connectedState ? ((remoteTerminationState != null) ? (remoteTerminationState.isPresent() ? ", done with error" : ", done") : ", connected") : ", waiting" ); } @@ -55,13 +55,7 @@ public class ProducerConnection { if (LOG.isDebugEnabled()) LOG.debug("{} Local is connected", this.printStatus()); return remoteTerminationSink.asMono().publishOn(Schedulers.parallel()); } - })).doFinally(s -> { - if (s != SignalType.ON_ERROR) { - synchronized (ProducerConnection.this) { - //reset(false); - } - } - }); + })); } public synchronized Flux connectRemote() { @@ -77,37 +71,50 @@ public class ProducerConnection { } })).doOnError(ex -> { synchronized (ProducerConnection.this) { - if (local != null && remoteTerminationState == null) { - remoteTerminationState = Optional.of(ex); - if (LOG.isDebugEnabled()) LOG.debug("%s Remote connection ended with failure, emitting termination failure".formatted(this.printStatus()), ex); - var sink = remoteTerminationSink; - reset(true); - sink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); - if (LOG.isDebugEnabled()) LOG.debug("%s Remote connection ended with failure, emitted termination failure".formatted(this.printStatus())); + if (remoteCount <= 1) { + if (local != null && remoteTerminationState == null) { + remoteTerminationState = Optional.of(ex); + if (LOG.isDebugEnabled()) LOG.debug("%s Remote connection ended with failure, emitting termination failure".formatted(this.printStatus()), ex); + var sink = remoteTerminationSink; + reset(); + sink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + if (LOG.isDebugEnabled()) LOG.debug("%s Remote connection ended with failure, emitted termination failure".formatted(this.printStatus())); + } + } else { + remoteCount--; + if (LOG.isDebugEnabled()) { + LOG.debug("%s Remote connection ended with failure, but at least one remote is still online".formatted( + this.printStatus())); + } } } }).doFinally(s -> { if (s != SignalType.ON_ERROR) { synchronized (ProducerConnection.this) { - if (local != null && remoteTerminationState == null) { - assert connectedState; - remoteTerminationState = Optional.empty(); - if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ended with status {}, emitting termination complete", this.printStatus(), s); - if (s == SignalType.CANCEL) { - remoteTerminationSink.emitError(new CancelledChannelException(), EmitFailureHandler.busyLooping(Duration.ofMillis(100))); - } else { - remoteTerminationSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ending with status {}", this.printStatus(), s); + if (remoteCount <= 1) { + if (local != null && remoteTerminationState == null) { + assert connectedState; + remoteTerminationState = Optional.empty(); + if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ended with status {}, emitting termination complete", this.printStatus(), s); + if (s == SignalType.CANCEL) { + remoteTerminationSink.emitError(new CancelledChannelException(), EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + } else { + remoteTerminationSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + } } + reset(); if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ended with status {}, emitted termination complete", this.printStatus(), s); + } else { + remoteCount--; + if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ended with status {}, but at least one remote is still online", this.printStatus(), s); } - reset(true); - } } }); } - public synchronized void reset(boolean resettingFromRemote) { + public synchronized void reset() { if (LOG.isDebugEnabled()) LOG.debug("{} Reset started", this.printStatus()); if (connectedState) { if (remoteTerminationState == null) { @@ -124,7 +131,7 @@ public class ProducerConnection { if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection has been interrupted", this.printStatus()); } local = null; - remote = null; + remoteCount = 0; connectedState = false; connectedSink = Sinks.empty(); remoteTerminationState = null; @@ -134,11 +141,11 @@ public class ProducerConnection { public synchronized void registerRemote() { if (LOG.isDebugEnabled()) LOG.debug("{} Remote is trying to register", this.printStatus()); - if (this.remote != null) { + if (this.remoteCount > 0) { if (LOG.isDebugEnabled()) LOG.debug("{} Remote was already registered", this.printStatus()); throw new IllegalStateException("Remote is already registered"); } - this.remote = new Object(); + this.remoteCount++; if (LOG.isDebugEnabled()) LOG.debug("{} Remote registered", this.printStatus()); onChanged(); } @@ -149,14 +156,14 @@ public class ProducerConnection { if (LOG.isDebugEnabled()) LOG.debug("{} Local was already registered", this.printStatus()); throw new IllegalStateException("Local is already registered"); } - this.local = local; + this.local = local.publish(bufferSize).refCount(1); if (LOG.isDebugEnabled()) LOG.debug("{} Local registered", this.printStatus()); onChanged(); } private synchronized void onChanged() { if (LOG.isDebugEnabled()) LOG.debug("{} Checking connection changes", this.printStatus()); - if (local != null && remote != null) { + if (local != null && remoteCount > 0) { connectedState = true; if (LOG.isDebugEnabled()) LOG.debug("{} Connected successfully! Emitting connected event", this.printStatus()); connectedSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java index 20066b8..ac52744 100644 --- a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java +++ b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java @@ -15,6 +15,7 @@ import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.logging.Level; import java.util.stream.Collectors; @@ -262,12 +263,13 @@ public abstract class TestChannel { @Test public void testConsumeMidCancel() { var dataFlux = Flux.fromIterable(data).publish().autoConnect(); + AtomicReference exRef = new AtomicReference<>(); var eventProducer = producer .sendMessages(dataFlux.map(Integer::toUnsignedString)) .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1))) .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) .subscribeOn(Schedulers.parallel()) - .subscribe(n -> {}, ex -> Assertions.fail(ex)); + .subscribe(n -> {}, exRef::set); try { var receiver1 = consumer .consumeMessages() @@ -284,10 +286,14 @@ public abstract class TestChannel { .map(Integer::parseUnsignedInt) .collect(Collectors.toCollection(IntArrayList::new)) .block(Duration.ofSeconds(5)); + var ex = exRef.get(); + if (ex != null) { + Assertions.fail(ex); + } Assertions.assertNotNull(receiver1); Assertions.assertNotNull(receiver2); - receiver1.addAll(receiver2); - Assertions.assertEquals(data, receiver1); + Assertions.assertEquals(data.subList(0, 10), receiver1); + Assertions.assertEquals(data.subList(50, 100), receiver2.subList(receiver2.size() - 50, receiver2.size())); System.out.println(receiver1); } finally { eventProducer.dispose(); @@ -326,8 +332,9 @@ public abstract class TestChannel { .collect(Collectors.toCollection(IntArrayList::new)) .block(); Assertions.assertNotNull(receiver2); - data.removeElements(0, 11); - Assertions.assertEquals(data, receiver2); + Assertions.assertNotEquals(0, receiver2.getInt(0)); + Assertions.assertNotEquals(1, receiver2.getInt(1)); + Assertions.assertNotEquals(2, receiver2.getInt(2)); System.out.println(receiver2); } finally { eventProducer.dispose(); @@ -391,11 +398,10 @@ public abstract class TestChannel { producer .sendMessages(dataFlux.limitRate(1).map(Integer::toUnsignedString)) .block(Duration.ofSeconds(5)); - data.removeInt(10); - if (numbers.size() < data.size()) { - data.removeInt(data.size() - 1); - } - Assertions.assertEquals(data, List.copyOf(numbers)); + Assertions.assertTrue(numbers.contains(0)); + Assertions.assertTrue(numbers.contains(1)); + Assertions.assertTrue(numbers.contains(50)); + Assertions.assertTrue(numbers.contains(51)); } finally { eventConsumer.dispose(); } diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestRSocket.java b/src/test/java/it/tdlight/reactiveapi/test/TestRSocket.java index 2d4731b..52a860f 100644 --- a/src/test/java/it/tdlight/reactiveapi/test/TestRSocket.java +++ b/src/test/java/it/tdlight/reactiveapi/test/TestRSocket.java @@ -44,7 +44,7 @@ public class TestRSocket { @Test public void testServerConsumer() { - var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085)); + var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085), 1); try { var rawClient = RSocketConnector.create() .setupPayload(DefaultPayload.create("client", "setup-info")) @@ -73,7 +73,7 @@ public class TestRSocket { @Test public void testServerProducer() { - var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085)); + var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085), 1); try { var rawClient = RSocketConnector.create() .setupPayload(DefaultPayload.create("client", "setup-info")) @@ -173,7 +173,7 @@ public class TestRSocket { @Test public void testServerOnClose() { Assertions.assertThrows(IllegalStateException.class, () -> { - var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085)); + var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085), 1); try { server.onClose().block(Duration.ofSeconds(1)); } finally {