From b57d9f307f61161b62a335734438fb0c49260c84 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 10 Jul 2013 13:00:42 +0200 Subject: [PATCH] Allow per-write promises and disallow promises on flush() - write() now accepts a ChannelPromise and returns ChannelFuture as most users expected. It makes the user's life much easier because it is now much easier to get notified when a specific message has been written. - flush() does not create a ChannelPromise nor returns ChannelFuture. It is now similar to what read() looks like. --- .gitignore | 4 +- .../codec/http/HttpContentEncoder.java | 3 + .../handler/codec/http/HttpObjectEncoder.java | 6 +- .../websocketx/WebSocket08FrameDecoder.java | 3 +- .../websocketx/WebSocketClientHandshaker.java | 4 +- .../websocketx/WebSocketProtocolHandler.java | 2 +- .../websocketx/WebSocketServerHandshaker.java | 4 +- .../WebSocketServerHandshaker00.java | 2 +- .../WebSocketServerProtocolHandler.java | 4 +- ...bSocketServerProtocolHandshakeHandler.java | 2 +- .../codec/spdy/SpdySessionHandler.java | 22 ++-- .../WebSocketServerProtocolHandlerTest.java | 6 +- .../codec/spdy/SpdyFrameDecoderTest.java | 2 +- .../codec/spdy/SpdySessionHandlerTest.java | 14 +-- .../handler/codec/ByteToMessageCodec.java | 5 +- .../handler/codec/MessageToByteEncoder.java | 17 ++- .../handler/codec/MessageToMessageCodec.java | 5 +- .../codec/MessageToMessageEncoder.java | 27 +++- .../codec/compression/JZlibEncoder.java | 2 +- .../codec/compression/JdkZlibEncoder.java | 2 +- .../example/discard/DiscardClientHandler.java | 2 +- .../netty/example/echo/EchoClientHandler.java | 2 +- .../factorial/FactorialClientHandler.java | 13 +- .../factorial/FactorialServerHandler.java | 2 +- .../file/HttpStaticFileServerHandler.java | 11 +- .../HttpHelloWorldServerHandler.java | 6 +- .../example/http/upload/HttpUploadClient.java | 6 +- .../http/upload/HttpUploadServerHandler.java | 2 +- .../autobahn/AutobahnServerHandler.java | 2 +- .../html5/CustomTextFrameHandler.java | 2 +- .../server/WebSocketServerHandler.java | 2 +- .../sslserver/WebSocketSslServerHandler.java | 2 +- .../io/netty/example/localecho/LocalEcho.java | 2 +- .../proxy/HexDumpProxyBackendHandler.java | 2 +- .../proxy/HexDumpProxyFrontendHandler.java | 4 +- .../example/qotm/QuoteOfTheMomentClient.java | 4 +- .../example/securechat/SecureChatClient.java | 2 +- .../example/socksproxy/RelayHandler.java | 2 +- .../socksproxy/SocksServerConnectHandler.java | 2 +- .../example/socksproxy/SocksServerUtils.java | 2 +- .../io/netty/example/telnet/TelnetClient.java | 2 +- .../example/telnet/TelnetServerHandler.java | 2 +- .../netty/handler/logging/LoggingHandler.java | 8 +- .../java/io/netty/handler/ssl/SslHandler.java | 41 +++---- .../handler/stream/ChunkedWriteHandler.java | 115 ++++++++++-------- .../handler/timeout/IdleStateHandler.java | 4 +- .../handler/timeout/WriteTimeoutHandler.java | 4 +- .../AbstractTrafficShapingHandler.java | 5 +- .../stream/ChunkedWriteHandlerTest.java | 4 +- .../transport/sctp/SctpEchoTest.java | 4 +- .../socket/DatagramMulticastTest.java | 4 +- .../transport/socket/DatagramUnicastTest.java | 2 +- .../socket/SocketBufReleaseTest.java | 2 +- .../transport/socket/SocketEchoTest.java | 4 +- .../socket/SocketFileRegionTest.java | 4 +- .../socket/SocketFixedLengthEchoTest.java | 2 +- .../socket/SocketGatheringWriteTest.java | 2 +- .../socket/SocketObjectEchoTest.java | 2 +- .../SocketShutdownOutputBySelfTest.java | 2 +- .../transport/socket/SocketSpdyEchoTest.java | 2 +- .../transport/socket/SocketSslEchoTest.java | 4 +- .../socket/SocketStringEchoTest.java | 2 +- .../socket/WriteBeforeRegisteredTest.java | 2 +- .../netty/test/udt/util/EchoByteHandler.java | 4 +- .../test/udt/util/EchoMessageHandler.java | 4 +- .../io/netty/channel/AbstractChannel.java | 69 ++++------- .../netty/channel/AbstractServerChannel.java | 11 +- .../main/java/io/netty/channel/Channel.java | 6 +- .../netty/channel/ChannelDuplexHandler.java | 8 +- .../netty/channel/ChannelHandlerContext.java | 2 +- .../netty/channel/ChannelOutboundBuffer.java | 64 ++++------ .../netty/channel/ChannelOutboundHandler.java | 9 +- .../ChannelOutboundHandlerAdapter.java | 14 ++- .../netty/channel/ChannelOutboundInvoker.java | 22 ++-- .../io/netty/channel/ChannelPipeline.java | 6 +- .../channel/CombinedChannelDuplexHandler.java | 8 +- .../channel/DefaultChannelHandlerContext.java | 109 +++++++---------- .../netty/channel/DefaultChannelPipeline.java | 22 ++-- .../java/io/netty/channel/MessageList.java | 40 +++--- .../channel/embedded/EmbeddedChannel.java | 36 ++++-- .../io/netty/channel/group/ChannelGroup.java | 4 +- .../channel/group/DefaultChannelGroup.java | 16 +-- .../channel/DefaultChannelPipelineTest.java | 4 +- .../netty/channel/local/LocalChannelTest.java | 2 +- .../local/LocalTransportThreadModelTest.java | 23 ++-- .../local/LocalTransportThreadModelTest2.java | 4 +- .../local/LocalTransportThreadModelTest3.java | 4 +- 87 files changed, 470 insertions(+), 452 deletions(-) diff --git a/.gitignore b/.gitignore index 6db0f78ba2..6416fff4ad 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,6 @@ /target */target /reports -*/reports \ No newline at end of file +*/reports +.DS_Store + diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentEncoder.java index 0f9eed2c0f..07ed61fc22 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentEncoder.java @@ -275,6 +275,9 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec out) throws Exception { if (frame instanceof PingWebSocketFrame) { frame.content().retain(); - ctx.channel().write(new PongWebSocketFrame(frame.content())).flush(); + ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content())); return; } if (frame instanceof PongWebSocketFrame) { diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker.java index 8b0fe91795..e144258f09 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker.java @@ -158,7 +158,7 @@ public abstract class WebSocketServerHandshaker { logger.debug(String.format("%s WS Version %s server handshake", channel, version())); } FullHttpResponse response = newHandshakeResponse(req, responseHeaders); - channel.write(response).flush().addListener(new ChannelFutureListener() { + channel.writeAndFlush(response).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { @@ -225,7 +225,7 @@ public abstract class WebSocketServerHandshaker { if (channel == null) { throw new NullPointerException("channel"); } - return channel.write(frame).flush(promise).addListener(ChannelFutureListener.CLOSE); + return channel.writeAndFlush(frame, promise).addListener(ChannelFutureListener.CLOSE); } /** diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00.java index b96f5d5f2e..0200b30b4e 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00.java @@ -177,7 +177,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker { */ @Override public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelPromise promise) { - return channel.write(frame).flush(promise); + return channel.writeAndFlush(frame, promise); } @Override diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java index c5bdde5c92..d2998289c9 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java @@ -107,7 +107,7 @@ public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler { if (cause instanceof WebSocketHandshakeException) { FullHttpResponse response = new DefaultFullHttpResponse( HTTP_1_1, HttpResponseStatus.BAD_REQUEST, Unpooled.wrappedBuffer(cause.getMessage().getBytes())); - ctx.channel().write(response).flush().addListener(ChannelFutureListener.CLOSE); + ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } else { ctx.close(); } @@ -128,7 +128,7 @@ public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler { if (msg instanceof FullHttpRequest) { FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.FORBIDDEN); - ctx.channel().write(response).flush(); + ctx.channel().writeAndFlush(response); } else { ctx.fireChannelRead(msg); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java index 041d6fba2a..5dfb065c5c 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java @@ -82,7 +82,7 @@ class WebSocketServerProtocolHandshakeHandler } private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) { - ChannelFuture f = ctx.channel().write(res).flush(); + ChannelFuture f = ctx.channel().writeAndFlush(res); if (!isKeepAlive(req) || res.getStatus().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java index 1085717a2d..f13dc51f6e 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java @@ -159,7 +159,7 @@ public class SpdySessionHandler while (spdyDataFrame.content().readableBytes() > initialReceiveWindowSize) { SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId, spdyDataFrame.content().readSlice(initialReceiveWindowSize).retain()); - ctx.write(partialDataFrame).flush(); + ctx.writeAndFlush(partialDataFrame); } } @@ -169,7 +169,7 @@ public class SpdySessionHandler spdySession.updateReceiveWindowSize(streamId, deltaWindowSize); SpdyWindowUpdateFrame spdyWindowUpdateFrame = new DefaultSpdyWindowUpdateFrame(streamId, deltaWindowSize); - ctx.write(spdyWindowUpdateFrame).flush(); + ctx.writeAndFlush(spdyWindowUpdateFrame); } } @@ -307,7 +307,7 @@ public class SpdySessionHandler SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg; if (isRemoteInitiatedID(spdyPingFrame.getId())) { - ctx.write(spdyPingFrame).flush(); + ctx.writeAndFlush(spdyPingFrame); return; } @@ -392,7 +392,7 @@ public class SpdySessionHandler } @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof SpdyDataFrame || msg instanceof SpdySynStreamFrame || msg instanceof SpdySynReplyFrame || @@ -403,13 +403,13 @@ public class SpdySessionHandler msg instanceof SpdyHeadersFrame || msg instanceof SpdyWindowUpdateFrame) { - handleOutboundMessage(ctx, msg); + handleOutboundMessage(ctx, msg, promise); } else { - ctx.write(msg); + ctx.write(msg, promise); } } - private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg) throws Exception { + private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof SpdyDataFrame) { SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg; @@ -470,7 +470,7 @@ public class SpdySessionHandler // } //}); - ctx.write(partialDataFrame); + ctx.write(partialDataFrame, promise); return; } else { // Window size is large enough to send entire data frame @@ -599,7 +599,7 @@ public class SpdySessionHandler throw PROTOCOL_EXCEPTION; } - ctx.write(msg); + ctx.write(msg, promise); } /* @@ -633,7 +633,7 @@ public class SpdySessionHandler removeStream(ctx, streamId); SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status); - ctx.write(spdyRstStreamFrame).flush(); + ctx.writeAndFlush(spdyRstStreamFrame); if (fireChannelRead) { ctx.fireChannelRead(spdyRstStreamFrame); } @@ -827,7 +827,7 @@ public class SpdySessionHandler if (!sentGoAwayFrame) { sentGoAwayFrame = true; SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, status); - return ctx.write(spdyGoAwayFrame).flush(); + return ctx.writeAndFlush(spdyGoAwayFrame); } else { return ctx.newSucceededFuture(); } diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java b/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java index 704c6f0981..78373c73df 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java @@ -146,13 +146,13 @@ public class WebSocketServerProtocolHandlerTest { private class MockOutboundHandler extends ChannelOutboundHandlerAdapter { @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { responses.add((FullHttpResponse) msg); + promise.setSuccess(); } @Override - public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - promise.setSuccess(); + public void flush(ChannelHandlerContext ctx) throws Exception { } } diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java index 33e10049dd..03e2e0b205 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java @@ -103,7 +103,7 @@ public class SpdyFrameDecoderTest { } private static void sendAndWaitForFrame(Channel cc, SpdyFrame frame, CaptureHandler handler) { - cc.write(frame).flush(); + cc.writeAndFlush(frame); long theFuture = System.currentTimeMillis() + 3000; while (handler.message == null && System.currentTimeMillis() < theFuture) { try { diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java index 15f0e9dcbd..3e06701db2 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java @@ -288,18 +288,18 @@ public class SpdySessionHandlerTest { SpdySynStreamFrame spdySynStreamFrame = new DefaultSpdySynStreamFrame(streamId, 0, (byte) 0); spdySynStreamFrame.setLast(true); - ctx.write(spdySynStreamFrame).flush(); + ctx.writeAndFlush(spdySynStreamFrame); spdySynStreamFrame.setStreamId(spdySynStreamFrame.getStreamId() + 2); - ctx.write(spdySynStreamFrame).flush(); + ctx.writeAndFlush(spdySynStreamFrame); spdySynStreamFrame.setStreamId(spdySynStreamFrame.getStreamId() + 2); - ctx.write(spdySynStreamFrame).flush(); + ctx.writeAndFlush(spdySynStreamFrame); spdySynStreamFrame.setStreamId(spdySynStreamFrame.getStreamId() + 2); - ctx.write(spdySynStreamFrame).flush(); + ctx.writeAndFlush(spdySynStreamFrame); // Limit the number of concurrent streams to 3 SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame(); spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 3); - ctx.write(spdySettingsFrame).flush(); + ctx.writeAndFlush(spdySettingsFrame); } @Override @@ -315,7 +315,7 @@ public class SpdySessionHandlerTest { spdySynReplyFrame.headers().add(entry.getKey(), entry.getValue()); } - ctx.write(spdySynReplyFrame).flush(); + ctx.writeAndFlush(spdySynReplyFrame); } return; } @@ -328,7 +328,7 @@ public class SpdySessionHandlerTest { msg instanceof SpdyPingFrame || msg instanceof SpdyHeadersFrame) { - ctx.write(msg).flush(); + ctx.writeAndFlush(msg); return; } diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java index 637a319a7a..8c7f6cdd1d 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java @@ -18,6 +18,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import io.netty.util.internal.TypeParameterMatcher; import java.util.List; @@ -83,8 +84,8 @@ public abstract class ByteToMessageCodec extends ChannelDuplexHandler { } @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { - encoder.write(ctx, msg); + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + encoder.write(ctx, msg, promise); } /** diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java index ff0aec5d8f..87b17515f8 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java @@ -16,8 +16,10 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.TypeParameterMatcher; @@ -67,7 +69,7 @@ public abstract class MessageToByteEncoder extends ChannelOutboundHandlerAdap } @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) { @@ -85,13 +87,16 @@ public abstract class MessageToByteEncoder extends ChannelOutboundHandlerAdap } finally { ReferenceCountUtil.release(cast); } - } else { - ctx.write(msg); - } - if (buf != null && buf.isReadable()) { - ctx.write(buf); + if (buf.isReadable()) { + ctx.write(buf, promise); + } else { + buf.release(); + ctx.write(Unpooled.EMPTY_BUFFER, promise); + } buf = null; + } else { + ctx.write(msg, promise); } } catch (EncoderException e) { throw e; diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java index 5b796fe28e..c5cd73e13d 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageCodec.java @@ -17,6 +17,7 @@ package io.netty.handler.codec; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import io.netty.util.ReferenceCounted; import io.netty.util.internal.TypeParameterMatcher; @@ -101,8 +102,8 @@ public abstract class MessageToMessageCodec extends Cha } @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { - encoder.write(ctx, msg); + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + encoder.write(ctx, msg, promise); } /** diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index 94eb3d0066..47422ce93b 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -17,9 +17,11 @@ package io.netty.handler.codec; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import io.netty.util.internal.RecyclableArrayList; +import io.netty.util.internal.StringUtil; import io.netty.util.internal.TypeParameterMatcher; import java.util.List; @@ -62,10 +64,11 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundHandlerA } @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { - RecyclableArrayList out = RecyclableArrayList.newInstance(); + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + RecyclableArrayList out = null; try { if (acceptOutboundMessage(msg)) { + out = RecyclableArrayList.newInstance(); @SuppressWarnings("unchecked") I cast = (I) msg; try { @@ -73,18 +76,30 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundHandlerA } finally { ReferenceCountUtil.release(cast); } + + if (out.isEmpty()) { + out.recycle(); + out = null; + + throw new EncoderException( + StringUtil.simpleClassName(this) + " must produce at least one message."); + } } else { - out.add(msg); + ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable t) { throw new EncoderException(t); } finally { - for (int i = 0; i < out.size(); i ++) { - ctx.write(out.get(i)); + if (out != null) { + final int sizeMinusOne = out.size() - 1; + for (int i = 0; i < sizeMinusOne; i ++) { + ctx.write(out.get(i)); + } + ctx.write(out.get(sizeMinusOne), promise); + out.recycle(); } - out.recycle(); } } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java index e75db5eeb0..c451875b4c 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java @@ -385,7 +385,7 @@ public class JZlibEncoder extends ZlibEncoder { } } - return ctx.write(footer).flush(promise); + return ctx.writeAndFlush(footer, promise); } @Override diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java index d3957f4e65..4e4dce040e 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java @@ -255,7 +255,7 @@ public class JdkZlibEncoder extends ZlibEncoder { deflater.end(); } - return ctx.write(footer).flush(promise); + return ctx.writeAndFlush(footer, promise); } @Override diff --git a/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java b/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java index e4ed4b3b05..396c43049d 100644 --- a/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java +++ b/example/src/main/java/io/netty/example/discard/DiscardClientHandler.java @@ -82,7 +82,7 @@ public class DiscardClientHandler extends SimpleChannelInboundHandler { private void generateTraffic() { // Flush the outbound buffer to the socket. // Once flushed, generate the same amount of traffic again. - ctx.write(content.duplicate().retain()).flush().addListener(trafficGenerator); + ctx.writeAndFlush(content.duplicate().retain()).addListener(trafficGenerator); } private final ChannelFutureListener trafficGenerator = new ChannelFutureListener() { diff --git a/example/src/main/java/io/netty/example/echo/EchoClientHandler.java b/example/src/main/java/io/netty/example/echo/EchoClientHandler.java index a6d18784cd..a70c0570b5 100644 --- a/example/src/main/java/io/netty/example/echo/EchoClientHandler.java +++ b/example/src/main/java/io/netty/example/echo/EchoClientHandler.java @@ -50,7 +50,7 @@ public class EchoClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { - ctx.write(firstMessage).flush(); + ctx.writeAndFlush(firstMessage); } @Override diff --git a/example/src/main/java/io/netty/example/factorial/FactorialClientHandler.java b/example/src/main/java/io/netty/example/factorial/FactorialClientHandler.java index 244041984c..1e1264a9ef 100644 --- a/example/src/main/java/io/netty/example/factorial/FactorialClientHandler.java +++ b/example/src/main/java/io/netty/example/factorial/FactorialClientHandler.java @@ -94,21 +94,18 @@ public class FactorialClientHandler extends SimpleChannelInboundHandler> entries = headers.entries(); - channel.write(request).flush().sync(); + channel.writeAndFlush(request).sync(); // Wait for the server to close the connection. channel.closeFuture().sync(); @@ -276,7 +276,7 @@ public class HttpUploadClient { if (bodyRequestEncoder.isChunked()) { // could do either request.isChunked() // either do it through ChunkedWriteHandler - channel.write(bodyRequestEncoder).flush().awaitUninterruptibly(); + channel.writeAndFlush(bodyRequestEncoder).awaitUninterruptibly(); } // Do not clear here since we will reuse the InterfaceHttpData on the @@ -351,7 +351,7 @@ public class HttpUploadClient { // test if request was chunked and if so, finish the write if (bodyRequestEncoder.isChunked()) { - channel.write(bodyRequestEncoder).flush().awaitUninterruptibly(); + channel.writeAndFlush(bodyRequestEncoder).awaitUninterruptibly(); } // Now no more use of file representation (and list of HttpData) diff --git a/example/src/main/java/io/netty/example/http/upload/HttpUploadServerHandler.java b/example/src/main/java/io/netty/example/http/upload/HttpUploadServerHandler.java index 4f70df8377..7f92458f1b 100644 --- a/example/src/main/java/io/netty/example/http/upload/HttpUploadServerHandler.java +++ b/example/src/main/java/io/netty/example/http/upload/HttpUploadServerHandler.java @@ -313,7 +313,7 @@ public class HttpUploadServerHandler extends SimpleChannelInboundHandler } // Send the response and close the connection if necessary. - ChannelFuture f = ctx.channel().write(res).flush(); + ChannelFuture f = ctx.channel().writeAndFlush(res); if (!isKeepAlive(req) || res.getStatus().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } diff --git a/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServerHandler.java b/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServerHandler.java index d79cfda853..b60ce3fdbd 100644 --- a/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServerHandler.java +++ b/example/src/main/java/io/netty/example/http/websocketx/sslserver/WebSocketSslServerHandler.java @@ -137,7 +137,7 @@ public class WebSocketSslServerHandler extends SimpleChannelInboundHandler { // We do not need to write a ChannelBuffer here. // We know the encoder inserted at TelnetPipelineFactory will do the conversion. - ChannelFuture future = ctx.write(response).flush(); + ChannelFuture future = ctx.writeAndFlush(response); // Close the connection after sending 'Have a good day!' // if the client has sent 'bye'. diff --git a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java index 75ad56881f..fecd112da9 100644 --- a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java +++ b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java @@ -297,17 +297,17 @@ public class LoggingHandler extends ChannelDuplexHandler { } @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { logMessage(ctx, "WRITE", msg); - ctx.write(msg); + ctx.write(msg, promise); } @Override - public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + public void flush(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "FLUSH")); } - ctx.flush(promise); + ctx.flush(); } private void logMessage(ChannelHandlerContext ctx, String eventName, Object msg) { diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 2abd7c4a79..83ef85bbb3 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -27,7 +27,6 @@ import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.ChannelPromiseNotifier; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.EventExecutor; @@ -322,7 +321,8 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH engine.closeOutbound(); future.addListener(closeNotifyWriteListener); try { - flush(ctx, future); + write(ctx, Unpooled.EMPTY_BUFFER, future); + flush(ctx); } catch (Exception e) { if (!future.tryFailure(e)) { logger.warn("flush() raised a masked exception.", e); @@ -395,7 +395,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH } @Override - public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + public void flush(ChannelHandlerContext ctx) throws Exception { // Do not encrypt the first write request if this handler is // created with startTLS flag turned on. if (startTls && !sentFirstMessage) { @@ -405,29 +405,21 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH if (pendingWrite == null) { break; } - ctx.write(pendingWrite.buf); - assert pendingWrite.promise == null; + ctx.write(pendingWrite.buf, pendingWrite.promise); } - ctx.flush(promise); + ctx.flush(); return; } if (pendingUnencryptedWrites.isEmpty()) { - pendingUnencryptedWrites.add(new PendingWrite(Unpooled.EMPTY_BUFFER, promise)); - } else { - PendingWrite write = pendingUnencryptedWrites.peekLast(); - if (write.promise == null) { - write.promise = promise; - } else { - write.promise.addListener(new ChannelPromiseNotifier(promise)); - } + pendingUnencryptedWrites.add(new PendingWrite(Unpooled.EMPTY_BUFFER, null)); } flush0(ctx); } @Override - public void write(final ChannelHandlerContext ctx, Object msg) + public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - pendingUnencryptedWrites.add(new PendingWrite((ByteBuf) msg)); + pendingUnencryptedWrites.add(new PendingWrite((ByteBuf) msg, promise)); } private void flush0(ChannelHandlerContext ctx) throws SSLException { @@ -472,13 +464,11 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH } else { switch (result.getHandshakeStatus()) { case NEED_WRAP: - ctx.write(out); - if (promise != null) { - ctx.flush(promise); + ctx.writeAndFlush(out, promise); promise = null; } else { - ctx.flush(); + ctx.writeAndFlush(out); } out = ctx.alloc().buffer(); continue; @@ -518,12 +508,10 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH throw e; } finally { if (out != null && out.isReadable()) { - ctx.write(out); - if (promise != null) { - ctx.flush(promise); + ctx.writeAndFlush(out, promise); } else { - ctx.flush(); + ctx.writeAndFlush(out); } out = null; } else if (promise != null) { @@ -1003,7 +991,8 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH engine.closeOutbound(); ChannelPromise closeNotifyFuture = ctx.newPromise().addListener(closeNotifyWriteListener); - flush(ctx, closeNotifyFuture); + write(ctx, Unpooled.EMPTY_BUFFER, closeNotifyFuture); + flush(ctx); safeClose(ctx, closeNotifyFuture, promise); } @@ -1140,7 +1129,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH private static final class PendingWrite { final ByteBuf buf; - ChannelPromise promise; + final ChannelPromise promise; PendingWrite(ByteBuf buf, ChannelPromise promise) { this.buf = buf; diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 8fbc65725e..558defa2c1 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -69,12 +69,11 @@ public class ChunkedWriteHandler private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChunkedWriteHandler.class); - private final Queue queue = new ArrayDeque(); + private final Queue queue = new ArrayDeque(); private final int maxPendingWrites; private volatile ChannelHandlerContext ctx; private final AtomicInteger pendingWrites = new AtomicInteger(); - private Object currentEvent; - + private PendingWrite currentWrite; public ChunkedWriteHandler() { this(4); } @@ -136,13 +135,12 @@ public class ChunkedWriteHandler } @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { - queue.add(msg); + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + queue.add(new PendingWrite(msg, promise)); } @Override - public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - queue.add(promise); + public void flush(ChannelHandlerContext ctx) throws Exception { if (isWritable() || !ctx.channel().isActive()) { doFlush(ctx); } @@ -155,48 +153,45 @@ public class ChunkedWriteHandler } private void discard(final ChannelHandlerContext ctx, Throwable cause) { - - boolean fireExceptionCaught = false; - boolean success = true; for (;;) { - Object currentEvent = this.currentEvent; + PendingWrite currentWrite = this.currentWrite; - if (this.currentEvent == null) { - currentEvent = queue.poll(); + if (this.currentWrite == null) { + currentWrite = queue.poll(); } else { - this.currentEvent = null; + this.currentWrite = null; } - if (currentEvent == null) { + if (currentWrite == null) { break; } - - if (currentEvent instanceof ChunkedInput) { - ChunkedInput in = (ChunkedInput) currentEvent; + Object message = currentWrite.msg; + if (message instanceof ChunkedInput) { + ChunkedInput in = (ChunkedInput) message; try { if (!in.isEndOfInput()) { - success = false; + if (cause == null) { + cause = new ClosedChannelException(); + } + currentWrite.fail(cause); + } else { + currentWrite.promise.setSuccess(); } + closeInput(in); } catch (Exception e) { - success = false; + currentWrite.fail(e); logger.warn(ChunkedInput.class.getSimpleName() + ".isEndOfInput() failed", e); + closeInput(in); } - closeInput(in); - } else if (currentEvent instanceof ChannelPromise) { - ChannelPromise f = (ChannelPromise) currentEvent; - if (!success) { - fireExceptionCaught = true; - if (cause == null) { - cause = new ClosedChannelException(); - } - f.setFailure(cause); - } else { - f.setSuccess(); + } else { + if (cause == null) { + cause = new ClosedChannelException(); } + currentWrite.fail(cause); } } - if (fireExceptionCaught) { + if (cause != null) { ctx.fireExceptionCaught(cause); } } @@ -207,21 +202,21 @@ public class ChunkedWriteHandler discard(ctx, null); return; } + boolean needsFlush; while (isWritable()) { - if (currentEvent == null) { - currentEvent = queue.poll(); + if (currentWrite == null) { + currentWrite = queue.poll(); } - if (currentEvent == null) { + if (currentWrite == null) { break; } + needsFlush = true; + final PendingWrite currentWrite = this.currentWrite; + final Object pendingMessage = currentWrite.msg; - final Object currentEvent = this.currentEvent; - if (currentEvent instanceof ChannelPromise) { - this.currentEvent = null; - ctx.flush((ChannelPromise) currentEvent); - } else if (currentEvent instanceof ChunkedInput) { - final ChunkedInput chunks = (ChunkedInput) currentEvent; + if (pendingMessage instanceof ChunkedInput) { + final ChunkedInput chunks = (ChunkedInput) pendingMessage; boolean endOfInput; boolean suspend; Object message = null; @@ -236,12 +231,13 @@ public class ChunkedWriteHandler suspend = false; } } catch (final Throwable t) { - this.currentEvent = null; + this.currentWrite = null; if (message != null) { ReferenceCountUtil.release(message); } + currentWrite.fail(t); if (ctx.executor().inEventLoop()) { ctx.fireExceptionCaught(t); } else { @@ -265,9 +261,9 @@ public class ChunkedWriteHandler } pendingWrites.incrementAndGet(); - ChannelFuture f = ctx.write(message).flush(); + ChannelFuture f = ctx.write(message); if (endOfInput) { - this.currentEvent = null; + this.currentWrite = null; // Register a listener which will close the input once the write is complete. // This is needed because the Chunk may have some resource bound that can not @@ -278,6 +274,7 @@ public class ChunkedWriteHandler @Override public void operationComplete(ChannelFuture future) throws Exception { pendingWrites.decrementAndGet(); + currentWrite.promise.setSuccess(); closeInput(chunks); } }); @@ -287,7 +284,8 @@ public class ChunkedWriteHandler public void operationComplete(ChannelFuture future) throws Exception { pendingWrites.decrementAndGet(); if (!future.isSuccess()) { - closeInput((ChunkedInput) currentEvent); + closeInput((ChunkedInput) pendingMessage); + currentWrite.fail(future.cause()); } } }); @@ -297,7 +295,8 @@ public class ChunkedWriteHandler public void operationComplete(ChannelFuture future) throws Exception { pendingWrites.decrementAndGet(); if (!future.isSuccess()) { - closeInput((ChunkedInput) currentEvent); + closeInput((ChunkedInput) pendingMessage); + currentWrite.fail(future.cause()); } else if (isWritable()) { resumeTransfer(); } @@ -305,10 +304,13 @@ public class ChunkedWriteHandler }); } } else { - ctx.write(currentEvent); - this.currentEvent = null; + ctx.write(pendingMessage, currentWrite.promise); + this.currentWrite = null; } + if (needsFlush) { + ctx.flush(); + } if (!channel.isActive()) { discard(ctx, new ClosedChannelException()); return; @@ -325,4 +327,21 @@ public class ChunkedWriteHandler } } } + + private static final class PendingWrite { + final Object msg; + final ChannelPromise promise; + + PendingWrite(Object msg, ChannelPromise promise) { + this.msg = msg; + this.promise = promise; + } + + void fail(Throwable cause) { + ReferenceCountUtil.release(msg); + if (promise != null) { + promise.setFailure(cause); + } + } + } } diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index 2b3c349f99..0c174b483c 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -254,7 +254,7 @@ public class IdleStateHandler extends ChannelDuplexHandler { } @Override - public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -262,7 +262,7 @@ public class IdleStateHandler extends ChannelDuplexHandler { firstWriterIdleEvent = firstAllIdleEvent = true; } }); - ctx.flush(promise); + ctx.write(msg, promise); } private void initialize(ChannelHandlerContext ctx) { diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index 368cd1e6f5..19c41496f8 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -101,9 +101,9 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { } @Override - public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { scheduleTimeout(ctx, promise); - ctx.flush(promise); + ctx.write(msg, promise); } private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise future) { diff --git a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java index 1ced7bf111..1e0300157c 100644 --- a/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/AbstractTrafficShapingHandler.java @@ -18,6 +18,7 @@ package io.netty.handler.traffic; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import io.netty.util.Attribute; import io.netty.util.AttributeKey; @@ -275,7 +276,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler } @Override - public void write(final ChannelHandlerContext ctx, final Object msg) + public void write(final ChannelHandlerContext ctx, final Object msg, ChannelPromise promise) throws Exception { long curtime = System.currentTimeMillis(); long size = ((ByteBuf) msg).readableBytes(); @@ -301,7 +302,7 @@ public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler return; } } - ctx.write(msg); + ctx.write(msg, promise); } /** diff --git a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java index b985a6d398..343d3fe5c4 100644 --- a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java @@ -134,7 +134,7 @@ public class ChunkedWriteHandlerTest { }; EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler()); - ch.write(input).flush().addListener(listener).syncUninterruptibly(); + ch.writeAndFlush(input).addListener(listener).syncUninterruptibly(); ch.checkException(); ch.finish(); @@ -172,7 +172,7 @@ public class ChunkedWriteHandlerTest { }; EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler()); - ch.write(input).flush().syncUninterruptibly(); + ch.writeAndFlush(input).syncUninterruptibly(); ch.checkException(); assertTrue(ch.finish()); diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java index f140b4e5fb..0b126b8a13 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/sctp/SctpEchoTest.java @@ -86,7 +86,7 @@ public class SctpEchoTest extends AbstractSctpTest { for (int i = 0; i < data.length;) { int length = Math.min(random.nextInt(1024 * 64), data.length - i); - cc.write(Unpooled.wrappedBuffer(data, i, length)).flush(); + cc.writeAndFlush(Unpooled.wrappedBuffer(data, i, length)); i += length; } @@ -159,7 +159,7 @@ public class SctpEchoTest extends AbstractSctpTest { } if (channel.parent() != null) { - channel.write(Unpooled.wrappedBuffer(actual)).flush(); + channel.writeAndFlush(Unpooled.wrappedBuffer(actual)); } counter += actual.length; diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java index de67fcec99..aab1447727 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramMulticastTest.java @@ -73,7 +73,7 @@ public class DatagramMulticastTest extends AbstractDatagramTest { cc.joinGroup(groupAddress, NetUtil.LOOPBACK_IF).sync(); - sc.write(new DatagramPacket(Unpooled.copyInt(1), groupAddress)).flush().sync(); + sc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), groupAddress)).sync(); assertTrue(mhandler.await()); // leave the group @@ -83,7 +83,7 @@ public class DatagramMulticastTest extends AbstractDatagramTest { Thread.sleep(1000); // we should not receive a message anymore as we left the group before - sc.write(new DatagramPacket(Unpooled.copyInt(1), groupAddress)).flush().sync(); + sc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), groupAddress)).sync(); mhandler.await(); sc.close().awaitUninterruptibly(); diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java index 9161f99470..fab3d6e211 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java @@ -56,7 +56,7 @@ public class DatagramUnicastTest extends AbstractDatagramTest { Channel sc = sb.bind().sync().channel(); Channel cc = cb.bind().sync().channel(); - cc.write(new DatagramPacket(Unpooled.copyInt(1), addr)).flush().sync(); + cc.writeAndFlush(new DatagramPacket(Unpooled.copyInt(1), addr)).sync(); assertTrue(latch.await(10, TimeUnit.SECONDS)); sc.close().sync(); diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java index 194a2419b9..8cdb7564a0 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java @@ -86,7 +86,7 @@ public class SocketBufReleaseTest extends AbstractSocketTest { buf = ctx.alloc().buffer(); buf.writeBytes(data); - ctx.channel().write(buf).flush().addListener(new ChannelFutureListener() { + ctx.channel().writeAndFlush(buf).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { latch.countDown(); diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java index 44059e121e..738f4f831d 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java @@ -132,9 +132,9 @@ public class SocketEchoTest extends AbstractSocketTest { int length = Math.min(random.nextInt(1024 * 64), data.length - i); ByteBuf buf = Unpooled.wrappedBuffer(data, i, length); if (voidPromise) { - assertEquals(cc.voidPromise(), cc.write(buf).flush(cc.voidPromise())); + assertEquals(cc.voidPromise(), cc.writeAndFlush(buf, cc.voidPromise())); } else { - assertNotEquals(cc.voidPromise(), cc.write(buf).flush()); + assertNotEquals(cc.voidPromise(), cc.writeAndFlush(buf)); } i += length; } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java index 9ab063a286..aaf3708787 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java @@ -90,9 +90,9 @@ public class SocketFileRegionTest extends AbstractSocketTest { Channel cc = cb.connect().sync().channel(); FileRegion region = new DefaultFileRegion(new FileInputStream(file).getChannel(), 0L, file.length()); if (voidPromise) { - assertEquals(cc.voidPromise(), cc.write(region).flush(cc.voidPromise())); + assertEquals(cc.voidPromise(), cc.writeAndFlush(region, cc.voidPromise())); } else { - assertNotEquals(cc.voidPromise(), cc.write(region).flush()); + assertNotEquals(cc.voidPromise(), cc.writeAndFlush(region)); } while (sh.counter < data.length) { if (sh.exception.get() != null) { diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java index ed1e74a150..38ca0bca50 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java @@ -71,7 +71,7 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest { Channel cc = cb.connect().sync().channel(); for (int i = 0; i < data.length;) { int length = Math.min(random.nextInt(1024 * 3), data.length - i); - cc.write(Unpooled.wrappedBuffer(data, i, length)).flush(); + cc.writeAndFlush(Unpooled.wrappedBuffer(data, i, length)); i += length; } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketGatheringWriteTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketGatheringWriteTest.java index f3a1cbc433..59135f5021 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketGatheringWriteTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketGatheringWriteTest.java @@ -85,7 +85,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest { } i += length; } - assertNotEquals(cc.voidPromise(), cc.flush().sync()); + assertNotEquals(cc.voidPromise(), cc.writeAndFlush(Unpooled.EMPTY_BUFFER).sync()); while (sh.counter < data.length) { if (sh.exception.get() != null) { diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java index aad49240c2..750ce8422b 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java @@ -82,7 +82,7 @@ public class SocketObjectEchoTest extends AbstractSocketTest { Channel sc = sb.bind().sync().channel(); Channel cc = cb.connect().sync().channel(); for (String element : data) { - cc.write(element).flush(); + cc.writeAndFlush(element); } while (ch.counter < data.length) { diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java index ea90ebe397..65f369f996 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketShutdownOutputBySelfTest.java @@ -48,7 +48,7 @@ public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest { assertFalse(ch.isOutputShutdown()); s = ss.accept(); - ch.write(Unpooled.wrappedBuffer(new byte[] { 1 })).flush().sync(); + ch.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 1 })).sync(); assertEquals(1, s.getInputStream().read()); assertTrue(h.ch.isOpen()); diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java index c6023e5b0e..aeac4a01d6 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSpdyEchoTest.java @@ -198,7 +198,7 @@ public class SocketSpdyEchoTest extends AbstractSocketTest { int port = ((InetSocketAddress) sc.localAddress()).getPort(); Channel cc = cb.remoteAddress(NetUtil.LOCALHOST, port).connect().sync().channel(); - cc.write(frames).flush(); + cc.writeAndFlush(frames); while (ch.counter < frames.writerIndex() - ignoredBytes) { if (sh.exception.get() != null) { diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java index c0677656ee..7e1ddc2e9a 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java @@ -101,7 +101,7 @@ public class SocketSslEchoTest extends AbstractSocketTest { Channel sc = sb.bind().sync().channel(); Channel cc = cb.connect().sync().channel(); Future hf = cc.pipeline().get(SslHandler.class).handshakeFuture(); - cc.write(Unpooled.wrappedBuffer(data, 0, FIRST_MESSAGE_SIZE)).flush(); + cc.writeAndFlush(Unpooled.wrappedBuffer(data, 0, FIRST_MESSAGE_SIZE)); final AtomicBoolean firstByteWriteFutureDone = new AtomicBoolean(); hf.sync(); @@ -110,7 +110,7 @@ public class SocketSslEchoTest extends AbstractSocketTest { for (int i = FIRST_MESSAGE_SIZE; i < data.length;) { int length = Math.min(random.nextInt(1024 * 64), data.length - i); - ChannelFuture future = cc.write(Unpooled.wrappedBuffer(data, i, length)).flush(); + ChannelFuture future = cc.writeAndFlush(Unpooled.wrappedBuffer(data, i, length)); future.sync(); i += length; } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java index 8b329a62c0..960930ae20 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java @@ -85,7 +85,7 @@ public class SocketStringEchoTest extends AbstractSocketTest { Channel cc = cb.connect().sync().channel(); for (String element : data) { String delimiter = random.nextBoolean() ? "\r\n" : "\n"; - cc.write(element + delimiter).flush(); + cc.writeAndFlush(element + delimiter); } while (ch.counter < data.length) { diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/WriteBeforeRegisteredTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/WriteBeforeRegisteredTest.java index ced8330268..b902c91d76 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/WriteBeforeRegisteredTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/WriteBeforeRegisteredTest.java @@ -34,7 +34,7 @@ public class WriteBeforeRegisteredTest extends AbstractClientSocketTest { SocketChannel ch = null; try { ch = (SocketChannel) cb.handler(h).connect().channel(); - ch.write(Unpooled.wrappedBuffer(new byte[] { 1 })).flush(); + ch.writeAndFlush(Unpooled.wrappedBuffer(new byte[] { 1 })); } finally { if (ch != null) { ch.close(); diff --git a/transport-udt/src/test/java/io/netty/test/udt/util/EchoByteHandler.java b/transport-udt/src/test/java/io/netty/test/udt/util/EchoByteHandler.java index 2f96bf851a..c4deaa6f06 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/util/EchoByteHandler.java +++ b/transport-udt/src/test/java/io/netty/test/udt/util/EchoByteHandler.java @@ -58,7 +58,7 @@ public class EchoByteHandler extends ChannelInboundHandlerAdapter { log.info("ECHO active {}", NioUdtProvider.socketUDT(ctx.channel()).toStringOptions()); - ctx.write(message).flush(); + ctx.writeAndFlush(message); } @Override @@ -67,7 +67,7 @@ public class EchoByteHandler extends ChannelInboundHandlerAdapter { if (meter != null) { meter.mark(buf.readableBytes()); } - ctx.write(msg).flush(); + ctx.writeAndFlush(msg); } @Override diff --git a/transport-udt/src/test/java/io/netty/test/udt/util/EchoMessageHandler.java b/transport-udt/src/test/java/io/netty/test/udt/util/EchoMessageHandler.java index 9d3107302d..c48603a4e8 100644 --- a/transport-udt/src/test/java/io/netty/test/udt/util/EchoMessageHandler.java +++ b/transport-udt/src/test/java/io/netty/test/udt/util/EchoMessageHandler.java @@ -57,7 +57,7 @@ public class EchoMessageHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { log.info("ECHO active {}", NioUdtProvider.socketUDT(ctx.channel()).toStringOptions()); - ctx.write(message).flush(); + ctx.writeAndFlush(message); } @Override @@ -72,6 +72,6 @@ public class EchoMessageHandler extends ChannelInboundHandlerAdapter { if (meter != null) { meter.mark(udtMsg.content().readableBytes()); } - ctx.write(msg).flush(); + ctx.writeAndFlush(msg); } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 4d60f70742..73b798b621 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -19,7 +19,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufHolder; import io.netty.util.DefaultAttributeMap; -import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.ThreadLocalRandom; import io.netty.util.internal.logging.InternalLogger; @@ -178,8 +177,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public ChannelFuture flush() { - return pipeline.flush(); + public Channel flush() { + pipeline.flush(); + return this; } @Override @@ -219,14 +219,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public Channel write(Object msg) { - pipeline.write(msg); - return this; + public ChannelFuture write(Object msg) { + return pipeline.write(msg); } @Override - public ChannelFuture flush(ChannelPromise promise) { - return pipeline.flush(promise); + public ChannelFuture write(Object msg, ChannelPromise promise) { + return pipeline.write(msg, promise); } @Override @@ -589,13 +588,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public void write(Object msg) { - outboundBuffer.addMessage(msg); + public void write(Object msg, ChannelPromise promise) { + outboundBuffer.addMessage(msg, promise); } @Override - public void flush(final ChannelPromise promise) { - outboundBuffer.addPromise(promise); + public void flush() { + outboundBuffer.addFlush(); if (!inFlushNow) { // Avoid re-entrance try { @@ -615,7 +614,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha eventLoop().execute(new Runnable() { @Override public void run() { - flush(promise); + flush(); } }); } @@ -645,50 +644,32 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { for (;;) { - ChannelPromise promise = outboundBuffer.currentPromise; - if (promise == null) { - if (!outboundBuffer.next()) { - break; - } - promise = outboundBuffer.currentPromise; - } - MessageList messages = outboundBuffer.currentMessages; - - // Make sure the message list is not empty. if (messages == null) { - promise.trySuccess(); if (!outboundBuffer.next()) { break; - } else { - continue; } + messages = outboundBuffer.currentMessages; } int messageIndex = outboundBuffer.currentMessageIndex; int messageCount = messages.size(); - Object[] messageArray = messages.array(); - - // Make sure the promise has not been cancelled. - if (promise.isCancelled()) { - // If cancelled, release all unwritten messages and recycle. - for (int i = messageIndex; i < messageCount; i ++) { - ReferenceCountUtil.release(messageArray[i]); - } - messages.recycle(); - if (!outboundBuffer.next()) { - break; - } else { - continue; - } - } + Object[] messageArray = messages.messages(); + ChannelPromise[] promiseArray = messages.promises(); // Write the messages. - int writtenMessages = doWrite(messageArray, messageCount, messageIndex); - outboundBuffer.currentMessageIndex = messageIndex += writtenMessages; + final int writtenMessages = doWrite(messageArray, messageCount, messageIndex); + + // Notify the promises. + final int newMessageIndex = messageIndex + writtenMessages; + for (int i = messageIndex; i < newMessageIndex; i ++) { + promiseArray[i].trySuccess(); + } + + // Update the index variable and decide what to do next. + outboundBuffer.currentMessageIndex = messageIndex = newMessageIndex; if (messageIndex >= messageCount) { messages.recycle(); - promise.trySuccess(); if (!outboundBuffer.next()) { break; } diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 753e2bff16..17b70a5927 100755 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -25,8 +25,8 @@ import java.net.SocketAddress; *
    *
  • {@link #connect(SocketAddress, ChannelPromise)}
  • *
  • {@link #disconnect(ChannelPromise)}
  • - *
  • {@link #write(Object)}
  • - *
  • {@link #flush(ChannelPromise)}
  • + *
  • {@link #write(Object, ChannelPromise)}
  • + *
  • {@link #flush()}
  • *
  • and the shortcut methods which calls the methods mentioned above *
*/ @@ -78,13 +78,14 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S private final class DefaultServerUnsafe extends AbstractUnsafe { @Override - public void write(Object msg) { + public void write(Object msg, ChannelPromise promise) { ReferenceCountUtil.release(msg); + reject(promise); } @Override - public void flush(ChannelPromise promise) { - reject(promise); + public void flush() { + // ignore } @Override diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index c6eda10298..7656bcb117 100755 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -148,7 +148,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr boolean isWritable(); @Override - Channel write(Object msg); + Channel flush(); @Override Channel read(); @@ -237,12 +237,12 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr /** * Schedules a write operation. */ - void write(Object msg); + void write(Object msg, ChannelPromise promise); /** * Flush out all scheduled writes. */ - void flush(ChannelPromise promise); + void flush(); /** * Flush out all schedules writes immediately. diff --git a/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java b/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java index 3246301419..05994d6784 100644 --- a/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java @@ -90,12 +90,12 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement } @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { - ctx.write(msg); + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + ctx.write(msg, promise); } @Override - public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - ctx.flush(promise); + public void flush(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java index fe1a83a5df..57b9881c37 100755 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java @@ -185,5 +185,5 @@ public interface ChannelHandlerContext ChannelHandlerContext fireChannelWritabilityChanged(); @Override - ChannelHandlerContext write(Object msg); + ChannelHandlerContext flush(); } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 4a8df4f6ff..5fd612d068 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -31,12 +31,10 @@ final class ChannelOutboundBuffer { private static final int MIN_INITIAL_CAPACITY = 8; - ChannelPromise currentPromise; MessageList currentMessages; int currentMessageIndex; private long currentMessageListSize; - private ChannelPromise[] promises; private MessageList[] messages; private long[] messageListSizes; @@ -79,29 +77,28 @@ final class ChannelOutboundBuffer { initialCapacity = MIN_INITIAL_CAPACITY; } - promises = new ChannelPromise[initialCapacity]; messages = new MessageList[initialCapacity]; messageListSizes = new long[initialCapacity]; this.channel = channel; } - void addMessage(Object msg) { + void addMessage(Object msg, ChannelPromise promise) { int tail = this.tail; MessageList msgs = messages[tail]; if (msgs == null) { messages[tail] = msgs = MessageList.newInstance(); } - msgs.add(msg); + + msgs.add(msg, promise); int size = channel.calculateMessageSize(msg); messageListSizes[tail] += size; incrementPendingOutboundBytes(size); } - void addPromise(ChannelPromise promise) { + void addFlush() { int tail = this.tail; - promises[tail] = promise; - if ((this.tail = tail + 1 & promises.length - 1) == head) { + if ((this.tail = tail + 1 & messages.length - 1) == head) { doubleCapacity(); } } @@ -141,28 +138,23 @@ final class ChannelOutboundBuffer { assert head == tail; int p = head; - int n = promises.length; + int n = messages.length; int r = n - p; // number of elements to the right of p int newCapacity = n << 1; if (newCapacity < 0) { throw new IllegalStateException("Sorry, deque too big"); } - ChannelPromise[] a1 = new ChannelPromise[newCapacity]; - System.arraycopy(promises, p, a1, 0, r); - System.arraycopy(promises, 0, a1, r, p); - promises = a1; - @SuppressWarnings("unchecked") - MessageList[] a2 = new MessageList[newCapacity]; - System.arraycopy(messages, p, a2, 0, r); - System.arraycopy(messages, 0, a2, r, p); - messages = a2; + MessageList[] a1 = new MessageList[newCapacity]; + System.arraycopy(messages, p, a1, 0, r); + System.arraycopy(messages, 0, a1, r, p); + messages = a1; - long[] a3 = new long[newCapacity]; - System.arraycopy(messageListSizes, p, a3, 0, r); - System.arraycopy(messageListSizes, 0, a3, r, p); - messageListSizes = a3; + long[] a2 = new long[newCapacity]; + System.arraycopy(messageListSizes, p, a2, 0, r); + System.arraycopy(messageListSizes, 0, a2, r, p); + messageListSizes = a2; head = 0; tail = n; @@ -175,24 +167,21 @@ final class ChannelOutboundBuffer { int h = head; - ChannelPromise e = promises[h]; // Element is null if deque empty + MessageList e = messages[h]; // Element is null if deque empty if (e == null) { currentMessageListSize = 0; - currentPromise = null; currentMessages = null; return false; } - currentPromise = e; currentMessages = messages[h]; currentMessageIndex = 0; currentMessageListSize = messageListSizes[h]; - promises[h] = null; messages[h] = null; messageListSizes[h] = 0; - head = h + 1 & promises.length - 1; + head = h + 1 & messages.length - 1; return true; } @@ -201,7 +190,7 @@ final class ChannelOutboundBuffer { } int size() { - return tail - head & promises.length - 1; + return tail - head & messages.length - 1; } boolean isEmpty() { @@ -213,10 +202,9 @@ final class ChannelOutboundBuffer { int tail = this.tail; if (head != tail) { this.head = this.tail = 0; - final int mask = promises.length - 1; + final int mask = messages.length - 1; int i = head; do { - promises[i] = null; messages[i] = null; messageListSizes[i] = 0; i = i + 1 & mask; @@ -236,25 +224,25 @@ final class ChannelOutboundBuffer { try { inFail = true; - if (currentPromise == null) { + if (currentMessages == null) { if (!next()) { return; } } do { - if (!(currentPromise instanceof VoidChannelPromise) && !currentPromise.tryFailure(cause)) { - logger.warn("Promise done already: {} - new exception is:", currentPromise, cause); - } - if (currentMessages != null) { // Release all failed messages. - Object[] array = currentMessages.array(); + Object[] messages = currentMessages.messages(); + ChannelPromise[] promises = currentMessages.promises(); final int size = currentMessages.size(); try { for (int i = currentMessageIndex; i < size; i++) { - Object msg = array[i]; - ReferenceCountUtil.release(msg); + ReferenceCountUtil.release(messages[i]); + ChannelPromise p = promises[i]; + if (!(p instanceof VoidChannelPromise) && !p.tryFailure(cause)) { + logger.warn("Promise done already: {} - new exception is:", p, cause); + } } } finally { currentMessages.recycle(); diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java index e49ece48c8..6c92cf7890 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java @@ -76,12 +76,7 @@ public interface ChannelOutboundHandler extends ChannelHandler { */ void read(ChannelHandlerContext ctx) throws Exception; - /** - * Called once a flush operation is made and so the outbound data should be written. - * - * @param ctx the {@link ChannelHandlerContext} for which the flush operation is made - */ - void write(ChannelHandlerContext ctx, Object msg) throws Exception; + void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception; - void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; + void flush(ChannelHandlerContext ctx) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java index 7ffb31dbd0..c0a6897123 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java @@ -100,12 +100,18 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme * Sub-classes may override this method to change behavior. */ @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { - ctx.write(msg); + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + ctx.write(msg, promise); } + /** + * Calls {@link ChannelHandlerContext#flush()} to forward + * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. + * + * Sub-classes may override this method to change behavior. + */ @Override - public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - ctx.flush(promise); + public void flush(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java index eb9029be59..604fe8dbb2 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java @@ -99,8 +99,6 @@ interface ChannelOutboundInvoker { */ ChannelFuture deregister(); - ChannelFuture flush(); - /** * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation * completes, either because the operation was successful or because of an error. @@ -204,19 +202,25 @@ interface ChannelOutboundInvoker { /** * Request to write a message via this ChannelOutboundInvoker through the {@link ChannelPipeline}. - * This method will not request to actual flush, so be sure to call {@link #flush()} or - * {@link #flush(ChannelPromise)} once you want to request to flush all pending data to the actual transport. + * This method will not request to actual flush, so be sure to call {@link #flush()} + * once you want to request to flush all pending data to the actual transport. */ - ChannelOutboundInvoker write(Object msg); + ChannelFuture write(Object msg); /** - * Request to flush all pending messages via this ChannelOutboundInvoker and notify the {@link ChannelFuture} - * once the operation completes, either because the operation was successful or because of an error. + * Request to write a message via this ChannelOutboundInvoker through the {@link ChannelPipeline}. + * This method will not request to actual flush, so be sure to call {@link #flush()} + * once you want to request to flush all pending data to the actual transport. */ - ChannelFuture flush(ChannelPromise promise); + ChannelFuture write(Object msg, ChannelPromise promise); /** - * Shortcut for call {@link #write(Object)} and {@link #flush(ChannelPromise)}. + * Request to flush all pending messages via this ChannelOutboundInvoker. + */ + ChannelOutboundInvoker flush(); + + /** + * Shortcut for call {@link #write(Object, ChannelPromise)} and {@link #flush()}. */ ChannelFuture writeAndFlush(Object msg, ChannelPromise promise); diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index 9aabd90a3c..13302b0611 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -145,8 +145,8 @@ import java.util.NoSuchElementException; *
    *
  • {@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)}
  • *
  • {@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)}
  • - *
  • {@link ChannelHandlerContext#write(Object)}
  • - *
  • {@link ChannelHandlerContext#flush(ChannelPromise)}
  • + *
  • {@link ChannelHandlerContext#write(Object, ChannelPromise)}
  • + *
  • {@link ChannelHandlerContext#flush()}
  • *
  • {@link ChannelHandlerContext#read()}
  • *
  • {@link ChannelHandlerContext#disconnect(ChannelPromise)}
  • *
  • {@link ChannelHandlerContext#close(ChannelPromise)}
  • @@ -622,7 +622,7 @@ public interface ChannelPipeline ChannelPipeline fireChannelWritabilityChanged(); @Override - ChannelPipeline write(Object msg); + ChannelPipeline flush(); @Override ChannelPipeline read(); diff --git a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java index d604bd4dc9..7ba94a1208 100644 --- a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java +++ b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java @@ -189,13 +189,13 @@ public class CombinedChannelDuplexHandler= capacity) { + if (messages.length >= capacity) { return; } - Object[] newElements = new Object[normalizeCapacity(capacity)]; - System.arraycopy(elements, 0, newElements, 0, size); - elements = newElements; + final int size = this.size; + capacity = normalizeCapacity(capacity); + + Object[] newMessages = new Object[capacity]; + System.arraycopy(messages, 0, newMessages, 0, size); + messages = newMessages; + + ChannelPromise[] newPromises = new ChannelPromise[capacity]; + System.arraycopy(promises, 0, newPromises, 0, size); + promises = newPromises; } private static int normalizeCapacity(int initialCapacity) { diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 1f4def07be..29adf2db77 100755 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -28,6 +28,7 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.RecyclableArrayList; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -167,20 +168,31 @@ public class EmbeddedChannel extends AbstractChannel { return !lastOutboundBuffer.isEmpty(); } - for (Object m: msgs) { - if (m == null) { - break; + RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length); + try { + for (Object m: msgs) { + if (m == null) { + break; + } + futures.add(write(m)); } - write(m); + + flush(); + + for (int i = 0; i < futures.size(); i++) { + ChannelFuture future = (ChannelFuture) futures.get(i); + assert future.isDone(); + if (future.cause() != null) { + recordException(future.cause()); + } + } + + runPendingTasks(); + checkException(); + return !lastOutboundBuffer.isEmpty(); + } finally { + futures.recycle(); } - ChannelFuture future = flush(); - assert future.isDone(); - if (future.cause() != null) { - recordException(future.cause()); - } - runPendingTasks(); - checkException(); - return !lastOutboundBuffer.isEmpty(); } /** diff --git a/transport/src/main/java/io/netty/channel/group/ChannelGroup.java b/transport/src/main/java/io/netty/channel/group/ChannelGroup.java index adfc3fd573..9f2998d447 100644 --- a/transport/src/main/java/io/netty/channel/group/ChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/ChannelGroup.java @@ -105,7 +105,7 @@ public interface ChannelGroup extends Set, Comparable { * * @return itself */ - ChannelGroup write(Object message); + ChannelGroupFuture write(Object message); /** * Flush all {@link Channel}s in this @@ -115,7 +115,7 @@ public interface ChannelGroup extends Set, Comparable { * @return the {@link ChannelGroupFuture} instance that notifies when * the operation is done for all channels */ - ChannelGroupFuture flush(); + ChannelGroup flush(); /** * Shortcut for calling {@link #write(Object)} and {@link #flush()}. diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java index 8dccfcfd58..cf2dcd749b 100644 --- a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java @@ -192,27 +192,27 @@ public class DefaultChannelGroup extends AbstractSet implements Channel } @Override - public ChannelGroup write(Object message) { + public ChannelGroupFuture write(Object message) { if (message == null) { throw new NullPointerException("message"); } + Map futures = new LinkedHashMap(size()); + for (Channel c: nonServerChannels) { - c.write(safeDuplicate(message)); + futures.put(c, c.write(safeDuplicate(message))); } ReferenceCountUtil.release(message); - return this; + return new DefaultChannelGroupFuture(this, futures, executor); } @Override - public ChannelGroupFuture flush() { - Map futures = new LinkedHashMap(size()); + public ChannelGroup flush() { for (Channel c: nonServerChannels) { - futures.put(c, c.flush()); + c.flush(); } - - return new DefaultChannelGroupFuture(this, futures, executor); + return this; } @Override diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index a9bae244df..f137525668 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -137,7 +137,7 @@ public class DefaultChannelPipelineTest { StringInboundHandler handler = new StringInboundHandler(); setUp(handler); - peer.write(holder).flush().sync(); + peer.writeAndFlush(holder).sync(); assertTrue(free.await(10, TimeUnit.SECONDS)); assertTrue(handler.called); @@ -488,7 +488,7 @@ public class DefaultChannelPipelineTest { final Queue outboundBuffer = new ArrayDeque(); @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { outboundBuffer.add(msg); } diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index c6206255dc..208b0bec27 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -124,7 +124,7 @@ public class LocalChannelTest { // Close the channel and write something. cc.close().sync(); try { - cc.write(new Object()).flush().sync(); + cc.writeAndFlush(new Object()).sync(); fail("must raise a ClosedChannelException"); } catch (Exception e) { assertThat(e, is(instanceOf(ClosedChannelException.class))); diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index bdae50917e..f69bad9ba8 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -17,11 +17,13 @@ package io.netty.channel.local; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.DefaultEventExecutorGroup; @@ -108,7 +110,7 @@ public class LocalTransportThreadModelTest { ch.pipeline().write("5"); ch.pipeline().context(h3).write("6"); ch.pipeline().context(h2).write("7"); - ch.pipeline().context(h1).write("8").flush().sync(); + ch.pipeline().context(h1).writeAndFlush("8").sync(); ch.close().sync(); @@ -371,9 +373,9 @@ public class LocalTransportThreadModelTest { } @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { outboundThreadNames.add(Thread.currentThread().getName()); - ctx.write(msg); + ctx.write(msg, promise); } @Override @@ -414,7 +416,7 @@ public class LocalTransportThreadModelTest { } @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { Assert.assertSame(t, Thread.currentThread()); // Don't let the write request go to the server-side channel - just swallow. @@ -430,6 +432,7 @@ public class LocalTransportThreadModelTest { ctx.write(actual); } } + ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise); m.release(); } @@ -473,7 +476,7 @@ public class LocalTransportThreadModelTest { } @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { Assert.assertSame(t, Thread.currentThread()); ByteBuf out = ctx.alloc().buffer(4); @@ -482,7 +485,7 @@ public class LocalTransportThreadModelTest { Assert.assertEquals(expected, m); out.writeInt(m); - ctx.write(out); + ctx.write(out, promise); } @Override @@ -521,14 +524,14 @@ public class LocalTransportThreadModelTest { } @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { Assert.assertSame(t, Thread.currentThread()); int actual = (Integer) msg; int expected = outCnt ++; Assert.assertEquals(expected, actual); - ctx.write(msg); + ctx.write(msg, promise); } @Override @@ -566,13 +569,13 @@ public class LocalTransportThreadModelTest { @Override public void write( - ChannelHandlerContext ctx, Object msg) throws Exception { + ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { Assert.assertSame(t, Thread.currentThread()); int actual = (Integer) msg; int expected = outCnt ++; Assert.assertEquals(expected, actual); - ctx.write(msg); + ctx.write(msg, promise); } @Override diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java index 95c447c677..45218327aa 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java @@ -107,9 +107,9 @@ public class LocalTransportThreadModelTest2 { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < messageCountPerRun; i ++) { - ctx.channel().write(name + ' ' + i); + lastWriteFuture = ctx.channel().write(name + ' ' + i); } - lastWriteFuture = ctx.channel().flush(); + ctx.channel().flush(); } @Override diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java index 68024a82a7..bfb3b4bd67 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.DefaultEventExecutorGroup; @@ -309,10 +310,11 @@ public class LocalTransportThreadModelTest3 { } @Override - public void write(ChannelHandlerContext ctx, Object msg) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (!inbound) { events.add(EventType.WRITE); } + promise.setSuccess(); } @Override