From 58770ca649b7623ec3ace045de5be146a0ec53f3 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 12 Oct 2022 14:31:41 +0200 Subject: [PATCH] Improve support for local cancellation --- .../rsocket/ConsumerConnection.java | 18 +++++++++-- .../rsocket/ProducerConnection.java | 9 +++++- .../tdlight/reactiveapi/test/TestChannel.java | 31 ++++++++++++++----- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java b/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java index 5eec093..dbbfbdb 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java @@ -115,10 +115,24 @@ public class ConsumerConnection { } }); }), Integer.MAX_VALUE, bufferSize) - .transform(remote -> RSocketUtils.deserialize(remote, local)) + .transform(remote -> { + synchronized (ConsumerConnection.this) { + return RSocketUtils.deserialize(remote, local); + } + }) .map(element -> new Timestamped<>(System.currentTimeMillis(), element)); } - })); + })).doFinally(s -> { + if (s == SignalType.CANCEL) { + synchronized (ConsumerConnection.this) { + local = null; + var ex = new InterruptedException(); + localTerminationState = Optional.of(ex); + if (LOG.isDebugEnabled()) LOG.debug("{} Local is cancelled", this.printStatus()); + localTerminationSink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + } + } + }); } public synchronized Mono connectRemote() { diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java b/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java index 70391f7..36dfa8b 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java @@ -55,7 +55,14 @@ public class ProducerConnection { if (LOG.isDebugEnabled()) LOG.debug("{} Local is connected", this.printStatus()); return remoteTerminationSink.asMono().publishOn(Schedulers.parallel()); } - })); + })).doFinally(s -> { + if (s == SignalType.CANCEL) { + synchronized (ProducerConnection.this) { + local = null; + if (LOG.isDebugEnabled()) LOG.debug("{} Local is cancelled", this.printStatus()); + } + } + }); } public synchronized Flux connectRemote() { diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java index ac52744..46e21cd 100644 --- a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java +++ b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java @@ -303,12 +303,13 @@ public abstract class TestChannel { @Test public void testConsumeMidFail() { var dataFlux = Flux.fromIterable(data).publish().autoConnect(); + var exRef = new AtomicReference(); var eventProducer = producer .sendMessages(dataFlux.map(Integer::toUnsignedString)) .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1))) .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) .subscribeOn(Schedulers.parallel()) - .subscribe(n -> {}, ex -> Assertions.fail(ex)); + .subscribe(n -> {}, ex -> exRef.set(ex)); try { Assertions.assertThrows(FakeException.class, () -> { consumer @@ -331,6 +332,7 @@ public abstract class TestChannel { .map(Integer::parseUnsignedInt) .collect(Collectors.toCollection(IntArrayList::new)) .block(); + Assertions.assertNull(exRef.get()); Assertions.assertNotNull(receiver2); Assertions.assertNotEquals(0, receiver2.getInt(0)); Assertions.assertNotEquals(1, receiver2.getInt(1)); @@ -345,6 +347,7 @@ public abstract class TestChannel { public void testProduceMidCancel() { var dataFlux = Flux.fromIterable(data).publish(1).autoConnect(); var numbers = new ConcurrentLinkedDeque(); + var exRef = new AtomicReference(); var eventConsumer = consumer .consumeMessages() .limitRate(1) @@ -354,7 +357,7 @@ public abstract class TestChannel { .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1))) .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) .subscribeOn(Schedulers.parallel()) - .subscribe(n -> {}, ex -> Assertions.fail(ex)); + .subscribe(n -> {}, ex -> exRef.set(ex)); try { producer .sendMessages(dataFlux.limitRate(1).take(10, true).map(Integer::toUnsignedString)) @@ -365,6 +368,7 @@ public abstract class TestChannel { if (numbers.size() < data.size()) { data.removeInt(data.size() - 1); } + Assertions.assertNull(exRef.get()); Assertions.assertEquals(data, List.copyOf(numbers)); } finally { eventConsumer.dispose(); @@ -375,6 +379,7 @@ public abstract class TestChannel { public void testProduceMidFail() { var dataFlux = Flux.fromIterable(data).publish(1).autoConnect(); var numbers = new ConcurrentLinkedDeque(); + var exRef = new AtomicReference(); var eventConsumer = consumer .consumeMessages() .limitRate(1) @@ -384,12 +389,12 @@ public abstract class TestChannel { .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1))) .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) .subscribeOn(Schedulers.parallel()) - .subscribe(n -> {}, ex -> Assertions.fail(ex)); + .subscribe(n -> {}, ex -> exRef.set(ex)); try { Assertions.assertThrows(FakeException.class, () -> { producer .sendMessages(dataFlux.limitRate(1).doOnNext(i -> { - if (i == 10) { + if (i == 40) { throw new FakeException(); } }).map(Integer::toUnsignedString)) @@ -398,6 +403,7 @@ public abstract class TestChannel { producer .sendMessages(dataFlux.limitRate(1).map(Integer::toUnsignedString)) .block(Duration.ofSeconds(5)); + Assertions.assertNull(exRef.get()); Assertions.assertTrue(numbers.contains(0)); Assertions.assertTrue(numbers.contains(1)); Assertions.assertTrue(numbers.contains(50)); @@ -410,11 +416,12 @@ public abstract class TestChannel { @Test public void testResubscribe() throws InterruptedException { var dataFlux = Flux.fromIterable(data).publish().autoConnect(); + var exRef = new AtomicReference(); var eventProducer = producer .sendMessages(dataFlux.map(Integer::toUnsignedString)) .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1))) - .subscribe(n -> {}, ex -> Assertions.fail(ex)); + .subscribe(n -> {}, ex -> exRef.set(ex)); try { consumer .consumeMessages() @@ -434,6 +441,8 @@ public abstract class TestChannel { .map(Integer::parseUnsignedInt) .log("consumer-2", Level.INFO) .blockLast(Duration.ofSeconds(5)); + + Assertions.assertNull(exRef.get()); } finally { eventProducer.dispose(); } @@ -442,11 +451,13 @@ public abstract class TestChannel { @Test public void testFailTwoSubscribers() { var dataFlux = Flux.fromIterable(data).publish().autoConnect(); + var exRef1 = new AtomicReference(); + var exRef2 = new AtomicReference(); var eventProducer = producer .sendMessages(dataFlux.map(Integer::toUnsignedString)) .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1))) - .subscribe(n -> {}, ex -> Assertions.fail(ex)); + .subscribe(n -> {}, ex -> exRef1.set(ex)); Assertions.assertThrows(IllegalStateException.class, () -> { try { @@ -457,7 +468,7 @@ public abstract class TestChannel { .map(Timestamped::data) .map(Integer::parseUnsignedInt) .log("consumer-1", Level.INFO) - .doOnError(ex -> Assertions.fail(ex)), + .doOnError(ex -> exRef2.set(ex)), consumer .consumeMessages() .limitRate(1) @@ -469,6 +480,8 @@ public abstract class TestChannel { ) ) .block(); + Assertions.assertNull(exRef1.get()); + Assertions.assertNull(exRef2.get()); } catch (RuntimeException ex) { throw Exceptions.unwrap(ex); } finally { @@ -480,12 +493,13 @@ public abstract class TestChannel { @Test public void testRepublish() throws InterruptedException { var dataFlux = Flux.fromIterable(data).publish().autoConnect(); + var exRef = new AtomicReference(); var eventConsumer = consumer.consumeMessages() .map(Timestamped::data) .map(Integer::parseUnsignedInt) .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1))) - .subscribe(n -> {}, ex -> Assertions.fail(ex)); + .subscribe(n -> {}, ex -> exRef.set(ex)); try { producer .sendMessages(dataFlux @@ -497,6 +511,7 @@ public abstract class TestChannel { producer .sendMessages(dataFlux.log("producer-2", Level.INFO).map(Integer::toUnsignedString)) .block(Duration.ofSeconds(5)); + Assertions.assertNull(exRef.get()); } finally { eventConsumer.dispose(); }