Initial implementation of the Streaming API

This pull request provides a framework for exchanging a very large
stream between handlers, typically between a decoder and an inbound
handler (or between a handler that writes a message and an encoder that
encodes that message).

For example, an HTTP decoder, previously, generates multiple
micro-messages to decode an HTTP message (i.e. HttpRequest +
HttpChunks). With the streaming API, The HTTP decoder can simply
generate a single HTTP message whose content is a Stream. And then the
inbound handler can consume the Stream via the buffer you created when
you begin to read the stream. If you create a buffer whose capacity is
bounded, you can handle a very large stream without allocating a lot of
memory. If you just want to wait until the whole content is ready, you
can also do that with an unbounded buffer.

The streaming API also supports a limited form of communication between
a producer (i.e. decoder) and a consumer. A producer can abort the
stream if the stream is not valid anymore. A consumer can choose to
reject or discard the stream, where rejection is for unrecoverable
failure and discard is for recoverable failure.

P.S. Special thanks to @jpinner for the initial input.
This commit is contained in:
Trustin Lee 2013-03-09 20:54:42 +09:00
parent b4bf565ad9
commit 32efba34d8
10 changed files with 915 additions and 0 deletions

View File

@ -0,0 +1,497 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.stream;
import io.netty.buffer.Buf;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.Random;
public abstract class AbstractStream<T extends Buf> implements Stream<T> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractStream.class);
private static final Random random = new Random();
private static final ThreadLocal<Boolean> IN_CONSUMER = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return Boolean.FALSE;
}
};
/**
* Generate a random string that can be used for correlating a group of log messages.
*/
private static String nextLogKey() {
return Long.toHexString(random.nextInt() & 0xFFFFFFFFL);
}
private final StreamProducerContextImpl producerCtx;
private StreamConsumerContextImpl consumerCtx;
private Runnable invokeStreamConsumedTask;
/** 0 - init, 1 - accepted, 2 - discarded, 3 - rejected, 4 - closed */
int state;
T buffer;
@SuppressWarnings("unchecked")
protected AbstractStream(EventExecutor executor, StreamProducer<? super T> producer) {
producerCtx = new StreamProducerContextImpl(executor, producer);
}
T buffer() {
T buffer = this.buffer;
if (buffer == null) {
fail();
}
return buffer;
}
@Override
public void accept(EventExecutor executor, StreamConsumer<? super T> consumer) {
if (executor == null) {
throw new NullPointerException("executor");
}
if (consumer == null) {
throw new NullPointerException("handler");
}
if (state != 0) {
fail();
}
StreamConsumerContextImpl consumerCtx = new StreamConsumerContextImpl(executor, consumer);
@SuppressWarnings("unchecked")
StreamConsumer<T> h = (StreamConsumer<T>) consumer;
try {
buffer = h.newStreamBuffer(consumerCtx);
} catch (Throwable t) {
PlatformDependent.throwException(t);
}
this.consumerCtx = consumerCtx;
state = 1;
fireStreamAccepted();
}
private void fireStreamAccepted() {
EventExecutor e = producerCtx.executor;
if (e.inEventLoop()) {
invokeStreamAccepted();
} else {
e.execute(new Runnable() {
@Override
public void run() {
invokeStreamAccepted();
}
});
}
}
private void invokeStreamAccepted() {
StreamProducerContextImpl producerCtx = this.producerCtx;
try {
producerCtx.producer.streamAccepted(producerCtx);
} catch (Throwable t) {
safeAbort(producerCtx, t);
return;
}
if (consumerCtx.nextCalled) {
consumerCtx.invokeStreamConsumed();
}
}
void safeAbort(StreamProducerContextImpl producerCtx, Throwable cause) {
try {
producerCtx.abort(cause);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
String key = nextLogKey();
logger.warn("[{}] Failed to auto-abort a stream.", key, t);
logger.warn("[{}] .. when invoked with the following cause:", key, cause);
}
}
}
@Override
public void discard() {
StreamConsumerContextImpl consumerCtx = this.consumerCtx;
if (consumerCtx == null) {
fail();
}
consumerCtx.discard();
}
@Override
public void reject(Throwable cause) {
StreamConsumerContextImpl consumerCtx = this.consumerCtx;
if (consumerCtx == null) {
fail();
}
consumerCtx.reject(cause);
}
private void fail() {
switch (state) {
case 0:
throw new IllegalStateException("stream not accepted yet");
case 1:
throw new IllegalStateException("stream accepted already");
case 2:
throw new IllegalStateException("stream discarded already");
case 3:
throw new IllegalStateException("stream rejected already");
case 4:
throw new IllegalStateException("stream closed already");
default:
throw new Error();
}
}
private final class StreamProducerContextImpl implements StreamProducerContext<T> {
final EventExecutor executor;
final StreamProducer<T> producer;
boolean invokedStreamOpen;
Runnable invokeStreamUpdatedTask;
@SuppressWarnings("unchecked")
StreamProducerContextImpl(EventExecutor executor, StreamProducer<? super T> producer) {
if (executor == null) {
throw new NullPointerException("executor");
}
if (producer == null) {
throw new NullPointerException("producer");
}
this.executor = executor;
this.producer = (StreamProducer<T>) producer;
}
@Override
public EventExecutor executor() {
return executor;
}
@Override
public T buffer() {
return AbstractStream.this.buffer();
}
@Override
public StreamProducerContext<T> update() {
if (state != 1) {
fail();
}
fireStreamUpdated();
return this;
}
private void fireStreamUpdated() {
EventExecutor e = consumerCtx.executor;
if (e.inEventLoop()) {
invokeStreamUpdated();
} else {
Runnable task = invokeStreamUpdatedTask;
if (task == null) {
invokeStreamUpdatedTask = task = new Runnable() {
@Override
public void run() {
invokeStreamUpdated();
}
};
}
e.execute(task);
}
}
private void invokeStreamUpdated() {
StreamConsumerContextImpl consumerCtx = AbstractStream.this.consumerCtx;
StreamConsumer<T> consumer = consumerCtx.consumer;
if (consumerCtx.singleThreaded) {
IN_CONSUMER.set(Boolean.TRUE);
}
try {
if (!invokedStreamOpen) {
invokedStreamOpen = true;
try {
consumer.streamOpen(consumerCtx);
} catch (Throwable t) {
safeAbort(producerCtx, new StreamConsumerException(t));
return;
}
}
try {
consumer.streamUpdated(consumerCtx);
} catch (Throwable t) {
safeAbort(producerCtx, new StreamConsumerException(t));
}
} finally {
if (consumerCtx.singleThreaded) {
IN_CONSUMER.set(Boolean.FALSE);
}
}
}
@Override
public void close() {
if (state == 1) {
state = 4;
fireStreamClosed();
}
}
private void fireStreamClosed() {
EventExecutor e = consumerCtx.executor;
if (e.inEventLoop()) {
invokeStreamClosed();
} else {
e.execute(new Runnable() {
@Override
public void run() {
invokeStreamClosed();
}
});
}
}
private void invokeStreamClosed() {
StreamConsumerContextImpl consumerCtx = AbstractStream.this.consumerCtx;
try {
consumerCtx.consumer.streamClosed(consumerCtx);
} catch (Throwable t) {
logger.warn("StreamConsumer.streamClosed() raised an exception.", t);
}
}
@Override
public void abort(Throwable cause) {
if (cause == null) {
throw new NullPointerException("cause");
}
if (state != 1) {
fail();
}
fireStreamAborted(cause);
}
private void fireStreamAborted(final Throwable cause) {
StreamConsumerContextImpl consumerCtx = AbstractStream.this.consumerCtx;
EventExecutor e = consumerCtx.executor;
if (e.inEventLoop()) {
invokeStreamAborted(cause);
} else {
e.execute(new Runnable() {
@Override
public void run() {
invokeStreamAborted(cause);
}
});
}
}
private void invokeStreamAborted(Throwable cause) {
StreamConsumerContextImpl consumerCtx = AbstractStream.this.consumerCtx;
try {
consumerCtx.consumer.streamAborted(consumerCtx, cause);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
String key = nextLogKey();
logger.warn("[{}] StreamConsumer.streamAborted() raised an exception.", key, t);
logger.warn("[{}] .. when invoked with the following cause:", key, cause);
}
} finally {
invokeStreamClosed();
}
}
}
private final class StreamConsumerContextImpl implements StreamConsumerContext<T> {
final EventExecutor executor;
final StreamConsumer<T> consumer;
final boolean singleThreaded;
boolean nextCalled;
@SuppressWarnings("unchecked")
StreamConsumerContextImpl(EventExecutor executor, StreamConsumer<? super T> consumer) {
if (executor == null) {
throw new NullPointerException("executor");
}
if (consumer == null) {
throw new NullPointerException("consumer");
}
this.executor = executor;
this.consumer = (StreamConsumer<T>) consumer;
singleThreaded = executor == producerCtx.executor;
}
@Override
public EventExecutor executor() {
return executor;
}
@Override
public T buffer() {
return AbstractStream.this.buffer();
}
@Override
public void discard() {
switch (state) {
case 2:
return;
case 3:
case 4:
fail();
}
T buffer = AbstractStream.this.buffer;
if (buffer != null) {
buffer.release();
}
state = 2;
fireStreamDiscarded();
}
private void fireStreamDiscarded() {
EventExecutor e = producerCtx.executor;
if (e.inEventLoop()) {
invokeStreamDiscarded();
} else {
e.execute(new Runnable() {
@Override
public void run() {
invokeStreamDiscarded();
}
});
}
}
private void invokeStreamDiscarded() {
StreamProducerContextImpl producerCtx = AbstractStream.this.producerCtx;
try {
producerCtx.producer.streamDiscarded(producerCtx);
} catch (Throwable t) {
logger.warn("StreamProducer.streamDiscarded() raised an exception.", t);
}
}
@Override
public void reject(Throwable cause) {
if (cause == null) {
throw new NullPointerException("cause");
}
if (state != 1) {
fail();
}
T buffer = AbstractStream.this.buffer;
if (buffer != null) {
buffer.release();
}
state = 3;
fireStreamRejected(cause);
}
private void fireStreamRejected(final Throwable cause) {
EventExecutor e = producerCtx.executor;
if (e.inEventLoop()) {
invokeStreamRejected(cause);
} else {
e.execute(new Runnable() {
@Override
public void run() {
invokeStreamRejected(cause);
}
});
}
}
private void invokeStreamRejected(Throwable cause) {
StreamProducerContextImpl producerCtx = AbstractStream.this.producerCtx;
try {
producerCtx.producer.streamRejected(producerCtx, cause);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
String key = nextLogKey();
logger.warn("[{}] StreamProducer.streamRejected() raised an exception.", key, t);
logger.warn("[{}] .. when invoked with the following cause:", key, cause);
}
}
}
@Override
public void next() {
if (state != 1) {
fail();
}
if (singleThreaded && IN_CONSUMER.get()) {
nextCalled = true;
} else {
fireStreamConsumed();
}
}
private void fireStreamConsumed() {
EventExecutor e = producerCtx.executor;
if (e.inEventLoop()) {
invokeStreamConsumed();
} else {
Runnable task = invokeStreamConsumedTask;
if (task == null) {
invokeStreamConsumedTask = task = new Runnable() {
@Override
public void run() {
invokeStreamConsumed();
}
};
}
e.execute(task);
}
}
private void invokeStreamConsumed() {
StreamProducerContextImpl producerCtx = AbstractStream.this.producerCtx;
try {
do {
consumerCtx.nextCalled = false;
producerCtx.producer.streamConsumed(producerCtx);
} while (consumerCtx.nextCalled);
} catch (Throwable t) {
reject(new StreamProducerException(t));
}
}
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.stream;
import io.netty.buffer.Buf;
import io.netty.util.concurrent.EventExecutor;
public interface Stream<T extends Buf> {
void accept(EventExecutor executor, StreamConsumer<? super T> consumer);
void discard();
void reject(Throwable cause);
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.stream;
import io.netty.buffer.Buf;
public interface StreamConsumer<T extends Buf> {
T newStreamBuffer(StreamConsumerContext<T> ctx) throws Exception;
void streamOpen(StreamConsumerContext<T> ctx) throws Exception;
void streamUpdated(StreamConsumerContext<T> ctx) throws Exception;
void streamAborted(StreamConsumerContext<T> ctx, Throwable cause) throws Exception;
void streamClosed(StreamConsumerContext<T> ctx) throws Exception;
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.stream;
import io.netty.buffer.Buf;
import io.netty.util.concurrent.EventExecutor;
public interface StreamConsumerContext<T extends Buf> {
EventExecutor executor();
T buffer();
void next();
void discard();
void reject(Throwable cause);
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.stream;
public class StreamConsumerException extends StreamException {
private static final long serialVersionUID = -7688455132621735741L;
public StreamConsumerException() { }
public StreamConsumerException(String message) {
super(message);
}
public StreamConsumerException(String message, Throwable cause) {
super(message, cause);
}
public StreamConsumerException(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.stream;
public class StreamException extends RuntimeException {
private static final long serialVersionUID = -8529582560484984135L;
public StreamException() { }
public StreamException(String message) {
super(message);
}
public StreamException(String message, Throwable cause) {
super(message, cause);
}
public StreamException(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.stream;
import io.netty.buffer.Buf;
public interface StreamProducer<T extends Buf> {
void streamAccepted(StreamProducerContext<T> ctx) throws Exception;
void streamConsumed(StreamProducerContext<T> ctx) throws Exception;
void streamDiscarded(StreamProducerContext<T> ctx) throws Exception;
void streamRejected(StreamProducerContext<T> ctx, Throwable cause) throws Exception;
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.stream;
import io.netty.buffer.Buf;
import io.netty.util.concurrent.EventExecutor;
public interface StreamProducerContext<T extends Buf> {
EventExecutor executor();
T buffer();
StreamProducerContext<T> update();
void close();
void abort(Throwable cause);
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.stream;
public class StreamProducerException extends StreamException {
private static final long serialVersionUID = 8830744325775707415L;
public StreamProducerException() { }
public StreamProducerException(String message) {
super(message);
}
public StreamProducerException(String message, Throwable cause) {
super(message, cause);
}
public StreamProducerException(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,174 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.stream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Promise;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class StreamTest {
private static EventExecutor executorA;
private static EventExecutor executorB;
@BeforeClass
public static void setUp() {
executorA = new DefaultEventExecutorGroup(1).next();
executorB = new DefaultEventExecutorGroup(1).next();
}
@AfterClass
public static void tearDown() {
executorB.shutdown();
executorA.shutdown();
}
@Test(timeout = 5000)
public void testSimpleWithSameExecutor() throws Exception {
final long size = 1048576L * 1024L * 1024L; // Transfer whooping 1 TiB of garbage
testSimple0(executorA, executorA, size);
}
@Test(timeout = 10000)
public void testSimpleWithDifferentExecutors() throws Exception {
final long size = 1048576L * 1024L * 32L; // Transfer 32 GiB of garbage
testSimple0(executorA, executorB, size);
}
private static void testSimple0(EventExecutor executorA, EventExecutor executorB, long size) throws Exception {
LargeByteStreamConsumer consumer = new LargeByteStreamConsumer(size);
Stream<ByteBuf> stream = new LargeByteStream(executorA, size);
stream.accept(executorB, consumer);
for (;;) {
if (consumer.future.await(1000)) {
break;
}
System.err.println(consumer.counter + " / " + size);
}
consumer.future.sync();
}
private static class LargeByteStream extends AbstractStream<ByteBuf> {
LargeByteStream(EventExecutor executor, long size) {
super(executor, new LargeByteStreamProducer(size));
}
}
private static class LargeByteStreamProducer implements StreamProducer<ByteBuf> {
private final long size;
private long counter;
LargeByteStreamProducer(long size) {
this.size = size;
}
@Override
public void streamAccepted(StreamProducerContext<ByteBuf> ctx) throws Exception {
generate(ctx);
}
@Override
public void streamConsumed(StreamProducerContext<ByteBuf> ctx) throws Exception {
generate(ctx);
}
private void generate(StreamProducerContext<ByteBuf> ctx) {
ByteBuf buf = ctx.buffer();
int chunkSize = (int) Math.min(size - counter, buf.maxWritableBytes());
buf.ensureWritable(chunkSize);
buf.writerIndex(buf.writerIndex() + chunkSize);
ctx.update();
counter += chunkSize;
if (counter == size) {
ctx.close();
return;
}
if (counter > size) {
throw new AssertionError("counter > size");
}
}
@Override
public void streamDiscarded(StreamProducerContext<ByteBuf> ctx) throws Exception {
throw new AssertionError("stream discarded");
}
@Override
public void streamRejected(StreamProducerContext<ByteBuf> ctx, Throwable cause) throws Exception {
throw new AssertionError("stream rejected", cause);
}
}
private static class LargeByteStreamConsumer implements StreamConsumer<ByteBuf> {
private final long size;
volatile Promise future;
volatile long counter;
LargeByteStreamConsumer(long size) {
this.size = size;
}
@Override
public ByteBuf newStreamBuffer(StreamConsumerContext<ByteBuf> ctx) throws Exception {
future = new DefaultPromise(ctx.executor());
return Unpooled.buffer(0, 65536); // Only use 64 KiB at max.
}
@Override
public void streamOpen(StreamConsumerContext<ByteBuf> ctx) throws Exception { }
@Override
public void streamUpdated(StreamConsumerContext<ByteBuf> ctx) throws Exception {
ByteBuf buf = ctx.buffer();
int chunkSize = buf.readableBytes();
buf.skipBytes(chunkSize);
buf.discardReadBytes();
counter += chunkSize;
if (counter == size) {
future.setSuccess();
} else if (counter > size) {
AssertionError error = new AssertionError("counter > size");
ctx.reject(error);
future.setFailure(error);
} else {
ctx.next();
}
}
@Override
public void streamAborted(StreamConsumerContext<ByteBuf> ctx, Throwable cause) throws Exception {
future.setFailure(cause);
}
@Override
public void streamClosed(StreamConsumerContext<ByteBuf> ctx) throws Exception {
future.tryFailure(new AssertionError("tryFailure() must return false."));
}
}
}