Remove freeInbound/OutboundMessage(), replaced by ReferenceCounted.retain/release()

- Related: #1029
This commit is contained in:
Trustin Lee 2013-02-10 13:31:31 +09:00
parent b9996908b1
commit 2f1a0b0593
13 changed files with 35 additions and 133 deletions

View File

@ -61,7 +61,7 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
} }
if (msg instanceof HttpContent) { if (msg instanceof HttpContent) {
HttpContent c = (HttpContent) msg; final HttpContent c = (HttpContent) msg;
if (!decodeStarted) { if (!decodeStarted) {
decodeStarted = true; decodeStarted = true;
@ -98,12 +98,15 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
} }
return decoded; return decoded;
} }
c.retain();
return new Object[] { message, c }; return new Object[] { message, c };
} }
if (decoder != null) { if (decoder != null) {
return decodeContent(null, c); return decodeContent(null, c);
} else { } else {
c.retain();
return c; return c;
} }
} }
@ -111,15 +114,6 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
return null; return null;
} }
@Override
protected void freeInboundMessage(HttpObject msg) throws Exception {
if (decoder == null) {
// if the decoder was null we returned the original message so we are not allowed to free it
return;
}
super.freeInboundMessage(msg);
}
private Object[] decodeContent(HttpMessage header, HttpContent c) { private Object[] decodeContent(HttpMessage header, HttpContent c) {
ByteBuf newContent = Unpooled.buffer(); ByteBuf newContent = Unpooled.buffer();
ByteBuf content = c.data(); ByteBuf content = c.data();

View File

@ -158,28 +158,17 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
} }
return encoded; return encoded;
} }
if (encoder != null) { if (encoder != null) {
return encodeContent(null, c); return encodeContent(null, c);
} }
return msg;
c.retain();
return c;
} }
return null; return null;
} }
@Override
protected void freeOutboundMessage(HttpObject msg) throws Exception {
if (encoder == null) {
// if the decoder was null we returned the original message so we are not allowed to free it
return;
}
super.freeOutboundMessage(msg);
}
@Override
protected void freeInboundMessage(HttpMessage msg) throws Exception {
// not free it as it is only passed through
}
private Object[] encodeContent(HttpMessage header, HttpContent c) { private Object[] encodeContent(HttpMessage header, HttpContent c) {
ByteBuf newContent = Unpooled.buffer(); ByteBuf newContent = Unpooled.buffer();
ByteBuf content = c.data(); ByteBuf content = c.data();

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.handler.codec.http; package io.netty.handler.codec.http;
import io.netty.buffer.BufUtil;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
@ -127,6 +128,7 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
if (!m.getDecoderResult().isSuccess()) { if (!m.getDecoderResult().isSuccess()) {
removeTransferEncodingChunked(m); removeTransferEncodingChunked(m);
this.currentMessage = null; this.currentMessage = null;
BufUtil.retain(m);
return m; return m;
} }
if (msg instanceof HttpRequest) { if (msg instanceof HttpRequest) {
@ -156,7 +158,6 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
CompositeByteBuf content = (CompositeByteBuf) currentMessage.data(); CompositeByteBuf content = (CompositeByteBuf) currentMessage.data();
if (content.readableBytes() > maxContentLength - chunk.data().readableBytes()) { if (content.readableBytes() > maxContentLength - chunk.data().readableBytes()) {
chunk.release();
// TODO: Respond with 413 Request Entity Too Large // TODO: Respond with 413 Request Entity Too Large
// and discard the traffic or close the connection. // and discard the traffic or close the connection.
// No need to notify the upstream handlers - just log. // No need to notify the upstream handlers - just log.
@ -168,10 +169,9 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
// Append the content of the chunk // Append the content of the chunk
if (chunk.data().isReadable()) { if (chunk.data().isReadable()) {
chunk.retain();
content.addComponent(chunk.data()); content.addComponent(chunk.data());
content.writerIndex(content.writerIndex() + chunk.data().readableBytes()); content.writerIndex(content.writerIndex() + chunk.data().readableBytes());
} else {
chunk.release();
} }
final boolean last; final boolean last;
@ -180,7 +180,7 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
DecoderResult.partialFailure(chunk.getDecoderResult().cause())); DecoderResult.partialFailure(chunk.getDecoderResult().cause()));
last = true; last = true;
} else { } else {
last = msg instanceof LastHttpContent; last = chunk instanceof LastHttpContent;
} }
if (last) { if (last) {
@ -207,11 +207,6 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
} }
} }
@Override
protected void freeInboundMessage(HttpObject msg) throws Exception {
// decode() frees HttpContents.
}
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx; this.ctx = ctx;

View File

@ -78,14 +78,18 @@ public class WebSocketServerProtocolHandler extends ChannelInboundMessageHandler
public void messageReceived(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { public void messageReceived(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
if (frame instanceof CloseWebSocketFrame) { if (frame instanceof CloseWebSocketFrame) {
WebSocketServerHandshaker handshaker = getHandshaker(ctx); WebSocketServerHandshaker handshaker = getHandshaker(ctx);
frame.retain();
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame); handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
return; return;
} }
if (frame instanceof PingWebSocketFrame) { if (frame instanceof PingWebSocketFrame) {
frame.data().retain();
ctx.channel().write(new PongWebSocketFrame(frame.data())); ctx.channel().write(new PongWebSocketFrame(frame.data()));
return; return;
} }
frame.retain();
ctx.nextInboundMessageBuffer().add(frame); ctx.nextInboundMessageBuffer().add(frame);
} }
@ -100,11 +104,6 @@ public class WebSocketServerProtocolHandler extends ChannelInboundMessageHandler
} }
} }
@Override
protected void freeInboundMessage(WebSocketFrame msg) throws Exception {
// Do not free; other handlers will.
}
static WebSocketServerHandshaker getHandshaker(ChannelHandlerContext ctx) { static WebSocketServerHandshaker getHandshaker(ChannelHandlerContext ctx) {
return ctx.attr(HANDSHAKER_ATTR_KEY).get(); return ctx.attr(HANDSHAKER_ATTR_KEY).get();
} }

View File

@ -162,6 +162,8 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<HttpObject> {
if (msg instanceof HttpContent) { if (msg instanceof HttpContent) {
HttpContent chunk = (HttpContent) msg; HttpContent chunk = (HttpContent) msg;
chunk.data().retain();
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(currentStreamId, chunk.data()); SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(currentStreamId, chunk.data());
spdyDataFrame.setLast(chunk instanceof LastHttpContent); spdyDataFrame.setLast(chunk instanceof LastHttpContent);
if (chunk instanceof LastHttpContent) { if (chunk instanceof LastHttpContent) {
@ -284,13 +286,4 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<HttpObject> {
return spdySynReplyFrame; return spdySynReplyFrame;
} }
@Override
protected void freeOutboundMessage(HttpObject msg) throws Exception {
if (msg instanceof HttpContent) {
// Will be freed later as the content of them is just reused here
return;
}
super.freeOutboundMessage(msg);
}
} }

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.buffer.BufUtil;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec; import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpMessage;
@ -43,6 +44,8 @@ public class SpdyHttpResponseStreamIdHandler extends
if (id != null && id.intValue() != NO_ID && !msg.headers().contains(SpdyHttpHeaders.Names.STREAM_ID)) { if (id != null && id.intValue() != NO_ID && !msg.headers().contains(SpdyHttpHeaders.Names.STREAM_ID)) {
SpdyHttpHeaders.setStreamId(msg, id); SpdyHttpHeaders.setStreamId(msg, id);
} }
BufUtil.retain(msg);
return msg; return msg;
} }
@ -59,16 +62,7 @@ public class SpdyHttpResponseStreamIdHandler extends
ids.remove(((SpdyRstStreamFrame) msg).getStreamId()); ids.remove(((SpdyRstStreamFrame) msg).getStreamId());
} }
BufUtil.retain(msg);
return msg; return msg;
} }
@Override
protected void freeInboundMessage(Object msg) throws Exception {
// just pass through so no free
}
@Override
protected void freeOutboundMessage(HttpMessage msg) throws Exception {
// just pass through so no free
}
} }

View File

@ -15,7 +15,6 @@
*/ */
package io.netty.handler.codec; package io.netty.handler.codec;
import io.netty.buffer.BufUtil;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -68,12 +67,6 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
protected Object encode(ChannelHandlerContext ctx, Object msg) throws Exception { protected Object encode(ChannelHandlerContext ctx, Object msg) throws Exception {
return MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg); return MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg);
} }
@Override
@SuppressWarnings("unchecked")
protected void freeOutboundMessage(Object msg) throws Exception {
MessageToMessageCodec.this.freeOutboundMessage((OUTBOUND_IN) msg);
}
}; };
private final MessageToMessageDecoder<Object> decoder = private final MessageToMessageDecoder<Object> decoder =
@ -89,12 +82,6 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
protected Object decode(ChannelHandlerContext ctx, Object msg) throws Exception { protected Object decode(ChannelHandlerContext ctx, Object msg) throws Exception {
return MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg); return MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg);
} }
@Override
@SuppressWarnings("unchecked")
protected void freeInboundMessage(Object msg) throws Exception {
MessageToMessageCodec.this.freeInboundMessage((INBOUND_IN) msg);
}
}; };
private final TypeParameterMatcher inboundMsgMatcher; private final TypeParameterMatcher inboundMsgMatcher;
@ -172,12 +159,4 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
protected abstract Object encode(ChannelHandlerContext ctx, OUTBOUND_IN msg) throws Exception; protected abstract Object encode(ChannelHandlerContext ctx, OUTBOUND_IN msg) throws Exception;
protected abstract Object decode(ChannelHandlerContext ctx, INBOUND_IN msg) throws Exception; protected abstract Object decode(ChannelHandlerContext ctx, INBOUND_IN msg) throws Exception;
protected void freeInboundMessage(INBOUND_IN msg) throws Exception {
BufUtil.release(msg);
}
protected void freeOutboundMessage(OUTBOUND_IN msg) throws Exception {
BufUtil.release(msg);
}
} }

View File

@ -90,18 +90,22 @@ public class AutobahnServerHandler extends ChannelInboundMessageHandlerAdapter<O
} }
if (frame instanceof CloseWebSocketFrame) { if (frame instanceof CloseWebSocketFrame) {
frame.retain();
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame); handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
} else if (frame instanceof PingWebSocketFrame) { } else if (frame instanceof PingWebSocketFrame) {
frame.data().retain();
ctx.channel().write( ctx.channel().write(
new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.data())); new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.data()));
} else if (frame instanceof TextWebSocketFrame) { } else if (frame instanceof TextWebSocketFrame) {
// String text = ((TextWebSocketFrame) frame).getText(); frame.data().retain();
ctx.channel().write( ctx.channel().write(
new TextWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.data())); new TextWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.data()));
} else if (frame instanceof BinaryWebSocketFrame) { } else if (frame instanceof BinaryWebSocketFrame) {
frame.data().retain();
ctx.channel().write( ctx.channel().write(
new BinaryWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.data())); new BinaryWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.data()));
} else if (frame instanceof ContinuationWebSocketFrame) { } else if (frame instanceof ContinuationWebSocketFrame) {
frame.data().retain();
ctx.channel().write( ctx.channel().write(
new ContinuationWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.data())); new ContinuationWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.data()));
} else if (frame instanceof PongWebSocketFrame) { } else if (frame instanceof PongWebSocketFrame) {
@ -136,14 +140,4 @@ public class AutobahnServerHandler extends ChannelInboundMessageHandlerAdapter<O
private static String getWebSocketLocation(FullHttpRequest req) { private static String getWebSocketLocation(FullHttpRequest req) {
return "ws://" + req.headers().get(HttpHeaders.Names.HOST); return "ws://" + req.headers().get(HttpHeaders.Names.HOST);
} }
@Override
protected void freeInboundMessage(Object msg) throws Exception {
if (!(msg instanceof PongWebSocketFrame) && msg instanceof WebSocketFrame) {
// will be freed once written by the encoder
return;
}
super.freeInboundMessage(msg);
}
} }

View File

@ -105,10 +105,12 @@ public class WebSocketServerHandler extends ChannelInboundMessageHandlerAdapter<
// Check for closing frame // Check for closing frame
if (frame instanceof CloseWebSocketFrame) { if (frame instanceof CloseWebSocketFrame) {
frame.retain();
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame); handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
return; return;
} }
if (frame instanceof PingWebSocketFrame) { if (frame instanceof PingWebSocketFrame) {
frame.data().retain();
ctx.channel().write(new PongWebSocketFrame(frame.data())); ctx.channel().write(new PongWebSocketFrame(frame.data()));
return; return;
} }
@ -146,15 +148,6 @@ public class WebSocketServerHandler extends ChannelInboundMessageHandlerAdapter<
ctx.close(); ctx.close();
} }
@Override
protected void freeInboundMessage(Object msg) throws Exception {
if (msg instanceof PingWebSocketFrame || msg instanceof CloseWebSocketFrame) {
// Will be freed once wrote back
return;
}
super.freeInboundMessage(msg);
}
private static String getWebSocketLocation(FullHttpRequest req) { private static String getWebSocketLocation(FullHttpRequest req) {
return "ws://" + req.headers().get(HOST) + WEBSOCKET_PATH; return "ws://" + req.headers().get(HOST) + WEBSOCKET_PATH;
} }

View File

@ -107,10 +107,12 @@ public class WebSocketSslServerHandler extends ChannelInboundMessageHandlerAdapt
// Check for closing frame // Check for closing frame
if (frame instanceof CloseWebSocketFrame) { if (frame instanceof CloseWebSocketFrame) {
frame.retain();
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame); handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
return; return;
} }
if (frame instanceof PingWebSocketFrame) { if (frame instanceof PingWebSocketFrame) {
frame.data().retain();
ctx.channel().write(new PongWebSocketFrame(frame.data())); ctx.channel().write(new PongWebSocketFrame(frame.data()));
return; return;
} }
@ -151,13 +153,4 @@ public class WebSocketSslServerHandler extends ChannelInboundMessageHandlerAdapt
private static String getWebSocketLocation(FullHttpRequest req) { private static String getWebSocketLocation(FullHttpRequest req) {
return "wss://" + req.headers().get(HOST) + WEBSOCKET_PATH; return "wss://" + req.headers().get(HOST) + WEBSOCKET_PATH;
} }
@Override
protected void freeInboundMessage(Object msg) throws Exception {
if (msg instanceof PingWebSocketFrame || msg instanceof CloseWebSocketFrame) {
// Will be freed once wrote back
return;
}
super.freeInboundMessage(msg);
}
} }

View File

@ -84,15 +84,12 @@ public class SctpMessageCompletionHandler extends ChannelInboundMessageHandlerAd
//first incomplete message //first incomplete message
fragments.put(streamIdentifier, byteBuf); fragments.put(streamIdentifier, byteBuf);
} }
byteBuf.retain();
} }
private void handleAssembledMessage(ChannelHandlerContext ctx, SctpMessage assembledMsg) { private void handleAssembledMessage(ChannelHandlerContext ctx, SctpMessage assembledMsg) {
ctx.nextInboundMessageBuffer().add(assembledMsg); ctx.nextInboundMessageBuffer().add(assembledMsg);
assembled = true; assembled = true;
} }
@Override
protected void freeInboundMessage(SctpMessage msg) throws Exception {
// It is an aggregator so not free it yet
}
} }

View File

@ -113,7 +113,7 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
abort.expect(ABORT); abort.expect(ABORT);
break; break;
} finally { } finally {
freeInboundMessage(imsg); BufUtil.release(imsg);
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
@ -171,13 +171,4 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception { @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception {
// NOOP // NOOP
} }
/**
* Is called after a message was processed via {@link #messageReceived(ChannelHandlerContext, Object)} to free
* up any resources that is held by the inbound message. You may want to override this if your implementation
* just pass-through the input message or need it for later usage.
*/
protected void freeInboundMessage(I msg) throws Exception {
BufUtil.release(msg);
}
} }

View File

@ -111,7 +111,7 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
flush(ctx, imsg); flush(ctx, imsg);
processed ++; processed ++;
} finally { } finally {
freeOutboundMessage(imsg); BufUtil.release(imsg);
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
@ -172,13 +172,4 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
*/ */
protected void endFlush(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception { } protected void endFlush(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception { }
/**
* Is called after a message was processed via {@link #flush(ChannelHandlerContext, Object)} to free
* up any resources that is held by the outbound message. You may want to override this if your implementation
* just pass-through the input message or need it for later usage.
*/
protected void freeOutboundMessage(I msg) throws Exception {
BufUtil.release(msg);
}
} }