Remove Void*Promise (#11348)
Motivation: Sometime in the past we introduced the concept of Void*Promise. As it turned out this was not a good idea at all as basically each handler in the pipeline need to be very careful to correctly handle this. We should better just remove this "optimization". Modifications: - Remove Void*Promise and all the related APIs - Remove tests which were related to Void*Promise Result: Less error-prone API
This commit is contained in:
parent
1a28c26b4b
commit
abdaa769de
@ -260,9 +260,4 @@ abstract class DelegatingChannelHandlerContext implements ChannelHandlerContext
|
|||||||
public ChannelFuture newFailedFuture(Throwable cause) {
|
public ChannelFuture newFailedFuture(Throwable cause) {
|
||||||
return ctx.newFailedFuture(cause);
|
return ctx.newFailedFuture(cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise voidPromise() {
|
|
||||||
return ctx.voidPromise();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -82,7 +82,7 @@ public class HttpServerKeepAliveHandler implements ChannelHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (msg instanceof LastHttpContent && !shouldKeepAlive()) {
|
if (msg instanceof LastHttpContent && !shouldKeepAlive()) {
|
||||||
promise = promise.unvoid().addListener(ChannelFutureListener.CLOSE);
|
promise.addListener(ChannelFutureListener.CLOSE);
|
||||||
}
|
}
|
||||||
ctx.write(msg, promise);
|
ctx.write(msg, promise);
|
||||||
}
|
}
|
||||||
|
@ -101,7 +101,7 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocke
|
|||||||
ReferenceCountUtil.release(msg);
|
ReferenceCountUtil.release(msg);
|
||||||
promise.setFailure(new ClosedChannelException());
|
promise.setFailure(new ClosedChannelException());
|
||||||
} else if (msg instanceof CloseWebSocketFrame) {
|
} else if (msg instanceof CloseWebSocketFrame) {
|
||||||
closeSent(promise.unvoid());
|
closeSent(promise);
|
||||||
ctx.write(msg).addListener(new ChannelPromiseNotifier(false, closeSent));
|
ctx.write(msg).addListener(new ChannelPromiseNotifier(false, closeSent));
|
||||||
} else {
|
} else {
|
||||||
ctx.write(msg, promise);
|
ctx.write(msg, promise);
|
||||||
|
@ -131,20 +131,6 @@ public class HttpServerKeepAliveHandlerTest {
|
|||||||
assertFalse(channel.finishAndReleaseAll());
|
assertFalse(channel.finishAndReleaseAll());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testConnectionCloseHeaderHandledCorrectlyForVoidPromise() throws Exception {
|
|
||||||
HttpResponse response = new DefaultFullHttpResponse(httpVersion, responseStatus);
|
|
||||||
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
|
|
||||||
setupMessageLength(response);
|
|
||||||
|
|
||||||
channel.writeAndFlush(response, channel.voidPromise());
|
|
||||||
HttpResponse writtenResponse = channel.readOutbound();
|
|
||||||
|
|
||||||
assertFalse(channel.isOpen());
|
|
||||||
ReferenceCountUtil.release(writtenResponse);
|
|
||||||
assertFalse(channel.finishAndReleaseAll());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test_PipelineKeepAlive() {
|
public void test_PipelineKeepAlive() {
|
||||||
FullHttpRequest firstRequest = new DefaultFullHttpRequest(httpVersion, HttpMethod.GET, "/v1/foo/bar");
|
FullHttpRequest firstRequest = new DefaultFullHttpRequest(httpVersion, HttpMethod.GET, "/v1/foo/bar");
|
||||||
|
@ -33,7 +33,6 @@ import io.netty.channel.DefaultChannelPipeline;
|
|||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
import io.netty.channel.MessageSizeEstimator;
|
import io.netty.channel.MessageSizeEstimator;
|
||||||
import io.netty.channel.RecvByteBufAllocator;
|
import io.netty.channel.RecvByteBufAllocator;
|
||||||
import io.netty.channel.VoidChannelPromise;
|
|
||||||
import io.netty.channel.WriteBufferWaterMark;
|
import io.netty.channel.WriteBufferWaterMark;
|
||||||
import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
|
import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
|
||||||
import io.netty.util.DefaultAttributeMap;
|
import io.netty.util.DefaultAttributeMap;
|
||||||
@ -110,7 +109,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
|
|
||||||
// Notify the child-channel and close it.
|
// Notify the child-channel and close it.
|
||||||
streamChannel.pipeline().fireExceptionCaught(cause);
|
streamChannel.pipeline().fireExceptionCaught(cause);
|
||||||
streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
|
streamChannel.unsafe().close(streamChannel.newPromise());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -505,11 +504,6 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
return pipeline().newFailedFuture(cause);
|
return pipeline().newFailedFuture(cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise voidPromise() {
|
|
||||||
return pipeline().voidPromise();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return id().hashCode();
|
return id().hashCode();
|
||||||
@ -572,8 +566,6 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private final class Http2ChannelUnsafe implements Unsafe {
|
private final class Http2ChannelUnsafe implements Unsafe {
|
||||||
private final VoidChannelPromise unsafeVoidPromise =
|
|
||||||
new VoidChannelPromise(AbstractHttp2StreamChannel.this, false);
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
private RecvByteBufAllocator.Handle recvHandle;
|
private RecvByteBufAllocator.Handle recvHandle;
|
||||||
private boolean writeDoneAndNoFlush;
|
private boolean writeDoneAndNoFlush;
|
||||||
@ -653,7 +645,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
if (closePromise.isDone()) {
|
if (closePromise.isDone()) {
|
||||||
// Closed already.
|
// Closed already.
|
||||||
promise.setSuccess();
|
promise.setSuccess();
|
||||||
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
|
} else {
|
||||||
// This means close() was called before so we just register a listener and return
|
// This means close() was called before so we just register a listener and return
|
||||||
closePromise.addListener((ChannelFutureListener) future -> promise.setSuccess());
|
closePromise.addListener((ChannelFutureListener) future -> promise.setSuccess());
|
||||||
}
|
}
|
||||||
@ -672,7 +664,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
// as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
|
// as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
|
||||||
if (parent().isActive() && !readEOS && Http2CodecUtil.isStreamIdValid(stream.id())) {
|
if (parent().isActive() && !readEOS && Http2CodecUtil.isStreamIdValid(stream.id())) {
|
||||||
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
|
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
|
||||||
write(resetFrame, unsafe().voidPromise());
|
write(resetFrame, newPromise());
|
||||||
flush();
|
flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -692,12 +684,12 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
closePromise.setSuccess();
|
closePromise.setSuccess();
|
||||||
promise.setSuccess();
|
promise.setSuccess();
|
||||||
|
|
||||||
fireChannelInactiveAndDeregister(voidPromise(), wasActive);
|
fireChannelInactiveAndDeregister(newPromise(), wasActive);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closeForcibly() {
|
public void closeForcibly() {
|
||||||
close(unsafe().voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -738,7 +730,7 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void safeSetSuccess(ChannelPromise promise) {
|
private void safeSetSuccess(ChannelPromise promise) {
|
||||||
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
|
if (!promise.trySuccess()) {
|
||||||
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
|
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1031,11 +1023,6 @@ abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements
|
|||||||
flush0(parentContext());
|
flush0(parentContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise voidPromise() {
|
|
||||||
return unsafeVoidPromise;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelOutboundBuffer outboundBuffer() {
|
public ChannelOutboundBuffer outboundBuffer() {
|
||||||
// Always return null as we not use the ChannelOutboundBuffer and not even support it.
|
// Always return null as we not use the ChannelOutboundBuffer and not even support it.
|
||||||
|
@ -124,8 +124,6 @@ public class DefaultHttp2Connection implements Http2Connection {
|
|||||||
if (closePromise != null) {
|
if (closePromise != null) {
|
||||||
if (closePromise == promise) {
|
if (closePromise == promise) {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
} else if ((promise instanceof ChannelPromise) && ((ChannelPromise) closePromise).isVoid()) {
|
|
||||||
closePromise = promise;
|
|
||||||
} else {
|
} else {
|
||||||
closePromise.addListener(new UnaryPromiseNotifier<>(promise));
|
closePromise.addListener(new UnaryPromiseNotifier<>(promise));
|
||||||
}
|
}
|
||||||
|
@ -120,7 +120,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder, Ht
|
|||||||
@Override
|
@Override
|
||||||
public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
|
public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
|
||||||
final boolean endOfStream, ChannelPromise promise) {
|
final boolean endOfStream, ChannelPromise promise) {
|
||||||
promise = promise.unvoid();
|
|
||||||
final Http2Stream stream;
|
final Http2Stream stream;
|
||||||
try {
|
try {
|
||||||
stream = requireStream(streamId);
|
stream = requireStream(streamId);
|
||||||
@ -227,7 +226,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder, Ht
|
|||||||
if (!endOfStream || !flowController.hasFlowControlled(stream)) {
|
if (!endOfStream || !flowController.hasFlowControlled(stream)) {
|
||||||
// The behavior here should mirror that in FlowControlledHeaders
|
// The behavior here should mirror that in FlowControlledHeaders
|
||||||
|
|
||||||
promise = promise.unvoid();
|
|
||||||
boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
|
boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
|
||||||
|
|
||||||
ChannelFuture future = sendHeaders(frameWriter, ctx, streamId, headers, hasPriority, streamDependency,
|
ChannelFuture future = sendHeaders(frameWriter, ctx, streamId, headers, hasPriority, streamDependency,
|
||||||
@ -349,7 +347,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder, Ht
|
|||||||
// Reserve the promised stream.
|
// Reserve the promised stream.
|
||||||
connection.local().reservePushStream(promisedStreamId, stream);
|
connection.local().reservePushStream(promisedStreamId, stream);
|
||||||
|
|
||||||
promise = promise.unvoid();
|
|
||||||
ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding,
|
ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding,
|
||||||
promise);
|
promise);
|
||||||
// Writing headers may fail during the encode state if they violate HPACK limits.
|
// Writing headers may fail during the encode state if they violate HPACK limits.
|
||||||
@ -550,7 +547,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder, Ht
|
|||||||
FlowControlledHeaders(Http2Stream stream, Http2Headers headers, boolean hasPriority,
|
FlowControlledHeaders(Http2Stream stream, Http2Headers headers, boolean hasPriority,
|
||||||
int streamDependency, short weight, boolean exclusive,
|
int streamDependency, short weight, boolean exclusive,
|
||||||
int padding, boolean endOfStream, ChannelPromise promise) {
|
int padding, boolean endOfStream, ChannelPromise promise) {
|
||||||
super(stream, padding, endOfStream, promise.unvoid());
|
super(stream, padding, endOfStream, promise);
|
||||||
this.headers = headers;
|
this.headers = headers;
|
||||||
this.hasPriorty = hasPriority;
|
this.hasPriorty = hasPriority;
|
||||||
this.streamDependency = streamDependency;
|
this.streamDependency = streamDependency;
|
||||||
|
@ -203,7 +203,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
|||||||
|
|
||||||
// We need to remove all streams (not just the active ones).
|
// We need to remove all streams (not just the active ones).
|
||||||
// See https://github.com/netty/netty/issues/4838.
|
// See https://github.com/netty/netty/issues/4838.
|
||||||
connection().close(ctx.voidPromise());
|
connection().close(ctx.newPromise());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -457,7 +457,6 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
|||||||
ctx.close(promise);
|
ctx.close(promise);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
promise = promise.unvoid();
|
|
||||||
// Avoid NotYetConnectedException and avoid sending before connection preface
|
// Avoid NotYetConnectedException and avoid sending before connection preface
|
||||||
if (!ctx.channel().isActive() || !prefaceSent()) {
|
if (!ctx.channel().isActive() || !prefaceSent()) {
|
||||||
ctx.close(promise);
|
ctx.close(promise);
|
||||||
@ -760,7 +759,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
|||||||
ChannelPromise promise) {
|
ChannelPromise promise) {
|
||||||
final Http2Stream stream = connection().stream(streamId);
|
final Http2Stream stream = connection().stream(streamId);
|
||||||
if (stream == null) {
|
if (stream == null) {
|
||||||
return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
|
return resetUnknownStream(ctx, streamId, errorCode, promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
return resetStream(ctx, stream, errorCode, promise);
|
return resetStream(ctx, stream, errorCode, promise);
|
||||||
@ -768,7 +767,6 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
|||||||
|
|
||||||
private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
|
private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
|
||||||
long errorCode, ChannelPromise promise) {
|
long errorCode, ChannelPromise promise) {
|
||||||
promise = promise.unvoid();
|
|
||||||
if (stream.isResetSent()) {
|
if (stream.isResetSent()) {
|
||||||
// Don't write a RST_STREAM frame if we have already written one.
|
// Don't write a RST_STREAM frame if we have already written one.
|
||||||
return promise.setSuccess();
|
return promise.setSuccess();
|
||||||
@ -801,7 +799,6 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
|||||||
@Override
|
@Override
|
||||||
public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
|
public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
|
||||||
final ByteBuf debugData, ChannelPromise promise) {
|
final ByteBuf debugData, ChannelPromise promise) {
|
||||||
promise = promise.unvoid();
|
|
||||||
final Http2Connection connection = connection();
|
final Http2Connection connection = connection();
|
||||||
try {
|
try {
|
||||||
if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
|
if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
|
||||||
|
@ -106,7 +106,7 @@ final class Http2ControlFrameLimitEncoder extends DecoratingHttp2ConnectionEncod
|
|||||||
|
|
||||||
// We did not reach the limit yet, add the listener to decrement the number of outstanding control frames
|
// We did not reach the limit yet, add the listener to decrement the number of outstanding control frames
|
||||||
// once the promise was completed
|
// once the promise was completed
|
||||||
return promise.unvoid().addListener(outstandingControlFramesListener);
|
return promise.addListener(outstandingControlFramesListener);
|
||||||
}
|
}
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
@ -48,7 +48,6 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGH
|
|||||||
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
||||||
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
|
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
|
||||||
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
|
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
|
||||||
import static io.netty.handler.codec.http2.Http2TestUtil.newVoidPromise;
|
|
||||||
import static io.netty.util.CharsetUtil.UTF_8;
|
import static io.netty.util.CharsetUtil.UTF_8;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||||
@ -268,35 +267,6 @@ public class DefaultHttp2ConnectionEncoderTest {
|
|||||||
assertTrue(promise2.isSuccess());
|
assertTrue(promise2.isSuccess());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void dataFramesShouldMergeUseVoidPromise() throws Exception {
|
|
||||||
createStream(STREAM_ID, false);
|
|
||||||
final ByteBuf data = dummyData().retain();
|
|
||||||
|
|
||||||
ChannelPromise promise1 = newVoidPromise(channel);
|
|
||||||
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise1);
|
|
||||||
ChannelPromise promise2 = newVoidPromise(channel);
|
|
||||||
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise2);
|
|
||||||
|
|
||||||
// Now merge the two payloads.
|
|
||||||
List<FlowControlled> capturedWrites = payloadCaptor.getAllValues();
|
|
||||||
FlowControlled mergedPayload = capturedWrites.get(0);
|
|
||||||
mergedPayload.merge(ctx, capturedWrites.get(1));
|
|
||||||
assertEquals(16, mergedPayload.size());
|
|
||||||
assertFalse(promise1.isSuccess());
|
|
||||||
assertFalse(promise2.isSuccess());
|
|
||||||
|
|
||||||
// Write the merged payloads and verify it was written correctly.
|
|
||||||
mergedPayload.write(ctx, 16);
|
|
||||||
assertEquals(0, mergedPayload.size());
|
|
||||||
assertEquals("abcdefghabcdefgh", writtenData.get(0));
|
|
||||||
assertEquals(0, data.refCnt());
|
|
||||||
|
|
||||||
// The promises won't be set since there are no listeners.
|
|
||||||
assertFalse(promise1.isSuccess());
|
|
||||||
assertFalse(promise2.isSuccess());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void dataFramesDontMergeWithHeaders() throws Exception {
|
public void dataFramesDontMergeWithHeaders() throws Exception {
|
||||||
createStream(STREAM_ID, false);
|
createStream(STREAM_ID, false);
|
||||||
@ -315,26 +285,6 @@ public class DefaultHttp2ConnectionEncoderTest {
|
|||||||
assertEquals(0, data.refCnt());
|
assertEquals(0, data.refCnt());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void writeHeadersUsingVoidPromise() throws Exception {
|
|
||||||
final Throwable cause = new RuntimeException("fake exception");
|
|
||||||
when(writer.writeHeaders(eq(ctx), eq(STREAM_ID), any(Http2Headers.class),
|
|
||||||
anyInt(), anyBoolean(), any(ChannelPromise.class)))
|
|
||||||
.then((Answer<ChannelFuture>) invocationOnMock -> {
|
|
||||||
ChannelPromise promise = invocationOnMock.getArgument(5);
|
|
||||||
assertFalse(promise.isVoid());
|
|
||||||
return promise.setFailure(cause);
|
|
||||||
});
|
|
||||||
createStream(STREAM_ID, false);
|
|
||||||
// END_STREAM flag, so that a listener is added to the future.
|
|
||||||
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, newVoidPromise(channel));
|
|
||||||
|
|
||||||
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), any(Http2Headers.class),
|
|
||||||
anyInt(), anyBoolean(), any(ChannelPromise.class));
|
|
||||||
// When using a void promise, the error should be propagated via the channel pipeline.
|
|
||||||
verify(pipeline).fireExceptionCaught(cause);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void assertSplitPaddingOnEmptyBuffer(ByteBuf data) throws Exception {
|
private void assertSplitPaddingOnEmptyBuffer(ByteBuf data) throws Exception {
|
||||||
createStream(STREAM_ID, false);
|
createStream(STREAM_ID, false);
|
||||||
when(frameSizePolicy.maxFrameSize()).thenReturn(5);
|
when(frameSizePolicy.maxFrameSize()).thenReturn(5);
|
||||||
|
@ -28,7 +28,6 @@ import io.netty.channel.ChannelPromise;
|
|||||||
import io.netty.channel.DefaultChannelConfig;
|
import io.netty.channel.DefaultChannelConfig;
|
||||||
import io.netty.channel.DefaultChannelPromise;
|
import io.netty.channel.DefaultChannelPromise;
|
||||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||||
import io.netty.handler.codec.http2.Http2CodecUtil.SimpleChannelPromiseAggregator;
|
|
||||||
import io.netty.handler.codec.http2.Http2Exception.ShutdownHint;
|
import io.netty.handler.codec.http2.Http2Exception.ShutdownHint;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.concurrent.EventExecutor;
|
import io.netty.util.concurrent.EventExecutor;
|
||||||
@ -54,7 +53,6 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
|||||||
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
|
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
|
||||||
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
|
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
|
||||||
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
|
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
|
||||||
import static io.netty.handler.codec.http2.Http2TestUtil.newVoidPromise;
|
|
||||||
import static io.netty.util.CharsetUtil.US_ASCII;
|
import static io.netty.util.CharsetUtil.US_ASCII;
|
||||||
import static io.netty.util.CharsetUtil.UTF_8;
|
import static io.netty.util.CharsetUtil.UTF_8;
|
||||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
@ -85,7 +83,6 @@ public class Http2ConnectionHandlerTest {
|
|||||||
|
|
||||||
private Http2ConnectionHandler handler;
|
private Http2ConnectionHandler handler;
|
||||||
private ChannelPromise promise;
|
private ChannelPromise promise;
|
||||||
private ChannelPromise voidPromise;
|
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private Http2Connection connection;
|
private Http2Connection connection;
|
||||||
@ -143,7 +140,6 @@ public class Http2ConnectionHandlerTest {
|
|||||||
MockitoAnnotations.initMocks(this);
|
MockitoAnnotations.initMocks(this);
|
||||||
|
|
||||||
promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
|
promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
|
||||||
voidPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
|
|
||||||
|
|
||||||
when(channel.metadata()).thenReturn(new ChannelMetadata(false));
|
when(channel.metadata()).thenReturn(new ChannelMetadata(false));
|
||||||
DefaultChannelConfig config = new DefaultChannelConfig(channel);
|
DefaultChannelConfig config = new DefaultChannelConfig(channel);
|
||||||
@ -195,7 +191,6 @@ public class Http2ConnectionHandlerTest {
|
|||||||
when(ctx.channel()).thenReturn(channel);
|
when(ctx.channel()).thenReturn(channel);
|
||||||
when(ctx.newSucceededFuture()).thenReturn(future);
|
when(ctx.newSucceededFuture()).thenReturn(future);
|
||||||
when(ctx.newPromise()).thenReturn(promise);
|
when(ctx.newPromise()).thenReturn(promise);
|
||||||
when(ctx.voidPromise()).thenReturn(voidPromise);
|
|
||||||
when(ctx.write(any())).thenReturn(future);
|
when(ctx.write(any())).thenReturn(future);
|
||||||
when(ctx.executor()).thenReturn(executor);
|
when(ctx.executor()).thenReturn(executor);
|
||||||
doAnswer(in -> {
|
doAnswer(in -> {
|
||||||
@ -635,36 +630,6 @@ public class Http2ConnectionHandlerTest {
|
|||||||
verifyNoMoreInteractions(frameWriter);
|
verifyNoMoreInteractions(frameWriter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void canSendGoAwayUsingVoidPromise() throws Exception {
|
|
||||||
handler = newHandler();
|
|
||||||
ByteBuf data = dummyData();
|
|
||||||
long errorCode = Http2Error.INTERNAL_ERROR.code();
|
|
||||||
handler = newHandler();
|
|
||||||
final Throwable cause = new RuntimeException("fake exception");
|
|
||||||
doAnswer((Answer<ChannelFuture>) invocation -> {
|
|
||||||
ChannelPromise promise = invocation.getArgument(4);
|
|
||||||
assertFalse(promise.isVoid());
|
|
||||||
// This is what DefaultHttp2FrameWriter does... I hate mocking :-(.
|
|
||||||
SimpleChannelPromiseAggregator aggregatedPromise =
|
|
||||||
new SimpleChannelPromiseAggregator(promise, channel, ImmediateEventExecutor.INSTANCE);
|
|
||||||
aggregatedPromise.newPromise();
|
|
||||||
aggregatedPromise.doneAllocatingPromises();
|
|
||||||
return aggregatedPromise.setFailure(cause);
|
|
||||||
}).when(frameWriter).writeGoAway(
|
|
||||||
any(ChannelHandlerContext.class), anyInt(), anyLong(), any(ByteBuf.class), any(ChannelPromise.class));
|
|
||||||
handler.goAway(ctx, STREAM_ID, errorCode, data, newVoidPromise(channel));
|
|
||||||
verify(pipeline).fireExceptionCaught(cause);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void canCloseStreamWithVoidPromise() throws Exception {
|
|
||||||
handler = newHandler();
|
|
||||||
handler.closeStream(stream, ctx.voidPromise());
|
|
||||||
verify(stream, times(1)).close();
|
|
||||||
verifyNoMoreInteractions(stream);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void channelReadCompleteTriggersFlush() throws Exception {
|
public void channelReadCompleteTriggersFlush() throws Exception {
|
||||||
handler = newHandler();
|
handler = newHandler();
|
||||||
@ -702,16 +667,6 @@ public class Http2ConnectionHandlerTest {
|
|||||||
verifyZeroInteractions(frameWriter);
|
verifyZeroInteractions(frameWriter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void writeRstStreamForUnknownStreamUsingVoidPromise() throws Exception {
|
|
||||||
writeRstStreamUsingVoidPromise(NON_EXISTANT_STREAM_ID);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void writeRstStreamForKnownStreamUsingVoidPromise() throws Exception {
|
|
||||||
writeRstStreamUsingVoidPromise(STREAM_ID);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void gracefulShutdownTimeoutWhenConnectionErrorHardShutdownTest() throws Exception {
|
public void gracefulShutdownTimeoutWhenConnectionErrorHardShutdownTest() throws Exception {
|
||||||
gracefulShutdownTimeoutWhenConnectionErrorTest0(ShutdownHint.HARD_SHUTDOWN);
|
gracefulShutdownTimeoutWhenConnectionErrorTest0(ShutdownHint.HARD_SHUTDOWN);
|
||||||
@ -788,21 +743,6 @@ public class Http2ConnectionHandlerTest {
|
|||||||
assertTrue(promise2.isSuccess());
|
assertTrue(promise2.isSuccess());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeRstStreamUsingVoidPromise(int streamId) throws Exception {
|
|
||||||
handler = newHandler();
|
|
||||||
final Throwable cause = new RuntimeException("fake exception");
|
|
||||||
when(stream.id()).thenReturn(STREAM_ID);
|
|
||||||
when(frameWriter.writeRstStream(eq(ctx), eq(streamId), anyLong(), any(ChannelPromise.class)))
|
|
||||||
.then((Answer<ChannelFuture>) invocationOnMock -> {
|
|
||||||
ChannelPromise promise = invocationOnMock.getArgument(3);
|
|
||||||
assertFalse(promise.isVoid());
|
|
||||||
return promise.setFailure(cause);
|
|
||||||
});
|
|
||||||
handler.resetStream(ctx, streamId, STREAM_CLOSED.code(), newVoidPromise(channel));
|
|
||||||
verify(frameWriter).writeRstStream(eq(ctx), eq(streamId), anyLong(), any(ChannelPromise.class));
|
|
||||||
verify(pipeline).fireExceptionCaught(cause);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static ByteBuf dummyData() {
|
private static ByteBuf dummyData() {
|
||||||
return Unpooled.buffer().writeBytes("abcdefgh".getBytes(UTF_8));
|
return Unpooled.buffer().writeBytes("abcdefgh".getBytes(UTF_8));
|
||||||
}
|
}
|
||||||
|
@ -340,10 +340,5 @@ final class Http2FrameInboundWriter {
|
|||||||
public ChannelFuture newFailedFuture(Throwable cause) {
|
public ChannelFuture newFailedFuture(Throwable cause) {
|
||||||
return channel.newFailedFuture(cause);
|
return channel.newFailedFuture(cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise voidPromise() {
|
|
||||||
return channel.voidPromise();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -856,7 +856,7 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
|
|||||||
// and verify that this only affect the writability of the parent channel while the child stays writable
|
// and verify that this only affect the writability of the parent channel while the child stays writable
|
||||||
// until it used all of its credits.
|
// until it used all of its credits.
|
||||||
parentChannel.unsafe().outboundBuffer().addMessage(
|
parentChannel.unsafe().outboundBuffer().addMessage(
|
||||||
Unpooled.buffer().writeZero(800), 800, parentChannel.voidPromise());
|
Unpooled.buffer().writeZero(800), 800, parentChannel.newPromise());
|
||||||
assertFalse(parentChannel.isWritable());
|
assertFalse(parentChannel.isWritable());
|
||||||
|
|
||||||
assertTrue(childChannel.isWritable());
|
assertTrue(childChannel.isWritable());
|
||||||
@ -959,7 +959,7 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
|
|||||||
public void callUnsafeCloseMultipleTimes() {
|
public void callUnsafeCloseMultipleTimes() {
|
||||||
LastInboundHandler inboundHandler = new LastInboundHandler();
|
LastInboundHandler inboundHandler = new LastInboundHandler();
|
||||||
Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
|
Http2StreamChannel childChannel = newInboundStream(3, false, inboundHandler);
|
||||||
childChannel.unsafe().close(childChannel.voidPromise());
|
childChannel.unsafe().close(childChannel.newPromise());
|
||||||
|
|
||||||
ChannelPromise promise = childChannel.newPromise();
|
ChannelPromise promise = childChannel.newPromise();
|
||||||
childChannel.unsafe().close(promise);
|
childChannel.unsafe().close(promise);
|
||||||
|
@ -438,51 +438,6 @@ public final class Http2TestUtil {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static ChannelPromise newVoidPromise(final Channel channel) {
|
|
||||||
return new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE) {
|
|
||||||
@Override
|
|
||||||
public ChannelPromise addListener(
|
|
||||||
GenericFutureListener<? extends Future<? super Void>> listener) {
|
|
||||||
throw new AssertionFailedError();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise addListeners(
|
|
||||||
GenericFutureListener<? extends Future<? super Void>>... listeners) {
|
|
||||||
throw new AssertionFailedError();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isVoid() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean tryFailure(Throwable cause) {
|
|
||||||
channel().pipeline().fireExceptionCaught(cause);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise setFailure(Throwable cause) {
|
|
||||||
tryFailure(cause);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise unvoid() {
|
|
||||||
ChannelPromise promise =
|
|
||||||
new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
|
|
||||||
promise.addListener((ChannelFutureListener) future -> {
|
|
||||||
if (!future.isSuccess()) {
|
|
||||||
channel().pipeline().fireExceptionCaught(future.cause());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
static final class TestStreamByteDistributorStreamState implements StreamByteDistributor.StreamState {
|
static final class TestStreamByteDistributorStreamState implements StreamByteDistributor.StreamState {
|
||||||
private final Http2Stream stream;
|
private final Http2Stream stream;
|
||||||
boolean isWriteAllowed;
|
boolean isWriteAllowed;
|
||||||
|
@ -291,10 +291,6 @@ public final class BinaryMemcacheClientCodec extends
|
|||||||
public ChannelFuture newFailedFuture(Throwable cause) {
|
public ChannelFuture newFailedFuture(Throwable cause) {
|
||||||
return ctx.newFailedFuture(cause);
|
return ctx.newFailedFuture(cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ChannelPromise voidPromise() {
|
|
||||||
return ctx.voidPromise();
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -744,10 +744,5 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
|||||||
public ChannelFuture newFailedFuture(Throwable cause) {
|
public ChannelFuture newFailedFuture(Throwable cause) {
|
||||||
return ctx.newFailedFuture(cause);
|
return ctx.newFailedFuture(cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise voidPromise() {
|
|
||||||
return ctx.voidPromise();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -109,14 +109,8 @@ public abstract class MessageToMessageEncoder<I> extends ChannelHandlerAdapter {
|
|||||||
if (sizeMinusOne == 0) {
|
if (sizeMinusOne == 0) {
|
||||||
ctx.write(out.getUnsafe(0), promise);
|
ctx.write(out.getUnsafe(0), promise);
|
||||||
} else if (sizeMinusOne > 0) {
|
} else if (sizeMinusOne > 0) {
|
||||||
// Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
|
|
||||||
// See https://github.com/netty/netty/issues/2525
|
|
||||||
if (promise == ctx.voidPromise()) {
|
|
||||||
writeVoidPromise(ctx, out);
|
|
||||||
} else {
|
|
||||||
writePromiseCombiner(ctx, out, promise);
|
writePromiseCombiner(ctx, out, promise);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
out.recycle();
|
out.recycle();
|
||||||
}
|
}
|
||||||
@ -124,13 +118,6 @@ public abstract class MessageToMessageEncoder<I> extends ChannelHandlerAdapter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {
|
|
||||||
final ChannelPromise voidPromise = ctx.voidPromise();
|
|
||||||
for (int i = 0; i < out.size(); i++) {
|
|
||||||
ctx.write(out.getUnsafe(i), voidPromise);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {
|
private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {
|
||||||
final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
|
final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
|
||||||
for (int i = 0; i < out.size(); i++) {
|
for (int i = 0; i < out.size(); i++) {
|
||||||
|
@ -784,10 +784,6 @@ public class SslHandler extends ByteToMessageDecoder {
|
|||||||
|
|
||||||
private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
|
private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
|
||||||
if (pendingUnencryptedWrites.isEmpty()) {
|
if (pendingUnencryptedWrites.isEmpty()) {
|
||||||
// It's important to NOT use a voidPromise here as the user
|
|
||||||
// may want to add a ChannelFutureListener to the ChannelPromise later.
|
|
||||||
//
|
|
||||||
// See https://github.com/netty/netty/issues/3364
|
|
||||||
pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
|
pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
|
||||||
}
|
}
|
||||||
if (!handshakePromise.isDone()) {
|
if (!handshakePromise.isDone()) {
|
||||||
|
@ -300,7 +300,7 @@ public class IdleStateHandler implements ChannelHandler {
|
|||||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||||
// Allow writing with void promise if handler is only configured for read timeout events.
|
// Allow writing with void promise if handler is only configured for read timeout events.
|
||||||
if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
|
if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
|
||||||
ctx.write(msg, promise.unvoid()).addListener(writeListener);
|
ctx.write(msg, promise).addListener(writeListener);
|
||||||
} else {
|
} else {
|
||||||
ctx.write(msg, promise);
|
ctx.write(msg, promise);
|
||||||
}
|
}
|
||||||
|
@ -106,7 +106,6 @@ public class WriteTimeoutHandler implements ChannelHandler {
|
|||||||
@Override
|
@Override
|
||||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||||
if (timeoutNanos > 0) {
|
if (timeoutNanos > 0) {
|
||||||
promise = promise.unvoid();
|
|
||||||
scheduleTimeout(ctx, promise);
|
scheduleTimeout(ctx, promise);
|
||||||
}
|
}
|
||||||
ctx.write(msg, promise);
|
ctx.write(msg, promise);
|
||||||
|
@ -101,14 +101,14 @@ public class Http2FrameWriterDataBenchmark extends AbstractMicrobenchmark {
|
|||||||
@Benchmark
|
@Benchmark
|
||||||
@BenchmarkMode(Mode.AverageTime)
|
@BenchmarkMode(Mode.AverageTime)
|
||||||
public void newWriter() {
|
public void newWriter() {
|
||||||
writer.writeData(ctx, 3, payload.retain(), padding, true, ctx.voidPromise());
|
writer.writeData(ctx, 3, payload.retain(), padding, true, ctx.newPromise());
|
||||||
ctx.flush();
|
ctx.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Benchmark
|
@Benchmark
|
||||||
@BenchmarkMode(Mode.AverageTime)
|
@BenchmarkMode(Mode.AverageTime)
|
||||||
public void oldWriter() {
|
public void oldWriter() {
|
||||||
oldWriter.writeData(ctx, 3, payload.retain(), padding, true, ctx.voidPromise());
|
oldWriter.writeData(ctx, 3, payload.retain(), padding, true, ctx.newPromise());
|
||||||
ctx.flush();
|
ctx.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -316,9 +316,4 @@ public abstract class EmbeddedChannelHandlerContext implements ChannelHandlerCon
|
|||||||
public final ChannelFuture newFailedFuture(Throwable cause) {
|
public final ChannelFuture newFailedFuture(Throwable cause) {
|
||||||
return channel().newFailedFuture(cause);
|
return channel().newFailedFuture(cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public final ChannelPromise voidPromise() {
|
|
||||||
return channel().voidPromise();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -63,7 +63,7 @@ public class EpollSocketChannelBenchmark extends AbstractMicrobenchmark {
|
|||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||||
if (msg instanceof ByteBuf) {
|
if (msg instanceof ByteBuf) {
|
||||||
ctx.writeAndFlush(msg, ctx.voidPromise());
|
ctx.writeAndFlush(msg);
|
||||||
} else {
|
} else {
|
||||||
throw new AssertionError();
|
throw new AssertionError();
|
||||||
}
|
}
|
||||||
@ -110,7 +110,7 @@ public class EpollSocketChannelBenchmark extends AbstractMicrobenchmark {
|
|||||||
throw new IllegalStateException();
|
throw new IllegalStateException();
|
||||||
}
|
}
|
||||||
lastWritePromise = promise;
|
lastWritePromise = promise;
|
||||||
ctx.write(msg, ctx.voidPromise());
|
ctx.write(msg);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -87,7 +87,7 @@ public abstract class AbstractSslHandlerThroughputBenchmark extends AbstractSslH
|
|||||||
for (int i = 0; i < numWrites; ++i) {
|
for (int i = 0; i < numWrites; ++i) {
|
||||||
ByteBuf wrapSrcBuffer = this.wrapSrcBuffer.retainedSlice();
|
ByteBuf wrapSrcBuffer = this.wrapSrcBuffer.retainedSlice();
|
||||||
|
|
||||||
clientSslHandler.write(clientCtx, wrapSrcBuffer, clientCtx.voidPromise());
|
clientSslHandler.write(clientCtx, wrapSrcBuffer, clientCtx.newPromise());
|
||||||
}
|
}
|
||||||
clientSslHandler.flush(clientCtx);
|
clientSslHandler.flush(clientCtx);
|
||||||
return clientCtx.cumulation().retainedSlice();
|
return clientCtx.cumulation().retainedSlice();
|
||||||
|
@ -66,9 +66,6 @@ public class HttpObjectEncoderBenchmark extends AbstractMicrobenchmark {
|
|||||||
@Param({ "true", "false" })
|
@Param({ "true", "false" })
|
||||||
public boolean pooledAllocator;
|
public boolean pooledAllocator;
|
||||||
|
|
||||||
@Param({ "true", "false" })
|
|
||||||
public boolean voidPromise;
|
|
||||||
|
|
||||||
@Setup(Level.Trial)
|
@Setup(Level.Trial)
|
||||||
public void setup() {
|
public void setup() {
|
||||||
byte[] bytes = new byte[256];
|
byte[] bytes = new byte[256];
|
||||||
@ -121,6 +118,6 @@ public class HttpObjectEncoderBenchmark extends AbstractMicrobenchmark {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private ChannelPromise newPromise() {
|
private ChannelPromise newPromise() {
|
||||||
return voidPromise ? context.voidPromise() : context.newPromise();
|
return context.newPromise();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -56,9 +56,6 @@ public class RedisEncoderBenchmark extends AbstractMicrobenchmark {
|
|||||||
@Param({ "true", "false" })
|
@Param({ "true", "false" })
|
||||||
public boolean pooledAllocator;
|
public boolean pooledAllocator;
|
||||||
|
|
||||||
@Param({ "true", "false" })
|
|
||||||
public boolean voidPromise;
|
|
||||||
|
|
||||||
@Param({ "50", "200", "1000" })
|
@Param({ "50", "200", "1000" })
|
||||||
public int arraySize;
|
public int arraySize;
|
||||||
|
|
||||||
@ -96,6 +93,6 @@ public class RedisEncoderBenchmark extends AbstractMicrobenchmark {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private ChannelPromise newPromise() {
|
private ChannelPromise newPromise() {
|
||||||
return voidPromise ? context.voidPromise() : context.newPromise();
|
return context.newPromise();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -58,9 +58,6 @@ public class StompEncoderBenchmark extends AbstractMicrobenchmark {
|
|||||||
@Param({ "true", "false" })
|
@Param({ "true", "false" })
|
||||||
public boolean pooledAllocator;
|
public boolean pooledAllocator;
|
||||||
|
|
||||||
@Param({ "true", "false" })
|
|
||||||
public boolean voidPromise;
|
|
||||||
|
|
||||||
@Param
|
@Param
|
||||||
public ExampleStompHeadersSubframe.HeadersType headersType;
|
public ExampleStompHeadersSubframe.HeadersType headersType;
|
||||||
|
|
||||||
@ -100,7 +97,7 @@ public class StompEncoderBenchmark extends AbstractMicrobenchmark {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private ChannelPromise newPromise() {
|
private ChannelPromise newPromise() {
|
||||||
return voidPromise? context.voidPromise() : context.newPromise();
|
return context.newPromise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1230,7 +1230,6 @@ public class DnsNameResolver extends InetNameResolver {
|
|||||||
boolean flush,
|
boolean flush,
|
||||||
ChannelPromise writePromise,
|
ChannelPromise writePromise,
|
||||||
Promise<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>> promise) {
|
Promise<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>> promise) {
|
||||||
assert !writePromise.isVoid();
|
|
||||||
|
|
||||||
final Promise<AddressedEnvelope<DnsResponse, InetSocketAddress>> castPromise = cast(
|
final Promise<AddressedEnvelope<DnsResponse, InetSocketAddress>> castPromise = cast(
|
||||||
requireNonNull(promise, "promise"));
|
requireNonNull(promise, "promise"));
|
||||||
|
@ -34,7 +34,6 @@ import org.junit.jupiter.api.TestInfo;
|
|||||||
import org.junit.jupiter.api.Timeout;
|
import org.junit.jupiter.api.Timeout;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
|
||||||
|
|
||||||
public class SocketEchoTest extends AbstractSocketTest {
|
public class SocketEchoTest extends AbstractSocketTest {
|
||||||
|
|
||||||
@ -52,7 +51,7 @@ public class SocketEchoTest extends AbstractSocketTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
testSimpleEcho0(sb, cb, false, true);
|
testSimpleEcho0(sb, cb, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -62,29 +61,11 @@ public class SocketEchoTest extends AbstractSocketTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void testSimpleEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
public void testSimpleEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
testSimpleEcho0(sb, cb, false, false);
|
testSimpleEcho0(sb, cb, false);
|
||||||
}
|
|
||||||
|
|
||||||
@Test//(timeout = 30000)
|
|
||||||
public void testSimpleEchoWithVoidPromise(TestInfo testInfo) throws Throwable {
|
|
||||||
run(testInfo, this::testSimpleEchoWithVoidPromise);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSimpleEchoWithVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
|
||||||
testSimpleEcho0(sb, cb, true, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test//(timeout = 30000)
|
|
||||||
public void testSimpleEchoWithVoidPromiseNotAutoRead(TestInfo testInfo) throws Throwable {
|
|
||||||
run(testInfo, this::testSimpleEchoWithVoidPromiseNotAutoRead);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testSimpleEchoWithVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
|
||||||
testSimpleEcho0(sb, cb, true, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void testSimpleEcho0(
|
private static void testSimpleEcho0(
|
||||||
ServerBootstrap sb, Bootstrap cb, boolean voidPromise, boolean autoRead)
|
ServerBootstrap sb, Bootstrap cb, boolean autoRead)
|
||||||
throws Throwable {
|
throws Throwable {
|
||||||
|
|
||||||
final EchoHandler sh = new EchoHandler(autoRead);
|
final EchoHandler sh = new EchoHandler(autoRead);
|
||||||
@ -107,11 +88,7 @@ public class SocketEchoTest extends AbstractSocketTest {
|
|||||||
for (int i = 0; i < data.length;) {
|
for (int i = 0; i < data.length;) {
|
||||||
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
|
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
|
||||||
ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
|
ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
|
||||||
if (voidPromise) {
|
cc.writeAndFlush(buf);
|
||||||
assertEquals(cc.voidPromise(), cc.writeAndFlush(buf, cc.voidPromise()));
|
|
||||||
} else {
|
|
||||||
assertNotEquals(cc.voidPromise(), cc.writeAndFlush(buf));
|
|
||||||
}
|
|
||||||
i += length;
|
i += length;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,7 +43,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
|
||||||
|
|
||||||
public class SocketFileRegionTest extends AbstractSocketTest {
|
public class SocketFileRegionTest extends AbstractSocketTest {
|
||||||
|
|
||||||
@ -68,39 +67,21 @@ public class SocketFileRegionTest extends AbstractSocketTest {
|
|||||||
run(testInfo, this::testFileRegionNotAutoRead);
|
run(testInfo, this::testFileRegionNotAutoRead);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFileRegionVoidPromise(TestInfo testInfo) throws Throwable {
|
|
||||||
run(testInfo, this::testFileRegionVoidPromise);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFileRegionVoidPromiseNotAutoRead(TestInfo testInfo) throws Throwable {
|
|
||||||
run(testInfo, this::testFileRegionVoidPromiseNotAutoRead);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFileRegionCountLargerThenFile(TestInfo testInfo) throws Throwable {
|
public void testFileRegionCountLargerThenFile(TestInfo testInfo) throws Throwable {
|
||||||
run(testInfo, this::testFileRegionCountLargerThenFile);
|
run(testInfo, this::testFileRegionCountLargerThenFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
testFileRegion0(sb, cb, false, true, true);
|
testFileRegion0(sb, cb, true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCustomFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
public void testCustomFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
testFileRegion0(sb, cb, false, true, false);
|
testFileRegion0(sb, cb, true, false);
|
||||||
}
|
|
||||||
|
|
||||||
public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
|
||||||
testFileRegion0(sb, cb, true, true, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
testFileRegion0(sb, cb, false, false, true);
|
testFileRegion0(sb, cb, false, true);
|
||||||
}
|
|
||||||
|
|
||||||
public void testFileRegionVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
|
||||||
testFileRegion0(sb, cb, true, false, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
@ -132,7 +113,7 @@ public class SocketFileRegionTest extends AbstractSocketTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static void testFileRegion0(
|
private static void testFileRegion0(
|
||||||
ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead, boolean defaultFileRegion)
|
ServerBootstrap sb, Bootstrap cb, final boolean autoRead, boolean defaultFileRegion)
|
||||||
throws Throwable {
|
throws Throwable {
|
||||||
sb.childOption(ChannelOption.AUTO_READ, autoRead);
|
sb.childOption(ChannelOption.AUTO_READ, autoRead);
|
||||||
cb.option(ChannelOption.AUTO_READ, autoRead);
|
cb.option(ChannelOption.AUTO_READ, autoRead);
|
||||||
@ -198,15 +179,9 @@ public class SocketFileRegionTest extends AbstractSocketTest {
|
|||||||
//
|
//
|
||||||
// See https://github.com/netty/netty/issues/2769
|
// See https://github.com/netty/netty/issues/2769
|
||||||
// https://github.com/netty/netty/issues/2964
|
// https://github.com/netty/netty/issues/2964
|
||||||
if (voidPromise) {
|
cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize));
|
||||||
assertEquals(cc.voidPromise(), cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize), cc.voidPromise()));
|
cc.write(emptyRegion);
|
||||||
assertEquals(cc.voidPromise(), cc.write(emptyRegion, cc.voidPromise()));
|
cc.writeAndFlush(region);
|
||||||
assertEquals(cc.voidPromise(), cc.writeAndFlush(region, cc.voidPromise()));
|
|
||||||
} else {
|
|
||||||
assertNotEquals(cc.voidPromise(), cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize)));
|
|
||||||
assertNotEquals(cc.voidPromise(), cc.write(emptyRegion));
|
|
||||||
assertNotEquals(cc.voidPromise(), cc.writeAndFlush(region));
|
|
||||||
}
|
|
||||||
|
|
||||||
while (sh.counter < data.length) {
|
while (sh.counter < data.length) {
|
||||||
if (sh.exception.get() != null) {
|
if (sh.exception.get() != null) {
|
||||||
|
@ -42,7 +42,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||||||
import static io.netty.buffer.Unpooled.compositeBuffer;
|
import static io.netty.buffer.Unpooled.compositeBuffer;
|
||||||
import static io.netty.buffer.Unpooled.wrappedBuffer;
|
import static io.netty.buffer.Unpooled.wrappedBuffer;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class SocketGatheringWriteTest extends AbstractSocketTest {
|
public class SocketGatheringWriteTest extends AbstractSocketTest {
|
||||||
@ -143,7 +142,6 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ChannelFuture cf = cc.writeAndFlush(Unpooled.EMPTY_BUFFER);
|
ChannelFuture cf = cc.writeAndFlush(Unpooled.EMPTY_BUFFER);
|
||||||
assertNotEquals(cc.voidPromise(), cf);
|
|
||||||
try {
|
try {
|
||||||
assertTrue(cf.await(60000));
|
assertTrue(cf.await(60000));
|
||||||
cf.sync();
|
cf.sync();
|
||||||
|
@ -489,7 +489,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
|||||||
clearFlag(Native.EPOLLRDHUP);
|
clearFlag(Native.EPOLLRDHUP);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
pipeline().fireExceptionCaught(e);
|
pipeline().fireExceptionCaught(e);
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -513,7 +513,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
|||||||
clearEpollIn();
|
clearEpollIn();
|
||||||
pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
|
pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
|
||||||
} else {
|
} else {
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
} else if (!rdHup) {
|
} else if (!rdHup) {
|
||||||
inputClosedSeenErrorOnRead = true;
|
inputClosedSeenErrorOnRead = true;
|
||||||
@ -523,7 +523,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
|||||||
|
|
||||||
private void fireEventAndClose(Object evt) {
|
private void fireEventAndClose(Object evt) {
|
||||||
pipeline().fireUserEventTriggered(evt);
|
pipeline().fireUserEventTriggered(evt);
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -574,7 +574,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
|||||||
// When this happens there is something completely wrong with either the filedescriptor or epoll,
|
// When this happens there is something completely wrong with either the filedescriptor or epoll,
|
||||||
// so fire the exception through the pipeline and close the Channel.
|
// so fire the exception through the pipeline and close the Channel.
|
||||||
pipeline().fireExceptionCaught(e);
|
pipeline().fireExceptionCaught(e);
|
||||||
unsafe().close(unsafe().voidPromise());
|
unsafe().close(newPromise());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -605,7 +605,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
|||||||
if (connectPromise != null && !connectPromise.isDone()
|
if (connectPromise != null && !connectPromise.isDone()
|
||||||
&& connectPromise.tryFailure(new ConnectTimeoutException(
|
&& connectPromise.tryFailure(new ConnectTimeoutException(
|
||||||
"connection timed out: " + remoteAddress))) {
|
"connection timed out: " + remoteAddress))) {
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
|
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
@ -616,7 +616,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
|||||||
connectTimeoutFuture.cancel(false);
|
connectTimeoutFuture.cancel(false);
|
||||||
}
|
}
|
||||||
connectPromise = null;
|
connectPromise = null;
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -649,7 +649,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
|||||||
|
|
||||||
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
|
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
|
||||||
if (!promiseSet) {
|
if (!promiseSet) {
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -168,7 +168,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
|
|||||||
case 0:
|
case 0:
|
||||||
break readLoop;
|
break readLoop;
|
||||||
case -1:
|
case -1:
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
return;
|
return;
|
||||||
default:
|
default:
|
||||||
allocHandle.incMessagesRead(1);
|
allocHandle.incMessagesRead(1);
|
||||||
|
@ -375,7 +375,7 @@ public class EpollHandler implements IoHandler {
|
|||||||
AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]);
|
AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]);
|
||||||
|
|
||||||
for (AbstractEpollChannel ch: localChannels) {
|
for (AbstractEpollChannel ch: localChannels) {
|
||||||
ch.unsafe().close(ch.unsafe().voidPromise());
|
ch.unsafe().close(ch.newPromise());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -460,7 +460,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
|
|||||||
}
|
}
|
||||||
pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
|
pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
|
||||||
} else {
|
} else {
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
} else if (!readEOF) {
|
} else if (!readEOF) {
|
||||||
inputClosedSeenErrorOnRead = true;
|
inputClosedSeenErrorOnRead = true;
|
||||||
@ -520,13 +520,13 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
|
|||||||
// When this happens there is something completely wrong with either the filedescriptor or epoll,
|
// When this happens there is something completely wrong with either the filedescriptor or epoll,
|
||||||
// so fire the exception through the pipeline and close the Channel.
|
// so fire the exception through the pipeline and close the Channel.
|
||||||
pipeline().fireExceptionCaught(e);
|
pipeline().fireExceptionCaught(e);
|
||||||
unsafe().close(unsafe().voidPromise());
|
unsafe().close(newPromise());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fireEventAndClose(Object evt) {
|
private void fireEventAndClose(Object evt) {
|
||||||
pipeline().fireUserEventTriggered(evt);
|
pipeline().fireUserEventTriggered(evt);
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -556,7 +556,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
|
|||||||
if (connectPromise != null && !connectPromise.isDone()
|
if (connectPromise != null && !connectPromise.isDone()
|
||||||
&& connectPromise.tryFailure(new ConnectTimeoutException(
|
&& connectPromise.tryFailure(new ConnectTimeoutException(
|
||||||
"connection timed out: " + remoteAddress))) {
|
"connection timed out: " + remoteAddress))) {
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
|
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
@ -567,7 +567,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
|
|||||||
connectTimeoutFuture.cancel(false);
|
connectTimeoutFuture.cancel(false);
|
||||||
}
|
}
|
||||||
connectPromise = null;
|
connectPromise = null;
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -600,7 +600,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
|
|||||||
|
|
||||||
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
|
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
|
||||||
if (!promiseSet) {
|
if (!promiseSet) {
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,7 +163,7 @@ public final class KQueueDomainSocketChannel extends AbstractKQueueStreamChannel
|
|||||||
break readLoop;
|
break readLoop;
|
||||||
case -1:
|
case -1:
|
||||||
allocHandle.lastBytesRead(-1);
|
allocHandle.lastBytesRead(-1);
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
return;
|
return;
|
||||||
default:
|
default:
|
||||||
allocHandle.lastBytesRead(1);
|
allocHandle.lastBytesRead(1);
|
||||||
|
@ -342,7 +342,7 @@ public final class KQueueHandler implements IoHandler {
|
|||||||
AbstractKQueueChannel[] localChannels = channels.values().toArray(new AbstractKQueueChannel[0]);
|
AbstractKQueueChannel[] localChannels = channels.values().toArray(new AbstractKQueueChannel[0]);
|
||||||
|
|
||||||
for (AbstractKQueueChannel ch: localChannels) {
|
for (AbstractKQueueChannel ch: localChannels) {
|
||||||
ch.unsafe().close(ch.unsafe().voidPromise());
|
ch.unsafe().close(ch.newPromise());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,7 +52,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
private final Unsafe unsafe;
|
private final Unsafe unsafe;
|
||||||
private final ChannelPipeline pipeline;
|
private final ChannelPipeline pipeline;
|
||||||
private final ChannelFuture succeedFuture;
|
private final ChannelFuture succeedFuture;
|
||||||
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
|
|
||||||
private final CloseFuture closeFuture;
|
private final CloseFuture closeFuture;
|
||||||
|
|
||||||
private volatile SocketAddress localAddress;
|
private volatile SocketAddress localAddress;
|
||||||
@ -438,11 +437,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
return strVal;
|
return strVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public final ChannelPromise voidPromise() {
|
|
||||||
return pipeline.voidPromise();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected final void readIfIsAutoRead() {
|
protected final void readIfIsAutoRead() {
|
||||||
if (config().isAutoRead()) {
|
if (config().isAutoRead()) {
|
||||||
read();
|
read();
|
||||||
@ -679,7 +673,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
if (closeFuture.isDone()) {
|
if (closeFuture.isDone()) {
|
||||||
// Closed already.
|
// Closed already.
|
||||||
safeSetSuccess(promise);
|
safeSetSuccess(promise);
|
||||||
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
|
} else {
|
||||||
// This means close() was called before so we just register a listener and return
|
// This means close() was called before so we just register a listener and return
|
||||||
closeFuture.addListener((ChannelFutureListener) future -> promise.setSuccess());
|
closeFuture.addListener((ChannelFutureListener) future -> promise.setSuccess());
|
||||||
}
|
}
|
||||||
@ -740,7 +734,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
|
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
|
||||||
deregister(voidPromise(), wasActive && !isActive());
|
deregister(newPromise(), wasActive && !isActive());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -825,7 +819,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
doBeginRead();
|
doBeginRead();
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
invokeLater(() -> pipeline.fireExceptionCaught(e));
|
invokeLater(() -> pipeline.fireExceptionCaught(e));
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -936,13 +930,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
* may still return {@code true} even if the channel should be closed as result of the exception.
|
* may still return {@code true} even if the channel should be closed as result of the exception.
|
||||||
*/
|
*/
|
||||||
initialCloseCause = t;
|
initialCloseCause = t;
|
||||||
close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
|
close(newPromise(), t, newClosedChannelException(t, "flush0()"), false);
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
shutdownOutput(voidPromise(), t);
|
shutdownOutput(newPromise(), t);
|
||||||
} catch (Throwable t2) {
|
} catch (Throwable t2) {
|
||||||
initialCloseCause = t;
|
initialCloseCause = t;
|
||||||
close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
|
close(newPromise(), t2, newClosedChannelException(t, "flush0()"), false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -956,13 +950,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
return exception;
|
return exception;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public final ChannelPromise voidPromise() {
|
|
||||||
assertEventLoop();
|
|
||||||
|
|
||||||
return unsafeVoidPromise;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected final boolean ensureOpen(ChannelPromise promise) {
|
protected final boolean ensureOpen(ChannelPromise promise) {
|
||||||
if (isOpen()) {
|
if (isOpen()) {
|
||||||
return true;
|
return true;
|
||||||
@ -976,7 +963,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
* Marks the specified {@code promise} as success. If the {@code promise} is done already, log a message.
|
* Marks the specified {@code promise} as success. If the {@code promise} is done already, log a message.
|
||||||
*/
|
*/
|
||||||
protected final void safeSetSuccess(ChannelPromise promise) {
|
protected final void safeSetSuccess(ChannelPromise promise) {
|
||||||
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
|
if (!promise.trySuccess()) {
|
||||||
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
|
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -985,7 +972,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
* Marks the specified {@code promise} as failure. If the {@code promise} is done already, log a message.
|
* Marks the specified {@code promise} as failure. If the {@code promise} is done already, log a message.
|
||||||
*/
|
*/
|
||||||
protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
|
protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
|
||||||
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
|
if (!promise.tryFailure(cause)) {
|
||||||
logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
|
logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -994,7 +981,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
|||||||
if (isOpen()) {
|
if (isOpen()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void invokeLater(Runnable task) {
|
private void invokeLater(Runnable task) {
|
||||||
|
@ -54,7 +54,7 @@ public abstract class AbstractCoalescingBufferQueue {
|
|||||||
* @param promise to complete when all the bytes have been consumed and written, can be void.
|
* @param promise to complete when all the bytes have been consumed and written, can be void.
|
||||||
*/
|
*/
|
||||||
public final void addFirst(ByteBuf buf, ChannelPromise promise) {
|
public final void addFirst(ByteBuf buf, ChannelPromise promise) {
|
||||||
addFirst(buf, toChannelFutureListener(promise));
|
addFirst(buf, (ChannelFutureListener) new DelegatingChannelPromiseNotifier(promise));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addFirst(ByteBuf buf, ChannelFutureListener listener) {
|
private void addFirst(ByteBuf buf, ChannelFutureListener listener) {
|
||||||
@ -81,7 +81,7 @@ public abstract class AbstractCoalescingBufferQueue {
|
|||||||
public final void add(ByteBuf buf, ChannelPromise promise) {
|
public final void add(ByteBuf buf, ChannelPromise promise) {
|
||||||
// buffers are added before promises so that we naturally 'consume' the entire buffer during removal
|
// buffers are added before promises so that we naturally 'consume' the entire buffer during removal
|
||||||
// before we complete it's promise.
|
// before we complete it's promise.
|
||||||
add(buf, toChannelFutureListener(promise));
|
add(buf, (ChannelFutureListener) new DelegatingChannelPromiseNotifier(promise));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -230,7 +230,10 @@ public abstract class AbstractCoalescingBufferQueue {
|
|||||||
if (entry == null) {
|
if (entry == null) {
|
||||||
if (previousBuf != null) {
|
if (previousBuf != null) {
|
||||||
decrementReadableBytes(previousBuf.readableBytes());
|
decrementReadableBytes(previousBuf.readableBytes());
|
||||||
ctx.write(previousBuf, ctx.voidPromise());
|
// If the write fails we want to at least propagate the exception through the ChannelPipeline
|
||||||
|
// as otherwise the user will not be made aware of the failure at all.
|
||||||
|
ctx.write(previousBuf).addListener(
|
||||||
|
ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -238,7 +241,10 @@ public abstract class AbstractCoalescingBufferQueue {
|
|||||||
if (entry instanceof ByteBufConvertible) {
|
if (entry instanceof ByteBufConvertible) {
|
||||||
if (previousBuf != null) {
|
if (previousBuf != null) {
|
||||||
decrementReadableBytes(previousBuf.readableBytes());
|
decrementReadableBytes(previousBuf.readableBytes());
|
||||||
ctx.write(previousBuf, ctx.voidPromise());
|
// If the write fails we want to at least propagate the exception through the ChannelPipeline
|
||||||
|
// as otherwise the user will not be made aware of the failure at all.
|
||||||
|
ctx.write(previousBuf).addListener(
|
||||||
|
ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
|
||||||
}
|
}
|
||||||
previousBuf = ((ByteBufConvertible) entry).asByteBuf();
|
previousBuf = ((ByteBufConvertible) entry).asByteBuf();
|
||||||
} else if (entry instanceof ChannelPromise) {
|
} else if (entry instanceof ChannelPromise) {
|
||||||
@ -380,8 +386,4 @@ public abstract class AbstractCoalescingBufferQueue {
|
|||||||
tracker.decrementPendingOutboundBytes(decrement);
|
tracker.decrementPendingOutboundBytes(decrement);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ChannelFutureListener toChannelFutureListener(ChannelPromise promise) {
|
|
||||||
return promise.isVoid() ? null : new DelegatingChannelPromiseNotifier(promise);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -203,7 +203,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparabl
|
|||||||
* <li>{@link #closeForcibly()}</li>
|
* <li>{@link #closeForcibly()}</li>
|
||||||
* <li>{@link #register(ChannelPromise)}</li>
|
* <li>{@link #register(ChannelPromise)}</li>
|
||||||
* <li>{@link #deregister(ChannelPromise)}</li>
|
* <li>{@link #deregister(ChannelPromise)}</li>
|
||||||
* <li>{@link #voidPromise()}</li>
|
|
||||||
* </ul>
|
* </ul>
|
||||||
*/
|
*/
|
||||||
interface Unsafe {
|
interface Unsafe {
|
||||||
@ -287,13 +286,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparabl
|
|||||||
*/
|
*/
|
||||||
void flush();
|
void flush();
|
||||||
|
|
||||||
/**
|
|
||||||
* Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
|
|
||||||
* It will never be notified of a success or error and so is only a placeholder for operations
|
|
||||||
* that take a {@link ChannelPromise} as argument but for which you not want to get notified.
|
|
||||||
*/
|
|
||||||
ChannelPromise voidPromise();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
|
* Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
|
||||||
*/
|
*/
|
||||||
|
@ -193,20 +193,4 @@ public interface ChannelFuture extends Future<Void> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
ChannelFuture awaitUninterruptibly();
|
ChannelFuture awaitUninterruptibly();
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the
|
|
||||||
* following methods:
|
|
||||||
* <ul>
|
|
||||||
* <li>{@link #addListener(GenericFutureListener)}</li>
|
|
||||||
* <li>{@link #addListeners(GenericFutureListener[])}</li>
|
|
||||||
* <li>{@link #await()}</li>
|
|
||||||
* <li>{@link #await(long, TimeUnit)} ()}</li>
|
|
||||||
* <li>{@link #await(long)} ()}</li>
|
|
||||||
* <li>{@link #awaitUninterruptibly()}</li>
|
|
||||||
* <li>{@link #sync()}</li>
|
|
||||||
* <li>{@link #syncUninterruptibly()}</li>
|
|
||||||
* </ul>
|
|
||||||
*/
|
|
||||||
boolean isVoid();
|
|
||||||
}
|
}
|
||||||
|
@ -702,15 +702,11 @@ public final class ChannelOutboundBuffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static void safeSuccess(ChannelPromise promise) {
|
private static void safeSuccess(ChannelPromise promise) {
|
||||||
// Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return
|
PromiseNotificationUtil.trySuccess(promise, null, logger);
|
||||||
// false.
|
|
||||||
PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void safeFail(ChannelPromise promise, Throwable cause) {
|
private static void safeFail(ChannelPromise promise, Throwable cause) {
|
||||||
// Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
|
PromiseNotificationUtil.tryFailure(promise, cause, logger);
|
||||||
// false.
|
|
||||||
PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
@Deprecated
|
||||||
|
@ -279,20 +279,4 @@ public interface ChannelOutboundInvoker {
|
|||||||
* every call of blocking methods will just return without blocking.
|
* every call of blocking methods will just return without blocking.
|
||||||
*/
|
*/
|
||||||
ChannelFuture newFailedFuture(Throwable cause);
|
ChannelFuture newFailedFuture(Throwable cause);
|
||||||
|
|
||||||
/**
|
|
||||||
* Return a special ChannelPromise which can be reused for different operations.
|
|
||||||
* <p>
|
|
||||||
* It's only supported to use
|
|
||||||
* it for {@link ChannelOutboundInvoker#write(Object, ChannelPromise)}.
|
|
||||||
* </p>
|
|
||||||
* <p>
|
|
||||||
* Be aware that the returned {@link ChannelPromise} will not support most operations and should only be used
|
|
||||||
* if you want to save an object allocation for every write operation. You will not be able to detect if the
|
|
||||||
* operation was complete, only if it failed as the implementation will call
|
|
||||||
* {@link ChannelPipeline#fireExceptionCaught(Throwable)} in this case.
|
|
||||||
* </p>
|
|
||||||
* <strong>Be aware this is an expert feature and should be used with care!</strong>
|
|
||||||
*/
|
|
||||||
ChannelPromise voidPromise();
|
|
||||||
}
|
}
|
||||||
|
@ -59,7 +59,4 @@ public interface ChannelProgressivePromise extends ProgressivePromise<Void>, Cha
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
ChannelProgressivePromise setProgress(long progress, long total);
|
ChannelProgressivePromise setProgress(long progress, long total);
|
||||||
|
|
||||||
@Override
|
|
||||||
ChannelProgressivePromise unvoid();
|
|
||||||
}
|
}
|
||||||
|
@ -60,9 +60,4 @@ public interface ChannelPromise extends ChannelFuture, Promise<Void> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
ChannelPromise awaitUninterruptibly();
|
ChannelPromise awaitUninterruptibly();
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
|
|
||||||
*/
|
|
||||||
ChannelPromise unvoid();
|
|
||||||
}
|
}
|
||||||
|
@ -558,11 +558,6 @@ public class CombinedChannelDuplexHandler<I extends ChannelHandler, O extends Ch
|
|||||||
return ctx.newFailedFuture(cause);
|
return ctx.newFailedFuture(cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise voidPromise() {
|
|
||||||
return ctx.voidPromise();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> Attribute<T> attr(AttributeKey<T> key) {
|
public <T> Attribute<T> attr(AttributeKey<T> key) {
|
||||||
return ctx.channel().attr(key);
|
return ctx.channel().attr(key);
|
||||||
|
@ -104,9 +104,4 @@ abstract class CompleteChannelFuture extends CompleteFuture<Void> implements Cha
|
|||||||
public Void getNow() {
|
public Void getNow() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isVoid() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -456,7 +456,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
|||||||
@Override
|
@Override
|
||||||
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
|
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
|
||||||
requireNonNull(localAddress, "localAddress");
|
requireNonNull(localAddress, "localAddress");
|
||||||
if (isNotValidPromise(promise, false)) {
|
if (isNotValidPromise(promise)) {
|
||||||
// cancelled
|
// cancelled
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
@ -496,7 +496,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
|||||||
public ChannelFuture connect(
|
public ChannelFuture connect(
|
||||||
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
|
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
|
||||||
requireNonNull(remoteAddress, "remoteAddress");
|
requireNonNull(remoteAddress, "remoteAddress");
|
||||||
if (isNotValidPromise(promise, false)) {
|
if (isNotValidPromise(promise)) {
|
||||||
// cancelled
|
// cancelled
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
@ -535,7 +535,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
|||||||
return close(promise);
|
return close(promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isNotValidPromise(promise, false)) {
|
if (isNotValidPromise(promise)) {
|
||||||
// cancelled
|
// cancelled
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
@ -568,7 +568,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture close(final ChannelPromise promise) {
|
public ChannelFuture close(final ChannelPromise promise) {
|
||||||
if (isNotValidPromise(promise, false)) {
|
if (isNotValidPromise(promise)) {
|
||||||
// cancelled
|
// cancelled
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
@ -601,7 +601,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture register(final ChannelPromise promise) {
|
public ChannelFuture register(final ChannelPromise promise) {
|
||||||
if (isNotValidPromise(promise, false)) {
|
if (isNotValidPromise(promise)) {
|
||||||
// cancelled
|
// cancelled
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
@ -634,7 +634,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture deregister(final ChannelPromise promise) {
|
public ChannelFuture deregister(final ChannelPromise promise) {
|
||||||
if (isNotValidPromise(promise, false)) {
|
if (isNotValidPromise(promise)) {
|
||||||
// cancelled
|
// cancelled
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
@ -733,7 +733,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
|||||||
findAndInvokeFlush();
|
findAndInvokeFlush();
|
||||||
} else {
|
} else {
|
||||||
Tasks tasks = invokeTasks();
|
Tasks tasks = invokeTasks();
|
||||||
safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null);
|
safeExecute(executor, tasks.invokeFlushTask,
|
||||||
|
// If flush throws we want to at least propagate the exception through the ChannelPipeline
|
||||||
|
// as otherwise the user will not be made aware of the failure at all.
|
||||||
|
newPromise().addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
return this;
|
return this;
|
||||||
@ -768,7 +771,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
|||||||
private void write(Object msg, boolean flush, ChannelPromise promise) {
|
private void write(Object msg, boolean flush, ChannelPromise promise) {
|
||||||
requireNonNull(msg, "msg");
|
requireNonNull(msg, "msg");
|
||||||
try {
|
try {
|
||||||
if (isNotValidPromise(promise, true)) {
|
if (isNotValidPromise(promise)) {
|
||||||
ReferenceCountUtil.release(msg);
|
ReferenceCountUtil.release(msg);
|
||||||
// cancelled
|
// cancelled
|
||||||
return;
|
return;
|
||||||
@ -815,9 +818,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
|
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
|
||||||
// Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
|
PromiseNotificationUtil.tryFailure(promise, cause, logger);
|
||||||
// false.
|
|
||||||
PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -840,7 +841,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
|||||||
return pipeline().newFailedFuture(cause);
|
return pipeline().newFailedFuture(cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
|
private boolean isNotValidPromise(ChannelPromise promise) {
|
||||||
requireNonNull(promise, "promise");
|
requireNonNull(promise, "promise");
|
||||||
|
|
||||||
if (promise.isDone()) {
|
if (promise.isDone()) {
|
||||||
@ -863,11 +864,6 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (promise instanceof AbstractChannel.CloseFuture) {
|
if (promise instanceof AbstractChannel.CloseFuture) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
|
StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
|
||||||
@ -897,11 +893,6 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
|||||||
return ctx;
|
return ctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise voidPromise() {
|
|
||||||
return channel().voidPromise();
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean setAddComplete() {
|
boolean setAddComplete() {
|
||||||
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
|
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
|
||||||
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
|
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
|
||||||
@ -977,8 +968,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
|||||||
ReferenceCountUtil.release(msg);
|
ReferenceCountUtil.release(msg);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
if (promise != null) {
|
||||||
promise.setFailure(cause);
|
promise.setFailure(cause);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -68,7 +68,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
private final Channel channel;
|
private final Channel channel;
|
||||||
private final ChannelFuture succeededFuture;
|
private final ChannelFuture succeededFuture;
|
||||||
private final VoidChannelPromise voidPromise;
|
|
||||||
private final boolean touch = ResourceLeakDetector.isEnabled();
|
private final boolean touch = ResourceLeakDetector.isEnabled();
|
||||||
private final List<DefaultChannelHandlerContext> handlers = new ArrayList<>(4);
|
private final List<DefaultChannelHandlerContext> handlers = new ArrayList<>(4);
|
||||||
|
|
||||||
@ -77,7 +76,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
public DefaultChannelPipeline(Channel channel) {
|
public DefaultChannelPipeline(Channel channel) {
|
||||||
this.channel = requireNonNull(channel, "channel");
|
this.channel = requireNonNull(channel, "channel");
|
||||||
succeededFuture = new SucceededChannelFuture(channel, channel.eventLoop());
|
succeededFuture = new SucceededChannelFuture(channel, channel.eventLoop());
|
||||||
voidPromise = new VoidChannelPromise(channel, true);
|
|
||||||
|
|
||||||
tail = new DefaultChannelHandlerContext(this, TAIL_NAME, TAIL_HANDLER);
|
tail = new DefaultChannelHandlerContext(this, TAIL_NAME, TAIL_HANDLER);
|
||||||
head = new DefaultChannelHandlerContext(this, HEAD_NAME, HEAD_HANDLER);
|
head = new DefaultChannelHandlerContext(this, HEAD_NAME, HEAD_HANDLER);
|
||||||
@ -953,11 +951,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
return new FailedChannelFuture(channel(), executor(), cause);
|
return new FailedChannelFuture(channel(), executor(), cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public final ChannelPromise voidPromise() {
|
|
||||||
return voidPromise;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
|
* Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
|
||||||
* in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
|
* in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
|
||||||
|
@ -158,14 +158,4 @@ public class DefaultChannelProgressivePromise
|
|||||||
super.checkDeadLock();
|
super.checkDeadLock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelProgressivePromise unvoid() {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isVoid() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -149,14 +149,4 @@ public class DefaultChannelPromise extends DefaultPromise<Void> implements Chann
|
|||||||
super.checkDeadLock();
|
super.checkDeadLock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise unvoid() {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isVoid() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -37,7 +37,7 @@ public final class DelegatingChannelPromiseNotifier implements ChannelPromise, C
|
|||||||
private final boolean logNotifyFailure;
|
private final boolean logNotifyFailure;
|
||||||
|
|
||||||
public DelegatingChannelPromiseNotifier(ChannelPromise delegate) {
|
public DelegatingChannelPromiseNotifier(ChannelPromise delegate) {
|
||||||
this(delegate, !(delegate instanceof VoidChannelPromise));
|
this(delegate, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public DelegatingChannelPromiseNotifier(ChannelPromise delegate, boolean logNotifyFailure) {
|
public DelegatingChannelPromiseNotifier(ChannelPromise delegate, boolean logNotifyFailure) {
|
||||||
@ -143,16 +143,6 @@ public final class DelegatingChannelPromiseNotifier implements ChannelPromise, C
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isVoid() {
|
|
||||||
return delegate.isVoid();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise unvoid() {
|
|
||||||
return isVoid() ? new DelegatingChannelPromiseNotifier(delegate.unvoid()) : this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
|
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
|
||||||
return delegate.await(timeout, unit);
|
return delegate.await(timeout, unit);
|
||||||
|
@ -141,9 +141,7 @@ public final class PendingWriteQueue {
|
|||||||
Object msg = write.msg;
|
Object msg = write.msg;
|
||||||
ChannelPromise promise = write.promise;
|
ChannelPromise promise = write.promise;
|
||||||
recycle(write, false);
|
recycle(write, false);
|
||||||
if (!(promise instanceof VoidChannelPromise)) {
|
|
||||||
combiner.add(promise);
|
combiner.add(promise);
|
||||||
}
|
|
||||||
ctx.write(msg, promise);
|
ctx.write(msg, promise);
|
||||||
write = next;
|
write = next;
|
||||||
}
|
}
|
||||||
@ -276,7 +274,7 @@ public final class PendingWriteQueue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static void safeFail(ChannelPromise promise, Throwable cause) {
|
private static void safeFail(ChannelPromise promise, Throwable cause) {
|
||||||
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
|
if (!promise.tryFailure(cause)) {
|
||||||
logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
|
logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,241 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2012 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* https://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package io.netty.channel;
|
|
||||||
|
|
||||||
import static java.util.Objects.requireNonNull;
|
|
||||||
|
|
||||||
import io.netty.util.concurrent.EventExecutor;
|
|
||||||
import io.netty.util.concurrent.Future;
|
|
||||||
import io.netty.util.concurrent.GenericFutureListener;
|
|
||||||
import io.netty.util.internal.UnstableApi;
|
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
@UnstableApi
|
|
||||||
public final class VoidChannelPromise implements ChannelPromise {
|
|
||||||
|
|
||||||
private final Channel channel;
|
|
||||||
// Will be null if we should not propagate exceptions through the pipeline on failure case.
|
|
||||||
private final ChannelFutureListener fireExceptionListener;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new instance.
|
|
||||||
*
|
|
||||||
* @param channel the {@link Channel} associated with this future
|
|
||||||
*/
|
|
||||||
public VoidChannelPromise(final Channel channel, boolean fireException) {
|
|
||||||
requireNonNull(channel, "channel");
|
|
||||||
this.channel = channel;
|
|
||||||
if (fireException) {
|
|
||||||
fireExceptionListener = future -> {
|
|
||||||
Throwable cause = future.cause();
|
|
||||||
if (cause != null) {
|
|
||||||
fireException0(cause);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
} else {
|
|
||||||
fireExceptionListener = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EventExecutor executor() {
|
|
||||||
return channel.eventLoop();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public VoidChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
|
|
||||||
fail();
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public VoidChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
|
|
||||||
fail();
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public VoidChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
|
|
||||||
// NOOP
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public VoidChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
|
|
||||||
// NOOP
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public VoidChannelPromise await() throws InterruptedException {
|
|
||||||
if (Thread.interrupted()) {
|
|
||||||
throw new InterruptedException();
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean await(long timeout, TimeUnit unit) {
|
|
||||||
fail();
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean await(long timeoutMillis) {
|
|
||||||
fail();
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public VoidChannelPromise awaitUninterruptibly() {
|
|
||||||
fail();
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
|
|
||||||
fail();
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean awaitUninterruptibly(long timeoutMillis) {
|
|
||||||
fail();
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Channel channel() {
|
|
||||||
return channel;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isDone() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isSuccess() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean setUncancellable() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isCancellable() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isCancelled() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Throwable cause() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public VoidChannelPromise sync() {
|
|
||||||
fail();
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public VoidChannelPromise syncUninterruptibly() {
|
|
||||||
fail();
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public VoidChannelPromise setFailure(Throwable cause) {
|
|
||||||
fireException0(cause);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public VoidChannelPromise setSuccess() {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean tryFailure(Throwable cause) {
|
|
||||||
fireException0(cause);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritDoc}
|
|
||||||
*
|
|
||||||
* @param mayInterruptIfRunning this value has no effect in this implementation.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean trySuccess() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void fail() {
|
|
||||||
throw new IllegalStateException("void future");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public VoidChannelPromise setSuccess(Void result) {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean trySuccess(Void result) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Void getNow() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise unvoid() {
|
|
||||||
ChannelPromise promise = new DefaultChannelPromise(channel);
|
|
||||||
if (fireExceptionListener != null) {
|
|
||||||
promise.addListener(fireExceptionListener);
|
|
||||||
}
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isVoid() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void fireException0(Throwable cause) {
|
|
||||||
// Only fire the exception if the channel is open and registered
|
|
||||||
// if not the pipeline is not setup and so it would hit the tail
|
|
||||||
// of the pipeline.
|
|
||||||
// See https://github.com/netty/netty/issues/1517
|
|
||||||
if (fireExceptionListener != null && channel.isRegistered()) {
|
|
||||||
channel.pipeline().fireExceptionCaught(cause);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -342,7 +342,7 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
p.fireChannelRead(m);
|
p.fireChannelRead(m);
|
||||||
}
|
}
|
||||||
|
|
||||||
flushInbound(false, voidPromise());
|
flushInbound(false, null);
|
||||||
return isNotEmpty(inboundMessages);
|
return isNotEmpty(inboundMessages);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -366,7 +366,8 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
if (checkOpen(true)) {
|
if (checkOpen(true)) {
|
||||||
pipeline().fireChannelRead(msg);
|
pipeline().fireChannelRead(msg);
|
||||||
}
|
}
|
||||||
return checkException(promise);
|
checkException(promise);
|
||||||
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -375,7 +376,7 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
* @see #flushOutbound()
|
* @see #flushOutbound()
|
||||||
*/
|
*/
|
||||||
public EmbeddedChannel flushInbound() {
|
public EmbeddedChannel flushInbound() {
|
||||||
flushInbound(true, voidPromise());
|
flushInbound(true, null);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -386,7 +387,8 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
runPendingTasks();
|
runPendingTasks();
|
||||||
}
|
}
|
||||||
|
|
||||||
return checkException(promise);
|
checkException(promise);
|
||||||
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -450,7 +452,8 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
if (checkOpen(true)) {
|
if (checkOpen(true)) {
|
||||||
return write(msg, promise);
|
return write(msg, promise);
|
||||||
}
|
}
|
||||||
return checkException(promise);
|
checkException(promise);
|
||||||
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -462,7 +465,7 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
if (checkOpen(true)) {
|
if (checkOpen(true)) {
|
||||||
flushOutbound0();
|
flushOutbound0();
|
||||||
}
|
}
|
||||||
checkException(voidPromise());
|
checkException(null);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -640,26 +643,27 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
/**
|
/**
|
||||||
* Checks for the presence of an {@link Exception}.
|
* Checks for the presence of an {@link Exception}.
|
||||||
*/
|
*/
|
||||||
private ChannelFuture checkException(ChannelPromise promise) {
|
private void checkException(ChannelPromise promise) {
|
||||||
Throwable t = lastException;
|
Throwable t = lastException;
|
||||||
if (t != null) {
|
if (t != null) {
|
||||||
lastException = null;
|
lastException = null;
|
||||||
|
|
||||||
if (promise.isVoid()) {
|
if (promise == null) {
|
||||||
PlatformDependent.throwException(t);
|
PlatformDependent.throwException(t);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
return promise.setFailure(t);
|
promise.setFailure(t);
|
||||||
|
} else if (promise != null) {
|
||||||
|
promise.setSuccess();
|
||||||
}
|
}
|
||||||
|
|
||||||
return promise.setSuccess();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if there was any {@link Throwable} received and if so rethrow it.
|
* Check if there was any {@link Throwable} received and if so rethrow it.
|
||||||
*/
|
*/
|
||||||
public void checkException() {
|
public void checkException() {
|
||||||
checkException(voidPromise());
|
checkException(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -848,11 +852,6 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
mayRunPendingTasks();
|
mayRunPendingTasks();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelPromise voidPromise() {
|
|
||||||
return EmbeddedUnsafe.this.voidPromise();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelOutboundBuffer outboundBuffer() {
|
public ChannelOutboundBuffer outboundBuffer() {
|
||||||
return EmbeddedUnsafe.this.outboundBuffer();
|
return EmbeddedUnsafe.this.outboundBuffer();
|
||||||
|
@ -132,22 +132,6 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
|
|||||||
*/
|
*/
|
||||||
ChannelGroupFuture write(Object message, ChannelMatcher matcher);
|
ChannelGroupFuture write(Object message, ChannelMatcher matcher);
|
||||||
|
|
||||||
/**
|
|
||||||
* Writes the specified {@code message} to all {@link Channel}s in this
|
|
||||||
* group that are matched by the given {@link ChannelMatcher}. If the specified {@code message} is an instance of
|
|
||||||
* {@link ByteBuf}, it is automatically
|
|
||||||
* {@linkplain ByteBuf#duplicate() duplicated} to avoid a race
|
|
||||||
* condition. The same is true for {@link ByteBufHolder}. Please note that this operation is asynchronous as
|
|
||||||
* {@link Channel#write(Object)} is.
|
|
||||||
*
|
|
||||||
* If {@code voidPromise} is {@code true} {@link Channel#voidPromise()} is used for the writes and so the same
|
|
||||||
* restrictions to the returned {@link ChannelGroupFuture} apply as to a void promise.
|
|
||||||
*
|
|
||||||
* @return the {@link ChannelGroupFuture} instance that notifies when
|
|
||||||
* the operation is done for all channels
|
|
||||||
*/
|
|
||||||
ChannelGroupFuture write(Object message, ChannelMatcher matcher, boolean voidPromise);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flush all {@link Channel}s in this
|
* Flush all {@link Channel}s in this
|
||||||
* group. If the specified {@code messages} are an instance of
|
* group. If the specified {@code messages} are an instance of
|
||||||
@ -191,12 +175,6 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
|
|||||||
*/
|
*/
|
||||||
ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher);
|
ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher);
|
||||||
|
|
||||||
/**
|
|
||||||
* Shortcut for calling {@link #write(Object, ChannelMatcher, boolean)} and {@link #flush()} and only act on
|
|
||||||
* {@link Channel}s that are matched by the {@link ChannelMatcher}.
|
|
||||||
*/
|
|
||||||
ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher, boolean voidPromise);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated Use {@link #writeAndFlush(Object, ChannelMatcher)} instead.
|
* @deprecated Use {@link #writeAndFlush(Object, ChannelMatcher)} instead.
|
||||||
*/
|
*/
|
||||||
|
@ -246,31 +246,16 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
|
public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
|
||||||
return write(message, matcher, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelGroupFuture write(Object message, ChannelMatcher matcher, boolean voidPromise) {
|
|
||||||
requireNonNull(message, "message");
|
requireNonNull(message, "message");
|
||||||
requireNonNull(matcher, "matcher");
|
requireNonNull(matcher, "matcher");
|
||||||
|
|
||||||
final ChannelGroupFuture future;
|
|
||||||
if (voidPromise) {
|
|
||||||
for (Channel c: nonServerChannels.values()) {
|
|
||||||
if (matcher.matches(c)) {
|
|
||||||
c.write(safeDuplicate(message), c.voidPromise());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
future = voidFuture;
|
|
||||||
} else {
|
|
||||||
Map<Channel, ChannelFuture> futures = new LinkedHashMap<>(nonServerChannels.size());
|
Map<Channel, ChannelFuture> futures = new LinkedHashMap<>(nonServerChannels.size());
|
||||||
for (Channel c: nonServerChannels.values()) {
|
for (Channel c: nonServerChannels.values()) {
|
||||||
if (matcher.matches(c)) {
|
if (matcher.matches(c)) {
|
||||||
futures.put(c, c.write(safeDuplicate(message)));
|
futures.put(c, c.write(safeDuplicate(message)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
future = new DefaultChannelGroupFuture(this, futures, executor);
|
final ChannelGroupFuture future = new DefaultChannelGroupFuture(this, futures, executor);
|
||||||
}
|
|
||||||
ReferenceCountUtil.release(message);
|
ReferenceCountUtil.release(message);
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
@ -380,30 +365,15 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher) {
|
public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher) {
|
||||||
return writeAndFlush(message, matcher, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher, boolean voidPromise) {
|
|
||||||
requireNonNull(message, "message");
|
requireNonNull(message, "message");
|
||||||
|
|
||||||
final ChannelGroupFuture future;
|
|
||||||
if (voidPromise) {
|
|
||||||
for (Channel c: nonServerChannels.values()) {
|
|
||||||
if (matcher.matches(c)) {
|
|
||||||
c.writeAndFlush(safeDuplicate(message), c.voidPromise());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
future = voidFuture;
|
|
||||||
} else {
|
|
||||||
Map<Channel, ChannelFuture> futures = new LinkedHashMap<>(nonServerChannels.size());
|
Map<Channel, ChannelFuture> futures = new LinkedHashMap<>(nonServerChannels.size());
|
||||||
for (Channel c: nonServerChannels.values()) {
|
for (Channel c: nonServerChannels.values()) {
|
||||||
if (matcher.matches(c)) {
|
if (matcher.matches(c)) {
|
||||||
futures.put(c, c.writeAndFlush(safeDuplicate(message)));
|
futures.put(c, c.writeAndFlush(safeDuplicate(message)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
future = new DefaultChannelGroupFuture(this, futures, executor);
|
final ChannelGroupFuture future = new DefaultChannelGroupFuture(this, futures, executor);
|
||||||
}
|
|
||||||
ReferenceCountUtil.release(message);
|
ReferenceCountUtil.release(message);
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
@ -217,7 +217,7 @@ public class LocalChannel extends AbstractChannel {
|
|||||||
|
|
||||||
private void tryClose(boolean isActive) {
|
private void tryClose(boolean isActive) {
|
||||||
if (isActive) {
|
if (isActive) {
|
||||||
unsafe().close(unsafe().voidPromise());
|
unsafe().close(newPromise());
|
||||||
} else {
|
} else {
|
||||||
releaseInboundBuffers();
|
releaseInboundBuffers();
|
||||||
|
|
||||||
@ -419,7 +419,7 @@ public class LocalChannel extends AbstractChannel {
|
|||||||
doBind(localAddress);
|
doBind(localAddress);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
safeSetFailure(promise, t);
|
safeSetFailure(promise, t);
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -428,7 +428,7 @@ public class LocalChannel extends AbstractChannel {
|
|||||||
if (!(boundChannel instanceof LocalServerChannel)) {
|
if (!(boundChannel instanceof LocalServerChannel)) {
|
||||||
Exception cause = new ConnectException("connection refused: " + remoteAddress);
|
Exception cause = new ConnectException("connection refused: " + remoteAddress);
|
||||||
safeSetFailure(promise, cause);
|
safeSetFailure(promise, cause);
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -472,5 +472,10 @@ public class LocalChannel extends AbstractChannel {
|
|||||||
@Override
|
@Override
|
||||||
public void deregister0() {
|
public void deregister0() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelPromise newPromise() {
|
||||||
|
return LocalChannel.this.newPromise();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -16,8 +16,10 @@
|
|||||||
package io.netty.channel.local;
|
package io.netty.channel.local;
|
||||||
|
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelPromise;
|
||||||
|
|
||||||
interface LocalChannelUnsafe extends Channel.Unsafe {
|
interface LocalChannelUnsafe extends Channel.Unsafe {
|
||||||
void register0();
|
void register0();
|
||||||
void deregister0();
|
void deregister0();
|
||||||
|
ChannelPromise newPromise();
|
||||||
}
|
}
|
||||||
|
@ -75,7 +75,7 @@ public final class LocalHandler implements IoHandler {
|
|||||||
@Override
|
@Override
|
||||||
public void prepareToDestroy() {
|
public void prepareToDestroy() {
|
||||||
for (LocalChannelUnsafe unsafe : registeredChannels) {
|
for (LocalChannelUnsafe unsafe : registeredChannels) {
|
||||||
unsafe.close(unsafe.voidPromise());
|
unsafe.close(unsafe.newPromise());
|
||||||
}
|
}
|
||||||
registeredChannels.clear();
|
registeredChannels.clear();
|
||||||
}
|
}
|
||||||
|
@ -171,5 +171,10 @@ public class LocalServerChannel extends AbstractServerChannel {
|
|||||||
@Override
|
@Override
|
||||||
public void deregister0() {
|
public void deregister0() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelPromise newPromise() {
|
||||||
|
return LocalServerChannel.this.newPromise();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -102,7 +102,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
|||||||
shutdownInput();
|
shutdownInput();
|
||||||
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
|
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
|
||||||
} else {
|
} else {
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
inputClosedSeenErrorOnRead = true;
|
inputClosedSeenErrorOnRead = true;
|
||||||
|
@ -243,7 +243,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
|
|||||||
if (connectPromise != null && !connectPromise.isDone()
|
if (connectPromise != null && !connectPromise.isDone()
|
||||||
&& connectPromise.tryFailure(new ConnectTimeoutException(
|
&& connectPromise.tryFailure(new ConnectTimeoutException(
|
||||||
"connection timed out: " + remoteAddress))) {
|
"connection timed out: " + remoteAddress))) {
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
|
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
@ -254,7 +254,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
|
|||||||
connectTimeoutFuture.cancel(false);
|
connectTimeoutFuture.cancel(false);
|
||||||
}
|
}
|
||||||
connectPromise = null;
|
connectPromise = null;
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -286,7 +286,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
|
|||||||
|
|
||||||
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
|
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
|
||||||
if (!promiseSet) {
|
if (!promiseSet) {
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,7 +110,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
|
|||||||
if (closed) {
|
if (closed) {
|
||||||
inputShutdown = true;
|
inputShutdown = true;
|
||||||
if (isOpen()) {
|
if (isOpen()) {
|
||||||
close(voidPromise());
|
close(newPromise());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
readIfIsAutoRead();
|
readIfIsAutoRead();
|
||||||
|
@ -329,7 +329,7 @@ public final class NioHandler implements IoHandler {
|
|||||||
logger.warn("Failed to re-register a Channel to the new Selector.", e);
|
logger.warn("Failed to re-register a Channel to the new Selector.", e);
|
||||||
if (a instanceof AbstractNioChannel) {
|
if (a instanceof AbstractNioChannel) {
|
||||||
AbstractNioChannel ch = (AbstractNioChannel) a;
|
AbstractNioChannel ch = (AbstractNioChannel) a;
|
||||||
ch.unsafe().close(ch.unsafe().voidPromise());
|
ch.unsafe().close(ch.newPromise());
|
||||||
} else {
|
} else {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
|
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
|
||||||
@ -573,7 +573,7 @@ public final class NioHandler implements IoHandler {
|
|||||||
if (!k.isValid()) {
|
if (!k.isValid()) {
|
||||||
|
|
||||||
// close the channel if the key is not valid anymore
|
// close the channel if the key is not valid anymore
|
||||||
unsafe.close(unsafe.voidPromise());
|
unsafe.close(ch.newPromise());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -603,7 +603,7 @@ public final class NioHandler implements IoHandler {
|
|||||||
unsafe.read();
|
unsafe.read();
|
||||||
}
|
}
|
||||||
} catch (CancelledKeyException ignored) {
|
} catch (CancelledKeyException ignored) {
|
||||||
unsafe.close(unsafe.voidPromise());
|
unsafe.close(ch.newPromise());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -651,7 +651,7 @@ public final class NioHandler implements IoHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (AbstractNioChannel ch: channels) {
|
for (AbstractNioChannel ch: channels) {
|
||||||
ch.unsafe().close(ch.unsafe().voidPromise());
|
ch.unsafe().close(ch.newPromise());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ public class ChannelOutboundBufferTest {
|
|||||||
|
|
||||||
ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII);
|
ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII);
|
||||||
ByteBuffer nioBuf = buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes());
|
ByteBuffer nioBuf = buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes());
|
||||||
buffer.addMessage(buf, buf.readableBytes(), channel.voidPromise());
|
buffer.addMessage(buf, buf.readableBytes(), channel.newPromise());
|
||||||
assertEquals(0, buffer.nioBufferCount(), "Should still be 0 as not flushed yet");
|
assertEquals(0, buffer.nioBufferCount(), "Should still be 0 as not flushed yet");
|
||||||
buffer.addFlush();
|
buffer.addFlush();
|
||||||
ByteBuffer[] buffers = buffer.nioBuffers();
|
ByteBuffer[] buffers = buffer.nioBuffers();
|
||||||
@ -82,7 +82,7 @@ public class ChannelOutboundBufferTest {
|
|||||||
|
|
||||||
ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII));
|
ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII));
|
||||||
for (int i = 0; i < 64; i++) {
|
for (int i = 0; i < 64; i++) {
|
||||||
buffer.addMessage(buf.copy(), buf.readableBytes(), channel.voidPromise());
|
buffer.addMessage(buf.copy(), buf.readableBytes(), channel.newPromise());
|
||||||
}
|
}
|
||||||
assertEquals(0, buffer.nioBufferCount(), "Should still be 0 as not flushed yet");
|
assertEquals(0, buffer.nioBufferCount(), "Should still be 0 as not flushed yet");
|
||||||
buffer.addFlush();
|
buffer.addFlush();
|
||||||
@ -106,7 +106,7 @@ public class ChannelOutboundBufferTest {
|
|||||||
for (int i = 0; i < 65; i++) {
|
for (int i = 0; i < 65; i++) {
|
||||||
comp.addComponent(true, buf.copy());
|
comp.addComponent(true, buf.copy());
|
||||||
}
|
}
|
||||||
buffer.addMessage(comp, comp.readableBytes(), channel.voidPromise());
|
buffer.addMessage(comp, comp.readableBytes(), channel.newPromise());
|
||||||
|
|
||||||
assertEquals(0, buffer.nioBufferCount(), "Should still be 0 as not flushed yet");
|
assertEquals(0, buffer.nioBufferCount(), "Should still be 0 as not flushed yet");
|
||||||
buffer.addFlush();
|
buffer.addFlush();
|
||||||
@ -135,7 +135,7 @@ public class ChannelOutboundBufferTest {
|
|||||||
comp.addComponent(true, buf.copy());
|
comp.addComponent(true, buf.copy());
|
||||||
}
|
}
|
||||||
assertEquals(65, comp.nioBufferCount());
|
assertEquals(65, comp.nioBufferCount());
|
||||||
buffer.addMessage(comp, comp.readableBytes(), channel.voidPromise());
|
buffer.addMessage(comp, comp.readableBytes(), channel.newPromise());
|
||||||
assertEquals(0, buffer.nioBufferCount(), "Should still be 0 as not flushed yet");
|
assertEquals(0, buffer.nioBufferCount(), "Should still be 0 as not flushed yet");
|
||||||
buffer.addFlush();
|
buffer.addFlush();
|
||||||
final int maxCount = 10; // less than comp.nioBufferCount()
|
final int maxCount = 10; // less than comp.nioBufferCount()
|
||||||
|
@ -58,7 +58,7 @@ public class CoalescingBufferQueueTest {
|
|||||||
mouseSuccess = future.isSuccess();
|
mouseSuccess = future.isSuccess();
|
||||||
};
|
};
|
||||||
emptyPromise = newPromise();
|
emptyPromise = newPromise();
|
||||||
voidPromise = channel.voidPromise();
|
voidPromise = channel.newPromise();
|
||||||
|
|
||||||
cat = Unpooled.wrappedBuffer("cat".getBytes(CharsetUtil.US_ASCII));
|
cat = Unpooled.wrappedBuffer("cat".getBytes(CharsetUtil.US_ASCII));
|
||||||
mouse = Unpooled.wrappedBuffer("mouse".getBytes(CharsetUtil.US_ASCII));
|
mouse = Unpooled.wrappedBuffer("mouse".getBytes(CharsetUtil.US_ASCII));
|
||||||
|
@ -743,19 +743,6 @@ public class DefaultChannelPipelineTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testUnexpectedVoidChannelPromise() throws Exception {
|
|
||||||
ChannelPipeline pipeline = newLocalChannel().pipeline();
|
|
||||||
pipeline.channel().register().sync();
|
|
||||||
|
|
||||||
try {
|
|
||||||
ChannelPromise promise = new VoidChannelPromise(pipeline.channel(), false);
|
|
||||||
assertThrows(IllegalArgumentException.class, () -> pipeline.close(promise));
|
|
||||||
} finally {
|
|
||||||
pipeline.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnexpectedVoidChannelPromiseCloseFuture() throws Exception {
|
public void testUnexpectedVoidChannelPromiseCloseFuture() throws Exception {
|
||||||
ChannelPipeline pipeline = newLocalChannel().pipeline();
|
ChannelPipeline pipeline = newLocalChannel().pipeline();
|
||||||
@ -1183,36 +1170,6 @@ public class DefaultChannelPipelineTest {
|
|||||||
pipeline.addBefore("test", null, newHandler());
|
pipeline.addBefore("test", null, newHandler());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
@Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
|
|
||||||
public void testVoidPromiseNotify() throws Throwable {
|
|
||||||
EventLoopGroup defaultGroup = new MultithreadEventLoopGroup(1, LocalHandler.newFactory());
|
|
||||||
EventLoop eventLoop1 = defaultGroup.next();
|
|
||||||
ChannelPipeline pipeline1 = new LocalChannel(eventLoop1).pipeline();
|
|
||||||
|
|
||||||
final Promise<Throwable> promise = eventLoop1.newPromise();
|
|
||||||
final Exception exception = new IllegalArgumentException();
|
|
||||||
try {
|
|
||||||
pipeline1.channel().register().syncUninterruptibly();
|
|
||||||
pipeline1.addLast(new ChannelHandler() {
|
|
||||||
@Override
|
|
||||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
|
||||||
throw exception;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
|
||||||
promise.setSuccess(cause);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
pipeline1.write("test", pipeline1.voidPromise());
|
|
||||||
assertSame(exception, promise.syncUninterruptibly().getNow());
|
|
||||||
} finally {
|
|
||||||
pipeline1.channel().close().syncUninterruptibly();
|
|
||||||
defaultGroup.shutdownGracefully();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test for https://github.com/netty/netty/issues/8676.
|
// Test for https://github.com/netty/netty/issues/8676.
|
||||||
@Test
|
@Test
|
||||||
public void testHandlerRemovedOnlyCalledWhenHandlerAddedCalled() throws Exception {
|
public void testHandlerRemovedOnlyCalledWhenHandlerAddedCalled() throws Exception {
|
||||||
|
@ -267,32 +267,6 @@ public class PendingWriteQueueTest {
|
|||||||
assertEquals(3L, (long) channel.readOutbound());
|
assertEquals(3L, (long) channel.readOutbound());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRemoveAndWriteAllWithVoidPromise() {
|
|
||||||
EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler() {
|
|
||||||
@Override
|
|
||||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
|
|
||||||
// Convert to writeAndFlush(...) so the promise will be notified by the transport.
|
|
||||||
ctx.writeAndFlush(msg, promise);
|
|
||||||
}
|
|
||||||
}, new ChannelHandler() { });
|
|
||||||
|
|
||||||
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().lastContext());
|
|
||||||
|
|
||||||
ChannelPromise promise = channel.newPromise();
|
|
||||||
channel.eventLoop().execute(() -> {
|
|
||||||
queue.add(1L, promise);
|
|
||||||
queue.add(2L, channel.voidPromise());
|
|
||||||
queue.removeAndWriteAll();
|
|
||||||
});
|
|
||||||
|
|
||||||
assertTrue(channel.finish());
|
|
||||||
assertTrue(promise.isDone());
|
|
||||||
assertTrue(promise.isSuccess());
|
|
||||||
assertEquals(1L, (long) channel.readOutbound());
|
|
||||||
assertEquals(2L, (long) channel.readOutbound());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Disabled("Need to verify and think about if the assumptions made by this test are valid at all.")
|
@Disabled("Need to verify and think about if the assumptions made by this test are valid at all.")
|
||||||
@Test
|
@Test
|
||||||
public void testRemoveAndFailAllReentrantWrite() {
|
public void testRemoveAndFailAllReentrantWrite() {
|
||||||
|
Loading…
Reference in New Issue
Block a user