Improve support for local cancellation

This commit is contained in:
Andrea Cavalli 2022-10-12 14:31:41 +02:00
parent 171f07ccec
commit 58770ca649
3 changed files with 47 additions and 11 deletions

View File

@ -115,10 +115,24 @@ public class ConsumerConnection<T> {
} }
}); });
}), Integer.MAX_VALUE, bufferSize) }), Integer.MAX_VALUE, bufferSize)
.transform(remote -> RSocketUtils.deserialize(remote, local)) .transform(remote -> {
synchronized (ConsumerConnection.this) {
return RSocketUtils.deserialize(remote, local);
}
})
.map(element -> new Timestamped<>(System.currentTimeMillis(), element)); .map(element -> new Timestamped<>(System.currentTimeMillis(), element));
} }
})); })).doFinally(s -> {
if (s == SignalType.CANCEL) {
synchronized (ConsumerConnection.this) {
local = null;
var ex = new InterruptedException();
localTerminationState = Optional.of(ex);
if (LOG.isDebugEnabled()) LOG.debug("{} Local is cancelled", this.printStatus());
localTerminationSink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
}
}
});
} }
public synchronized Mono<Void> connectRemote() { public synchronized Mono<Void> connectRemote() {

View File

@ -55,7 +55,14 @@ public class ProducerConnection<T> {
if (LOG.isDebugEnabled()) LOG.debug("{} Local is connected", this.printStatus()); if (LOG.isDebugEnabled()) LOG.debug("{} Local is connected", this.printStatus());
return remoteTerminationSink.asMono().publishOn(Schedulers.parallel()); return remoteTerminationSink.asMono().publishOn(Schedulers.parallel());
} }
})); })).doFinally(s -> {
if (s == SignalType.CANCEL) {
synchronized (ProducerConnection.this) {
local = null;
if (LOG.isDebugEnabled()) LOG.debug("{} Local is cancelled", this.printStatus());
}
}
});
} }
public synchronized Flux<Payload> connectRemote() { public synchronized Flux<Payload> connectRemote() {

View File

@ -303,12 +303,13 @@ public abstract class TestChannel {
@Test @Test
public void testConsumeMidFail() { public void testConsumeMidFail() {
var dataFlux = Flux.fromIterable(data).publish().autoConnect(); var dataFlux = Flux.fromIterable(data).publish().autoConnect();
var exRef = new AtomicReference<Throwable>();
var eventProducer = producer var eventProducer = producer
.sendMessages(dataFlux.map(Integer::toUnsignedString)) .sendMessages(dataFlux.map(Integer::toUnsignedString))
.retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1))) .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1)))
.repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1)))
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.subscribe(n -> {}, ex -> Assertions.fail(ex)); .subscribe(n -> {}, ex -> exRef.set(ex));
try { try {
Assertions.assertThrows(FakeException.class, () -> { Assertions.assertThrows(FakeException.class, () -> {
consumer consumer
@ -331,6 +332,7 @@ public abstract class TestChannel {
.map(Integer::parseUnsignedInt) .map(Integer::parseUnsignedInt)
.collect(Collectors.toCollection(IntArrayList::new)) .collect(Collectors.toCollection(IntArrayList::new))
.block(); .block();
Assertions.assertNull(exRef.get());
Assertions.assertNotNull(receiver2); Assertions.assertNotNull(receiver2);
Assertions.assertNotEquals(0, receiver2.getInt(0)); Assertions.assertNotEquals(0, receiver2.getInt(0));
Assertions.assertNotEquals(1, receiver2.getInt(1)); Assertions.assertNotEquals(1, receiver2.getInt(1));
@ -345,6 +347,7 @@ public abstract class TestChannel {
public void testProduceMidCancel() { public void testProduceMidCancel() {
var dataFlux = Flux.fromIterable(data).publish(1).autoConnect(); var dataFlux = Flux.fromIterable(data).publish(1).autoConnect();
var numbers = new ConcurrentLinkedDeque<Integer>(); var numbers = new ConcurrentLinkedDeque<Integer>();
var exRef = new AtomicReference<Throwable>();
var eventConsumer = consumer var eventConsumer = consumer
.consumeMessages() .consumeMessages()
.limitRate(1) .limitRate(1)
@ -354,7 +357,7 @@ public abstract class TestChannel {
.retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1))) .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1)))
.repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1)))
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.subscribe(n -> {}, ex -> Assertions.fail(ex)); .subscribe(n -> {}, ex -> exRef.set(ex));
try { try {
producer producer
.sendMessages(dataFlux.limitRate(1).take(10, true).map(Integer::toUnsignedString)) .sendMessages(dataFlux.limitRate(1).take(10, true).map(Integer::toUnsignedString))
@ -365,6 +368,7 @@ public abstract class TestChannel {
if (numbers.size() < data.size()) { if (numbers.size() < data.size()) {
data.removeInt(data.size() - 1); data.removeInt(data.size() - 1);
} }
Assertions.assertNull(exRef.get());
Assertions.assertEquals(data, List.copyOf(numbers)); Assertions.assertEquals(data, List.copyOf(numbers));
} finally { } finally {
eventConsumer.dispose(); eventConsumer.dispose();
@ -375,6 +379,7 @@ public abstract class TestChannel {
public void testProduceMidFail() { public void testProduceMidFail() {
var dataFlux = Flux.fromIterable(data).publish(1).autoConnect(); var dataFlux = Flux.fromIterable(data).publish(1).autoConnect();
var numbers = new ConcurrentLinkedDeque<Integer>(); var numbers = new ConcurrentLinkedDeque<Integer>();
var exRef = new AtomicReference<Throwable>();
var eventConsumer = consumer var eventConsumer = consumer
.consumeMessages() .consumeMessages()
.limitRate(1) .limitRate(1)
@ -384,12 +389,12 @@ public abstract class TestChannel {
.retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1))) .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1)))
.repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1)))
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.subscribe(n -> {}, ex -> Assertions.fail(ex)); .subscribe(n -> {}, ex -> exRef.set(ex));
try { try {
Assertions.assertThrows(FakeException.class, () -> { Assertions.assertThrows(FakeException.class, () -> {
producer producer
.sendMessages(dataFlux.limitRate(1).doOnNext(i -> { .sendMessages(dataFlux.limitRate(1).doOnNext(i -> {
if (i == 10) { if (i == 40) {
throw new FakeException(); throw new FakeException();
} }
}).map(Integer::toUnsignedString)) }).map(Integer::toUnsignedString))
@ -398,6 +403,7 @@ public abstract class TestChannel {
producer producer
.sendMessages(dataFlux.limitRate(1).map(Integer::toUnsignedString)) .sendMessages(dataFlux.limitRate(1).map(Integer::toUnsignedString))
.block(Duration.ofSeconds(5)); .block(Duration.ofSeconds(5));
Assertions.assertNull(exRef.get());
Assertions.assertTrue(numbers.contains(0)); Assertions.assertTrue(numbers.contains(0));
Assertions.assertTrue(numbers.contains(1)); Assertions.assertTrue(numbers.contains(1));
Assertions.assertTrue(numbers.contains(50)); Assertions.assertTrue(numbers.contains(50));
@ -410,11 +416,12 @@ public abstract class TestChannel {
@Test @Test
public void testResubscribe() throws InterruptedException { public void testResubscribe() throws InterruptedException {
var dataFlux = Flux.fromIterable(data).publish().autoConnect(); var dataFlux = Flux.fromIterable(data).publish().autoConnect();
var exRef = new AtomicReference<Throwable>();
var eventProducer = producer var eventProducer = producer
.sendMessages(dataFlux.map(Integer::toUnsignedString)) .sendMessages(dataFlux.map(Integer::toUnsignedString))
.repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1)))
.retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1))) .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1)))
.subscribe(n -> {}, ex -> Assertions.fail(ex)); .subscribe(n -> {}, ex -> exRef.set(ex));
try { try {
consumer consumer
.consumeMessages() .consumeMessages()
@ -434,6 +441,8 @@ public abstract class TestChannel {
.map(Integer::parseUnsignedInt) .map(Integer::parseUnsignedInt)
.log("consumer-2", Level.INFO) .log("consumer-2", Level.INFO)
.blockLast(Duration.ofSeconds(5)); .blockLast(Duration.ofSeconds(5));
Assertions.assertNull(exRef.get());
} finally { } finally {
eventProducer.dispose(); eventProducer.dispose();
} }
@ -442,11 +451,13 @@ public abstract class TestChannel {
@Test @Test
public void testFailTwoSubscribers() { public void testFailTwoSubscribers() {
var dataFlux = Flux.fromIterable(data).publish().autoConnect(); var dataFlux = Flux.fromIterable(data).publish().autoConnect();
var exRef1 = new AtomicReference<Throwable>();
var exRef2 = new AtomicReference<Throwable>();
var eventProducer = producer var eventProducer = producer
.sendMessages(dataFlux.map(Integer::toUnsignedString)) .sendMessages(dataFlux.map(Integer::toUnsignedString))
.repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1)))
.retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1))) .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1)))
.subscribe(n -> {}, ex -> Assertions.fail(ex)); .subscribe(n -> {}, ex -> exRef1.set(ex));
Assertions.assertThrows(IllegalStateException.class, () -> { Assertions.assertThrows(IllegalStateException.class, () -> {
try { try {
@ -457,7 +468,7 @@ public abstract class TestChannel {
.map(Timestamped::data) .map(Timestamped::data)
.map(Integer::parseUnsignedInt) .map(Integer::parseUnsignedInt)
.log("consumer-1", Level.INFO) .log("consumer-1", Level.INFO)
.doOnError(ex -> Assertions.fail(ex)), .doOnError(ex -> exRef2.set(ex)),
consumer consumer
.consumeMessages() .consumeMessages()
.limitRate(1) .limitRate(1)
@ -469,6 +480,8 @@ public abstract class TestChannel {
) )
) )
.block(); .block();
Assertions.assertNull(exRef1.get());
Assertions.assertNull(exRef2.get());
} catch (RuntimeException ex) { } catch (RuntimeException ex) {
throw Exceptions.unwrap(ex); throw Exceptions.unwrap(ex);
} finally { } finally {
@ -480,12 +493,13 @@ public abstract class TestChannel {
@Test @Test
public void testRepublish() throws InterruptedException { public void testRepublish() throws InterruptedException {
var dataFlux = Flux.fromIterable(data).publish().autoConnect(); var dataFlux = Flux.fromIterable(data).publish().autoConnect();
var exRef = new AtomicReference<Throwable>();
var eventConsumer = consumer.consumeMessages() var eventConsumer = consumer.consumeMessages()
.map(Timestamped::data) .map(Timestamped::data)
.map(Integer::parseUnsignedInt) .map(Integer::parseUnsignedInt)
.repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1)))
.retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1))) .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1)))
.subscribe(n -> {}, ex -> Assertions.fail(ex)); .subscribe(n -> {}, ex -> exRef.set(ex));
try { try {
producer producer
.sendMessages(dataFlux .sendMessages(dataFlux
@ -497,6 +511,7 @@ public abstract class TestChannel {
producer producer
.sendMessages(dataFlux.log("producer-2", Level.INFO).map(Integer::toUnsignedString)) .sendMessages(dataFlux.log("producer-2", Level.INFO).map(Integer::toUnsignedString))
.block(Duration.ofSeconds(5)); .block(Duration.ofSeconds(5));
Assertions.assertNull(exRef.get());
} finally { } finally {
eventConsumer.dispose(); eventConsumer.dispose();
} }