diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java
index f90783be66..1ea168c543 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java
@@ -45,9 +45,9 @@ class WebSocketClientProtocolHandshakeHandler extends ChannelInboundHandlerAdapt
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof FullHttpResponse)) {
- ctx.fireMessageReceived(msg);
+ ctx.fireChannelRead(msg);
return;
}
diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java
index 3de70e8800..c5bdde5c92 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandler.java
@@ -15,7 +15,6 @@
*/
package io.netty.handler.codec.http.websocketx;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
@@ -31,6 +30,8 @@ import io.netty.util.AttributeKey;
import java.util.List;
+import static io.netty.handler.codec.http.HttpVersion.*;
+
/**
* This handler does all the heavy lifting for you to run a websocket server.
*
@@ -123,13 +124,13 @@ public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler {
static ChannelHandler forbiddenHttpRequestResponder() {
return new ChannelInboundHandlerAdapter() {
@Override
- public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
FullHttpResponse response =
new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.FORBIDDEN);
ctx.channel().write(response).flush();
} else {
- ctx.fireMessageReceived(msg);
+ ctx.fireChannelRead(msg);
}
}
};
diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java
index 9ca7a81054..041d6fba2a 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java
@@ -50,7 +50,7 @@ class WebSocketServerProtocolHandshakeHandler
}
@Override
- public void messageReceived(final ChannelHandlerContext ctx, Object msg) throws Exception {
+ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpRequest req = (FullHttpRequest) msg;
if (req.getMethod() != GET) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyOrHttpChooser.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyOrHttpChooser.java
index 9cfc57a899..e92f390238 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyOrHttpChooser.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyOrHttpChooser.java
@@ -60,13 +60,13 @@ public abstract class SpdyOrHttpChooser extends ChannelInboundHandlerAdapter {
protected abstract SelectedProtocol getProtocol(SSLEngine engine);
@Override
- public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (initPipeline(ctx)) {
// When we reached here we can remove this handler as its now clear what protocol we want to use
// from this point on.
ctx.pipeline().remove(this);
- ctx.fireMessageReceived(msg);
+ ctx.fireChannelRead(msg);
}
}
diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java
index 0de5c6dec1..49b15b933e 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java
@@ -102,7 +102,7 @@ final class SpdySession {
}
/*
- * hasReceivedReply and receivedReply are only called from messageReceived
+ * hasReceivedReply and receivedReply are only called from channelRead()
* no need to synchronize access to the StreamState
*/
boolean hasReceivedReply(int streamID) {
diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java
index f9cceeca90..ec86192def 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySessionHandler.java
@@ -84,7 +84,7 @@ public class SpdySessionHandler
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof SpdyDataFrame) {
/*
@@ -378,7 +378,7 @@ public class SpdySessionHandler
}
}
- ctx.fireMessageReceived(msg);
+ ctx.fireChannelRead(msg);
}
@Override
@@ -658,13 +658,13 @@ public class SpdySessionHandler
* Note: this is only called by the worker thread
*/
private void issueStreamError(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) {
- boolean fireMessageReceived = !spdySession.isRemoteSideClosed(streamId);
+ boolean fireChannelRead = !spdySession.isRemoteSideClosed(streamId);
removeStream(ctx, streamId);
SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
ctx.write(spdyRstStreamFrame).flush();
- if (fireMessageReceived) {
- ctx.fireMessageReceived(spdyRstStreamFrame);
+ if (fireChannelRead) {
+ ctx.fireChannelRead(spdyRstStreamFrame);
}
}
@@ -800,7 +800,7 @@ public class SpdySessionHandler
halfCloseStream(streamId, false);
}
- ctx.fireMessageReceived(spdyDataFrame);
+ ctx.fireChannelRead(spdyDataFrame);
} else {
// We can send a partial frame
spdySession.updateSendWindowSize(streamId, -1 * newWindowSize);
@@ -826,7 +826,7 @@ public class SpdySessionHandler
// }
//});
- ctx.fireMessageReceived(partialDataFrame);
+ ctx.fireChannelRead(partialDataFrame);
newWindowSize = 0;
}
diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/HttpObjectAggregatorTest.java b/codec-http/src/test/java/io/netty/handler/codec/http/HttpObjectAggregatorTest.java
index d6f8fb4676..9120c228a5 100644
--- a/codec-http/src/test/java/io/netty/handler/codec/http/HttpObjectAggregatorTest.java
+++ b/codec-http/src/test/java/io/netty/handler/codec/http/HttpObjectAggregatorTest.java
@@ -46,7 +46,7 @@ public class HttpObjectAggregatorTest {
assertFalse(embedder.writeInbound(chunk1));
assertFalse(embedder.writeInbound(chunk2));
- // this should trigger a messageReceived event so return true
+ // this should trigger a channelRead event so return true
assertTrue(embedder.writeInbound(chunk3));
assertTrue(embedder.finish());
DefaultFullHttpRequest aggratedMessage = (DefaultFullHttpRequest) embedder.readInbound();
@@ -87,7 +87,7 @@ public class HttpObjectAggregatorTest {
assertFalse(embedder.writeInbound(chunk1));
assertFalse(embedder.writeInbound(chunk2));
- // this should trigger a messageReceived event so return true
+ // this should trigger a channelRead event so return true
assertTrue(embedder.writeInbound(trailer));
assertTrue(embedder.finish());
DefaultFullHttpRequest aggratedMessage = (DefaultFullHttpRequest) embedder.readInbound();
@@ -172,7 +172,7 @@ public class HttpObjectAggregatorTest {
assertFalse(embedder.writeInbound(chunk1));
assertFalse(embedder.writeInbound(chunk2));
- // this should trigger a messageReceived event so return true
+ // this should trigger a channelRead event so return true
assertTrue(embedder.writeInbound(chunk3));
assertTrue(embedder.finish());
FullHttpRequest aggratedMessage = (FullHttpRequest) embedder.readInbound();
diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java b/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java
index 11bd2ff56a..704c6f0981 100644
--- a/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java
+++ b/codec-http/src/test/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandlerTest.java
@@ -160,7 +160,7 @@ public class WebSocketServerProtocolHandlerTest {
private String content;
@Override
- public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assertNull(content);
content = "processed: " + ((TextWebSocketFrame) msg).text();
}
diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java
index 3d16c57217..33e10049dd 100644
--- a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java
+++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdyFrameDecoderTest.java
@@ -131,7 +131,7 @@ public class SpdyFrameDecoderTest {
public volatile Object message;
@Override
- public void messageReceived(ChannelHandlerContext ctx, Object m) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object m) throws Exception {
message = m;
}
diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java
index 0b9bc82297..15f0e9dcbd 100644
--- a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java
+++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java
@@ -303,7 +303,7 @@ public class SpdySessionHandlerTest {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof SpdySynStreamFrame) {
SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java
index 9d066118a7..637a319a7a 100644
--- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java
+++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageCodec.java
@@ -78,8 +78,8 @@ public abstract class ByteToMessageCodec extends ChannelDuplexHandler {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
- decoder.messageReceived(ctx, msg);
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ decoder.channelRead(ctx, msg);
}
@Override
diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java
index eaca15e068..d3415e7481 100644
--- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java
+++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java
@@ -56,7 +56,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
}
/**
- * If set then only one message is decoded on each {@link #messageReceived(ChannelHandlerContext, Object)}
+ * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)}
* call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
*
* Default is {@code false} as this has performance impacts.
@@ -67,7 +67,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
/**
* If {@code true} then only one message is decoded on each
- * {@link #messageReceived(ChannelHandlerContext, Object)} call.
+ * {@link #channelRead(ChannelHandlerContext, Object)} call.
*
* Default is {@code false} as this has performance impacts.
*/
@@ -102,9 +102,9 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = internalBuffer();
if (buf.isReadable()) {
- ctx.fireMessageReceived(buf);
+ ctx.fireChannelRead(buf);
}
- ctx.fireMessageReceivedLast();
+ ctx.fireChannelReadComplete();
handlerRemoved0(ctx);
}
@@ -115,7 +115,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
@Override
- public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
CodecOutput out = CodecOutput.newInstance();
try {
if (msg instanceof ByteBuf) {
@@ -163,7 +163,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
}
for (int i = 0; i < out.size(); i ++) {
- ctx.fireMessageReceived(out.get(i));
+ ctx.fireChannelRead(out.get(i));
}
out.recycle();
@@ -171,14 +171,14 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
}
@Override
- public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception {
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
- super.channelReadSuspended(ctx);
+ ctx.fireChannelReadComplete();
}
@Override
@@ -202,7 +202,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
}
for (int i = 0; i < out.size(); i ++) {
- ctx.fireMessageReceived(out.get(i));
+ ctx.fireChannelRead(out.get(i));
}
ctx.fireChannelInactive();
}
@@ -256,7 +256,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
* Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
* {@link #channelInactive(ChannelHandlerContext)} was triggered.
*
- * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, CodecOutput)} but sub-classes may
+ * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
* override this for some special cleanup operation.
*/
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List
*
- * Be aware that messages are not released after the {@link #messageReceived(ChannelHandlerContext, Object)}
+ * Be aware that messages are not released after the {@link #channelRead(ChannelHandlerContext, Object)}
* method returns automatically. If you are looking for a {@link ChannelInboundHandler} implementation that
* releases the received messages automatically, please see {@link SimpleChannelInboundHandler}.
*
@@ -74,31 +74,21 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
+
/**
- * Calls {@link ChannelHandlerContext#fireChannelReadSuspended()} to forward
+ * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
- public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception {
- ctx.fireChannelReadSuspended();
- }
-
- /**
- * Calls {@link ChannelHandlerContext#fireMessageReceived(Object)} to forward
- * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
- *
- * Sub-classes may override this method to change behavior.
- */
- @Override
- public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
- ctx.fireMessageReceived(msg);
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ ctx.fireChannelRead(msg);
}
@Override
- public void messageReceivedLast(ChannelHandlerContext ctx) throws Exception {
- ctx.fireMessageReceivedLast();
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ ctx.fireChannelReadComplete();
}
/**
diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java
index d4c48e9968..1d87314725 100644
--- a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java
+++ b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java
@@ -76,21 +76,15 @@ interface ChannelInboundInvoker {
ChannelInboundInvoker fireUserEventTriggered(Object event);
/**
- * A {@link Channel} received bytes which are now ready to read from its inbound buffer.
+ * A {@link Channel} received a message.
*
- * This will result in having the {@link ChannelInboundHandler#messageReceived(ChannelHandlerContext, Object)}
+ * This will result in having the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}
* method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
- ChannelInboundInvoker fireMessageReceived(Object msg);
+ ChannelInboundInvoker fireChannelRead(Object msg);
- ChannelInboundInvoker fireMessageReceivedLast();
-
- /**
- * Triggers an {@link ChannelInboundHandler#channelReadSuspended(ChannelHandlerContext) channelReadSuspended}
- * event to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
- */
- ChannelInboundInvoker fireChannelReadSuspended();
+ ChannelInboundInvoker fireChannelReadComplete();
/**
* Triggers an {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)}
diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java
index fa92a153a2..4a8df4f6ff 100644
--- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java
+++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java
@@ -32,12 +32,12 @@ final class ChannelOutboundBuffer {
private static final int MIN_INITIAL_CAPACITY = 8;
ChannelPromise currentPromise;
- MessageList currentMessages;
+ MessageList currentMessages;
int currentMessageIndex;
private long currentMessageListSize;
private ChannelPromise[] promises;
- private MessageList[] messages;
+ private MessageList[] messages;
private long[] messageListSizes;
private int head;
@@ -87,7 +87,7 @@ final class ChannelOutboundBuffer {
void addMessage(Object msg) {
int tail = this.tail;
- MessageList msgs = messages[tail];
+ MessageList msgs = messages[tail];
if (msgs == null) {
messages[tail] = msgs = MessageList.newInstance();
}
@@ -154,7 +154,7 @@ final class ChannelOutboundBuffer {
promises = a1;
@SuppressWarnings("unchecked")
- MessageList[] a2 = new MessageList[newCapacity];
+ MessageList[] a2 = new MessageList[newCapacity];
System.arraycopy(messages, p, a2, 0, r);
System.arraycopy(messages, 0, a2, r, p);
messages = a2;
@@ -249,9 +249,11 @@ final class ChannelOutboundBuffer {
if (currentMessages != null) {
// Release all failed messages.
+ Object[] array = currentMessages.array();
+ final int size = currentMessages.size();
try {
- for (int i = currentMessageIndex; i < currentMessages.size(); i++) {
- Object msg = currentMessages.get(i);
+ for (int i = currentMessageIndex; i < size; i++) {
+ Object msg = array[i];
ReferenceCountUtil.release(msg);
}
} finally {
diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java
index 497d27f735..eb9029be59 100644
--- a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java
+++ b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java
@@ -190,9 +190,9 @@ interface ChannelOutboundInvoker {
/**
* Request to Read data from the {@link Channel} into the first inbound buffer, triggers an
- * {@link ChannelInboundHandler#messageReceived(ChannelHandlerContext, Object)} event if data was
- * read, and triggers an
- * {@link ChannelInboundHandler#channelReadSuspended(ChannelHandlerContext) channelReadSuspended} event so the
+ * {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} event if data was
+ * read, and triggers a
+ * {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext) channelReadComplete} event so the
* handler can decide to continue reading. If there's a pending read operation already, this method does nothing.
*
* This will result in having the
diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java
index 2c791769b7..9aabd90a3c 100644
--- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java
+++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java
@@ -44,7 +44,7 @@ import java.util.NoSuchElementException;
* The following diagram describes how I/O events are processed by {@link ChannelHandler}s in a {@link ChannelPipeline}
* typically. An I/O event is handled by either a {@link ChannelInboundHandler} or a {@link ChannelOutboundHandler}
* and be forwarded to its closest handler by calling the event propagation methods defined in
- * {@link ChannelHandlerContext}, such as {@link ChannelHandlerContext#fireMessageReceived(Object)} and
+ * {@link ChannelHandlerContext}, such as {@link ChannelHandlerContext#fireChannelRead(Object)} and
* {@link ChannelHandlerContext#write(Object)}.
*
*
- * A {@link MessageList} is internally managed by a thread-local object pool to keep the GC pressure minimal.
- * To return the {@link MessageList} to the pool, you must call one of the following methods: {@link #recycle()},
- * {@link #releaseAllAndRecycle()}, or {@link #releaseAllAndRecycle(int)}. If the list is returned to the pool (i.e.
- * recycled), it will be reused when you attempts to get a {@link MessageList} via {@link #newInstance()}.
- *
- * If you don't think recycling a {@link MessageList} isn't worth, it is also fine not to recycle it. Because of this
- * relaxed contract, you can also decide not to wrap your code with a {@code try-finally} block only to recycle a
- * list. However, if you decided to recycle it, you must keep in mind that:
- *
- *
you must make sure you do not access the list once it has been recycled.
- *
If you are given with a {@link MessageList} as a parameter of your handler, it means it is your handler's
- * responsibility to release the messages in it and to recycle it.
- *
- *
- *
- *
Consuming the messages from a {@link MessageList} efficiently yet safely
- *
- * The following example shows how to iterate over the list which contains {@code ReferenceCounted} messages safely
- * (i.e. without leaking them) while consuming the messages.
- *
- *
- * public void messageReceived({@link ChannelHandlerContext} ctx, {@link MessageList}<Object> msgs) {
- * final int size = msgs.size();
- * final Object[] in = msgs.array();
- * boolean success = false;
- * try {
- * for (int i = 0; i < size; i ++) {
- * Object m = in[i];
- *
- * // Handle the message.
- * doSomethingWithMessage(m);
- *
- * // Release the handled message.
- * {@link ReferenceCountUtil#release(Object) ReferenceCountUtil.release(m)};
- *
- * // To prevent {@link #releaseAllAndRecycle()} from releasing the message again,
- * // replace the message with a dummy object.
- * in[i] = MessageList.NONE;
- * }
- *
- * success = true;
- * } finally {
- * if (success) {
- * {@link #recycle() msgs.recycle()};
- * } else {
- * {@link #releaseAllAndRecycle() msgs.releaseAllAndRecycle()};
- * }
- * }
- * }
- *
- *
- * @param the type of the contained messages
*/
-final class MessageList implements Iterable {
+final class MessageList {
private static final int DEFAULT_INITIAL_CAPACITY = 8;
private static final int MIN_INITIAL_CAPACITY = 4;
- private static final Recycler> RECYCLER = new Recycler>() {
+ private static final Recycler RECYCLER = new Recycler() {
@Override
- protected MessageList> newObject(Handle handle) {
- return new MessageList(handle);
+ protected MessageList newObject(Handle handle) {
+ return new MessageList(handle);
}
};
/**
- * Create a new empty {@link MessageList} instance
+ * Create a new empty {@link MessageList} instance.
*/
- public static MessageList newInstance() {
- return newInstance(DEFAULT_INITIAL_CAPACITY);
- }
-
- /**
- * Create a new empty {@link MessageList} instance with the given capacity.
- */
- @SuppressWarnings("unchecked")
- public static MessageList newInstance(int minCapacity) {
- MessageList ret = (MessageList) RECYCLER.get();
- ret.ensureCapacity(minCapacity);
- ret.modifications = 0;
+ static MessageList newInstance() {
+ MessageList ret = RECYCLER.get();
return ret;
}
- /**
- * Create a new {@link MessageList} instance which holds the given value
- */
- public static MessageList newInstance(T value) {
- MessageList ret = newInstance(MIN_INITIAL_CAPACITY);
- ret.add(value);
- return ret;
- }
-
- /**
- * Call {@link ReferenceCountUtil#retain(Object)} on all messages in this {@link MessageList} and return itself.
- */
- public MessageList retainAll() {
- int size = this.size;
- for (int i = 0; i < size; i ++) {
- ReferenceCountUtil.retain(elements[i]);
- }
- return this;
- }
-
- /**
- * Call {@link ReferenceCountUtil#retain(Object), int} on all messages in this {@link MessageList} and return
- * itself.
- */
- public MessageList retainAll(int increment) {
- int size = this.size;
- for (int i = 0; i < size; i ++) {
- ReferenceCountUtil.retain(elements[i], increment);
- }
- return this;
- }
-
- /**
- * Call {@link ReferenceCountUtil#release(Object)} on all messages in this {@link MessageList} and return
- * {@code true} if all messages were released.
- */
- public boolean releaseAll() {
- boolean releasedAll = true;
- int size = this.size;
- for (int i = 0; i < size; i ++) {
- releasedAll &= ReferenceCountUtil.release(elements[i]);
- }
- return releasedAll;
- }
-
- /**
- * Call {@link ReferenceCountUtil#release(Object, int)} on all messages in this {@link MessageList} and return
- * {@code true} if all messages were released.
- */
- public boolean releaseAll(int decrement) {
- boolean releasedAll = true;
- int size = this.size;
- for (int i = 0; i < size; i ++) {
- releasedAll &= ReferenceCountUtil.release(elements[i], decrement);
- }
- return releasedAll;
- }
-
- /**
- * Short-cut for calling {@link #releaseAll()} and {@link #recycle()}. Returns {@code true} if both return
- * {@code true}.
- */
- public boolean releaseAllAndRecycle() {
- return releaseAll() && recycle();
- }
-
- /**
- * Short-cut for calling {@link #releaseAll(int)} and {@link #recycle()}. Returns {@code true} if both return
- * {@code true}.
- */
- public boolean releaseAllAndRecycle(int decrement) {
- return releaseAll(decrement) && recycle();
- }
-
- /**
- * Clear and recycle this instance.
- */
- public boolean recycle() {
- clear();
- return RECYCLER.recycle(this, handle);
- }
-
private final Handle handle;
- private T[] elements;
+ private Object[] elements;
private int size;
- private int modifications;
- private boolean byteBufsOnly = true;
MessageList(Handle handle) {
this(handle, DEFAULT_INITIAL_CAPACITY);
@@ -210,357 +55,52 @@ final class MessageList implements Iterable {
MessageList(Handle handle, int initialCapacity) {
this.handle = handle;
initialCapacity = normalizeCapacity(initialCapacity);
- elements = newArray(initialCapacity);
- }
-
- private MessageList(Handle handle, T[] elements, int size) {
- this.handle = handle;
- this.elements = elements;
- this.size = size;
+ elements = new Object[initialCapacity];
}
/**
* Return the current size of this {@link MessageList} and so how many messages it holds.
*/
- public int size() {
+ int size() {
return size;
}
/**
* Return {@code true} if this {@link MessageList} is empty and so contains no messages.
*/
- public boolean isEmpty() {
+ boolean isEmpty() {
return size == 0;
}
- /**
- * Return the message on the given index. This method will throw an {@link IndexOutOfBoundsException} if there is
- * no message stored in the given index.
- */
- public T get(int index) {
- checkIndex(index);
- return elements[index];
- }
-
- /**
- * Returns the first message in this list.
- *
- * @throws NoSuchElementException if this list is empty
- */
- public T first() {
- if (size != 0) {
- return elements[0];
- } else {
- throw new NoSuchElementException();
- }
- }
-
- /**
- * Returns the last message in this list.
- *
- * @throws NoSuchElementException if this list is empty
- */
- public T last() {
- if (size != 0) {
- return elements[size - 1];
- } else {
- throw new NoSuchElementException();
- }
- }
-
- /**
- * Sets the message on the given index.
- */
- public MessageList set(int index, T value) {
- checkIndex(index);
- if (value == null) {
- throw new NullPointerException("value");
- }
-
- elements[index] = value;
-
- if (byteBufsOnly && !(value instanceof ByteBuf)) {
- byteBufsOnly = false;
- }
-
- return this;
- }
-
/**
* Add the message to this {@link MessageList} and return itself.
*/
- public MessageList add(T value) {
+ MessageList add(Object value) {
if (value == null) {
throw new NullPointerException("value");
}
- modifications ++;
int oldSize = size;
int newSize = oldSize + 1;
ensureCapacity(newSize);
elements[oldSize] = value;
size = newSize;
- if (byteBufsOnly && !(value instanceof ByteBuf)) {
- byteBufsOnly = false;
- }
return this;
}
/**
- * Add the messages contained in the array to this {@link MessageList} and return itself.
+ * Returns the backing array of this list.
*/
- public MessageList add(T[] src) {
- if (src == null) {
- throw new NullPointerException("src");
- }
- return add(src, 0, src.length);
- }
-
- /**
- * Add the messages contained in the array, using the given src index and src length, to this {@link MessageList}
- * and return itself.
- */
- public MessageList add(T[] src, int srcIdx, int srcLen) {
- if (src == null) {
- throw new NullPointerException("src");
- }
-
- modifications ++;
-
- int dstIdx = size;
- final int newSize = dstIdx + srcLen;
- ensureCapacity(newSize);
-
- final int srcEndIdx = srcIdx + srcLen;
- int i = srcIdx;
- try {
- if (byteBufsOnly) {
- while (i < srcEndIdx) {
- T m = src[i];
- if (m == null) {
- throw new NullPointerException("src[" + srcIdx + ']');
- }
-
- elements[dstIdx ++] = m;
- i ++;
-
- if (!(m instanceof ByteBuf)) {
- byteBufsOnly = false;
- break;
- }
- }
- }
-
- for (; i < srcEndIdx; i ++) {
- T m = src[i];
- if (m == null) {
- throw new NullPointerException("src[" + srcIdx + ']');
- }
-
- elements[dstIdx ++] = m;
- }
- } finally {
- if (dstIdx != newSize) {
- // Could not finish iteration.
- Arrays.fill(elements, size, dstIdx, null);
- }
- }
-
- assert dstIdx == newSize : String.format("dstIdx(%d) != newSize(%d)", dstIdx, newSize);
- size = newSize;
-
- return this;
- }
-
- /**
- * Add the messages contained in the given {@link MessageList} to this {@link MessageList} and return itself.
- */
- public MessageList add(MessageList src) {
- return add(src, 0, src.size());
- }
-
- /**
- * Add the messages contained in the given {@link MessageList}, using the given src index and src length, to this
- * {@link MessageList} and return itself.
- */
- public MessageList add(MessageList src, int srcIdx, int srcLen) {
- if (src == null) {
- throw new NullPointerException("src");
- }
-
- if (srcIdx > src.size - srcLen) {
- throw new IndexOutOfBoundsException(String.format(
- "srcIdx(%d) + srcLen(%d) > src.size(%d)", srcIdx, srcLen, src.size));
- }
-
- modifications ++;
-
- final int dstIdx = size;
- final int newSize = dstIdx + srcLen;
- ensureCapacity(newSize);
-
- byteBufsOnly &= src.byteBufsOnly;
- System.arraycopy(src.elements, srcIdx, elements, dstIdx, srcLen);
-
- size = newSize;
- return this;
- }
-
- /**
- * Clear all messages and return itself.
- */
- public MessageList clear() {
- modifications ++;
- Arrays.fill(elements, 0, size, null);
- byteBufsOnly = true;
- size = 0;
- return this;
- }
-
- /**
- * Create a new copy all messages of this {@link MessageList} and return it.
- */
- public MessageList copy() {
- return new MessageList(handle, elements.clone(), size);
- }
-
- /**
- * Create a new copy all messages of this {@link MessageList}, starting at the given index and using the given len,
- * and return it.
- */
- public MessageList copy(int index, int length) {
- checkRange(index, length);
- MessageList copy = new MessageList(handle, length);
- System.arraycopy(elements, index, copy.elements, 0, length);
- copy.size = length;
- return copy;
- }
-
- /**
- * Casts the type parameter of this list to a different type parameter. This method is often useful when you have
- * to deal with multiple messages and do not want to down-cast the messages every time you access the list.
- *
- *
- */
- @SuppressWarnings("unchecked")
- public MessageList cast() {
- return (MessageList) this;
- }
-
- /**
- * Iterates over the messages in this list with the specified {@code processor}.
- *
- * @return {@code -1} if the processor iterated to or beyond the end of the readable bytes.
- * The last-visited index If the {@link MessageListProcessor#process(Object)} returned {@code false}.
- */
- public int forEach(MessageListProcessor super T> proc) {
- if (proc == null) {
- throw new NullPointerException("proc");
- }
-
- final int size = this.size;
- if (size == 0) {
- return -1;
- }
-
- @SuppressWarnings("unchecked")
- MessageListProcessor p = (MessageListProcessor) proc;
-
- int i = 0;
- try {
- do {
- if (p.process(elements[i])) {
- i ++;
- } else {
- return i;
- }
- } while (i < size);
- } catch (Exception e) {
- PlatformDependent.throwException(e);
- }
-
- return -1;
- }
-
- /**
- * Iterates over the messages in this list with the specified {@code processor}.
- *
- * @return {@code -1} if the processor iterated to or beyond the end of the specified area.
- * The last-visited index If the {@link MessageListProcessor#process(Object)} returned {@code false}.
- */
- public int forEach(int index, int length, MessageListProcessor super T> proc) {
- checkRange(index, length);
- if (proc == null) {
- throw new NullPointerException("proc");
- }
-
- if (size == 0) {
- return -1;
- }
-
- @SuppressWarnings("unchecked")
- MessageListProcessor p = (MessageListProcessor) proc;
-
- final int end = index + length;
-
- int i = index;
- try {
- do {
- if (p.process(elements[i])) {
- i ++;
- } else {
- return i;
- }
- } while (i < end);
- } catch (Exception e) {
- PlatformDependent.throwException(e);
- }
-
- return -1;
- }
-
- @Override
- public Iterator iterator() {
- return new MessageListIterator();
- }
-
- /**
- * Returns the backing array of this list. Use this array when you want to iterate over the list fast:
- *