Share same ThreadLocal for all decoder/encoders to minimize memory usage
This commit is contained in:
parent
94ef7dc1b9
commit
1675e61f5b
@ -45,14 +45,6 @@ public abstract class ByteToMessageDecoder
|
|||||||
private volatile boolean singleDecode;
|
private volatile boolean singleDecode;
|
||||||
private boolean decodeWasNull;
|
private boolean decodeWasNull;
|
||||||
|
|
||||||
private static final ThreadLocal<OutputMessageBuf> decoderOutput =
|
|
||||||
new ThreadLocal<OutputMessageBuf>() {
|
|
||||||
@Override
|
|
||||||
protected OutputMessageBuf initialValue() {
|
|
||||||
return new OutputMessageBuf();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call.
|
* If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call.
|
||||||
* This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
|
* This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
|
||||||
@ -91,7 +83,7 @@ public abstract class ByteToMessageDecoder
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||||
OutputMessageBuf out = decoderOutput();
|
OutputMessageBuf out = OutputMessageBuf.get();
|
||||||
try {
|
try {
|
||||||
ByteBuf in = ctx.inboundByteBuffer();
|
ByteBuf in = ctx.inboundByteBuffer();
|
||||||
if (in.isReadable()) {
|
if (in.isReadable()) {
|
||||||
@ -131,7 +123,7 @@ public abstract class ByteToMessageDecoder
|
|||||||
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in) {
|
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in) {
|
||||||
boolean wasNull = false;
|
boolean wasNull = false;
|
||||||
boolean decoded = false;
|
boolean decoded = false;
|
||||||
OutputMessageBuf out = decoderOutput();
|
OutputMessageBuf out = OutputMessageBuf.get();
|
||||||
|
|
||||||
assert out.isEmpty();
|
assert out.isEmpty();
|
||||||
|
|
||||||
@ -217,9 +209,4 @@ public abstract class ByteToMessageDecoder
|
|||||||
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
|
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
|
||||||
decode(ctx, in, out);
|
decode(ctx, in, out);
|
||||||
}
|
}
|
||||||
|
|
||||||
final OutputMessageBuf decoderOutput() {
|
|
||||||
return decoderOutput.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -42,14 +42,6 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter;
|
|||||||
*/
|
*/
|
||||||
public abstract class MessageToMessageDecoder<I> extends ChannelInboundMessageHandlerAdapter<I> {
|
public abstract class MessageToMessageDecoder<I> extends ChannelInboundMessageHandlerAdapter<I> {
|
||||||
|
|
||||||
private static final ThreadLocal<OutputMessageBuf> decoderOutput =
|
|
||||||
new ThreadLocal<OutputMessageBuf>() {
|
|
||||||
@Override
|
|
||||||
protected OutputMessageBuf initialValue() {
|
|
||||||
return new OutputMessageBuf();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
protected MessageToMessageDecoder() { }
|
protected MessageToMessageDecoder() { }
|
||||||
|
|
||||||
protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {
|
protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {
|
||||||
@ -58,7 +50,7 @@ public abstract class MessageToMessageDecoder<I> extends ChannelInboundMessageHa
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception {
|
public final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception {
|
||||||
OutputMessageBuf out = decoderOutput.get();
|
OutputMessageBuf out = OutputMessageBuf.get();
|
||||||
try {
|
try {
|
||||||
decode(ctx, msg, out);
|
decode(ctx, msg, out);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -39,13 +39,6 @@ import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> {
|
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> {
|
||||||
private static final ThreadLocal<OutputMessageBuf> encoderOutput =
|
|
||||||
new ThreadLocal<OutputMessageBuf>() {
|
|
||||||
@Override
|
|
||||||
protected OutputMessageBuf initialValue() {
|
|
||||||
return new OutputMessageBuf();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
protected MessageToMessageEncoder() { }
|
protected MessageToMessageEncoder() { }
|
||||||
|
|
||||||
@ -55,9 +48,7 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageH
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void flush(ChannelHandlerContext ctx, I msg) throws Exception {
|
public final void flush(ChannelHandlerContext ctx, I msg) throws Exception {
|
||||||
OutputMessageBuf out = encoderOutput.get();
|
OutputMessageBuf out = OutputMessageBuf.get();
|
||||||
|
|
||||||
assert out.isEmpty();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
encode(ctx, msg, out);
|
encode(ctx, msg, out);
|
||||||
|
@ -20,15 +20,28 @@ import io.netty.buffer.DefaultMessageBuf;
|
|||||||
|
|
||||||
final class OutputMessageBuf extends DefaultMessageBuf<Object> {
|
final class OutputMessageBuf extends DefaultMessageBuf<Object> {
|
||||||
private int byteBufs;
|
private int byteBufs;
|
||||||
public OutputMessageBuf() {
|
|
||||||
|
private static final ThreadLocal<OutputMessageBuf> output =
|
||||||
|
new ThreadLocal<OutputMessageBuf>() {
|
||||||
|
@Override
|
||||||
|
protected OutputMessageBuf initialValue() {
|
||||||
|
return new OutputMessageBuf();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OutputMessageBuf get() {
|
||||||
|
OutputMessageBuf buf = super.get();
|
||||||
|
// Just to be sure
|
||||||
|
buf.clear();
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
static OutputMessageBuf get() {
|
||||||
|
return output.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public OutputMessageBuf(int initialCapacity) {
|
private OutputMessageBuf() {
|
||||||
super(initialCapacity);
|
|
||||||
}
|
|
||||||
|
|
||||||
public OutputMessageBuf(int initialCapacity, int maxCapacity) {
|
|
||||||
super(initialCapacity, maxCapacity);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -364,7 +364,7 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||||
OutputMessageBuf out = decoderOutput();
|
OutputMessageBuf out = OutputMessageBuf.get();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
replayable.terminate();
|
replayable.terminate();
|
||||||
@ -412,7 +412,7 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
|
|||||||
protected void callDecode(ChannelHandlerContext ctx, ByteBuf buf) {
|
protected void callDecode(ChannelHandlerContext ctx, ByteBuf buf) {
|
||||||
boolean wasNull = false;
|
boolean wasNull = false;
|
||||||
ByteBuf in = cumulation;
|
ByteBuf in = cumulation;
|
||||||
OutputMessageBuf out = decoderOutput();
|
OutputMessageBuf out = OutputMessageBuf.get();
|
||||||
boolean decoded = false;
|
boolean decoded = false;
|
||||||
|
|
||||||
assert out.isEmpty();
|
assert out.isEmpty();
|
||||||
|
Loading…
Reference in New Issue
Block a user