Optimize to minimize volatile reads to access next buffer in codec framework

This commit is contained in:
Norman Maurer 2013-04-03 18:03:55 +02:00
parent 9828267165
commit 94ef7dc1b9
6 changed files with 165 additions and 74 deletions

View File

@ -25,10 +25,12 @@ import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
/** /**
* Default {@link MessageBuf} implementation * Default {@link MessageBuf} implementation.
*
* You should use {@link Unpooled#messageBuffer()} to create an instance
* *
*/ */
final class DefaultMessageBuf<T> extends AbstractMessageBuf<T> { public class DefaultMessageBuf<T> extends AbstractMessageBuf<T> {
private static final int MIN_INITIAL_CAPACITY = 8; private static final int MIN_INITIAL_CAPACITY = 8;
private static final Object[] PLACEHOLDER = new Object[2]; private static final Object[] PLACEHOLDER = new Object[2];
@ -37,15 +39,15 @@ final class DefaultMessageBuf<T> extends AbstractMessageBuf<T> {
private int head; private int head;
private int tail; private int tail;
DefaultMessageBuf() { protected DefaultMessageBuf() {
this(MIN_INITIAL_CAPACITY << 1); this(MIN_INITIAL_CAPACITY << 1);
} }
DefaultMessageBuf(int initialCapacity) { protected DefaultMessageBuf(int initialCapacity) {
this(initialCapacity, Integer.MAX_VALUE); this(initialCapacity, Integer.MAX_VALUE);
} }
DefaultMessageBuf(int initialCapacity, int maxCapacity) { protected DefaultMessageBuf(int initialCapacity, int maxCapacity) {
super(maxCapacity); super(maxCapacity);
if (initialCapacity < 0) { if (initialCapacity < 0) {

View File

@ -17,7 +17,6 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandler;
@ -46,24 +45,14 @@ public abstract class ByteToMessageDecoder
private volatile boolean singleDecode; private volatile boolean singleDecode;
private boolean decodeWasNull; private boolean decodeWasNull;
private static final ThreadLocal<MessageBuf<Object>> decoderOutput = private static final ThreadLocal<OutputMessageBuf> decoderOutput =
new ThreadLocal<MessageBuf<Object>>() { new ThreadLocal<OutputMessageBuf>() {
@Override @Override
protected MessageBuf<Object> initialValue() { protected OutputMessageBuf initialValue() {
return Unpooled.messageBuffer(); return new OutputMessageBuf();
} }
}; };
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return super.newInboundBuffer(ctx);
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
super.freeInboundBuffer(ctx);
}
/** /**
* If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call. * If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call.
* This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up. * This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up.
@ -102,7 +91,7 @@ public abstract class ByteToMessageDecoder
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
MessageBuf<Object> out = decoderOutput(); OutputMessageBuf out = decoderOutput();
try { try {
ByteBuf in = ctx.inboundByteBuffer(); ByteBuf in = ctx.inboundByteBuffer();
if (in.isReadable()) { if (in.isReadable()) {
@ -118,13 +107,19 @@ public abstract class ByteToMessageDecoder
} }
} finally { } finally {
boolean decoded = false; boolean decoded = false;
for (;;) { if (out.containsByteBuf()) {
Object msg = out.poll(); for (;;) {
if (msg == null) { Object msg = out.poll();
break; if (msg == null) {
break;
}
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
} else {
if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) {
decoded = true;
} }
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
} }
if (decoded) { if (decoded) {
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
@ -136,7 +131,7 @@ public abstract class ByteToMessageDecoder
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in) { protected void callDecode(ChannelHandlerContext ctx, ByteBuf in) {
boolean wasNull = false; boolean wasNull = false;
boolean decoded = false; boolean decoded = false;
MessageBuf<Object> out = decoderOutput(); OutputMessageBuf out = decoderOutput();
assert out.isEmpty(); assert out.isEmpty();
@ -173,13 +168,19 @@ public abstract class ByteToMessageDecoder
} }
} }
} finally { } finally {
for (;;) { if (out.containsByteBuf()) {
Object msg = out.poll(); for (;;) {
if (msg == null) { Object msg = out.poll();
break; if (msg == null) {
break;
}
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
} else {
if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) {
decoded = true;
} }
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
} }
if (decoded) { if (decoded) {
@ -217,7 +218,7 @@ public abstract class ByteToMessageDecoder
decode(ctx, in, out); decode(ctx, in, out);
} }
final MessageBuf<Object> decoderOutput() { final OutputMessageBuf decoderOutput() {
return decoderOutput.get(); return decoderOutput.get();
} }

View File

@ -16,7 +16,6 @@
package io.netty.handler.codec; package io.netty.handler.codec;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelInboundMessageHandler;
@ -43,11 +42,11 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter;
*/ */
public abstract class MessageToMessageDecoder<I> extends ChannelInboundMessageHandlerAdapter<I> { public abstract class MessageToMessageDecoder<I> extends ChannelInboundMessageHandlerAdapter<I> {
private static final ThreadLocal<MessageBuf<Object>> decoderOutput = private static final ThreadLocal<OutputMessageBuf> decoderOutput =
new ThreadLocal<MessageBuf<Object>>() { new ThreadLocal<OutputMessageBuf>() {
@Override @Override
protected MessageBuf<Object> initialValue() { protected OutputMessageBuf initialValue() {
return Unpooled.messageBuffer(); return new OutputMessageBuf();
} }
}; };
@ -59,16 +58,20 @@ public abstract class MessageToMessageDecoder<I> extends ChannelInboundMessageHa
@Override @Override
public final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception { public final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception {
MessageBuf<Object> out = decoderOutput.get(); OutputMessageBuf out = decoderOutput.get();
try { try {
decode(ctx, msg, out); decode(ctx, msg, out);
} finally { } finally {
for (;;) { if (out.containsByteBuf()) {
Object obj = out.poll(); for (;;) {
if (obj == null) { Object decoded = out.poll();
break; if (decoded == null) {
break;
}
ChannelHandlerUtil.addToNextInboundBuffer(ctx, decoded);
} }
ChannelHandlerUtil.addToNextInboundBuffer(ctx, obj); } else {
out.drainTo(ctx.nextInboundMessageBuffer());
} }
} }
} }

View File

@ -16,7 +16,6 @@
package io.netty.handler.codec; package io.netty.handler.codec;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter; import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
@ -40,11 +39,11 @@ import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
* *
*/ */
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> { public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> {
private static final ThreadLocal<MessageBuf<Object>> encoderOutput = private static final ThreadLocal<OutputMessageBuf> encoderOutput =
new ThreadLocal<MessageBuf<Object>>() { new ThreadLocal<OutputMessageBuf>() {
@Override @Override
protected MessageBuf<Object> initialValue() { protected OutputMessageBuf initialValue() {
return Unpooled.messageBuffer(); return new OutputMessageBuf();
} }
}; };
@ -56,7 +55,7 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageH
@Override @Override
public final void flush(ChannelHandlerContext ctx, I msg) throws Exception { public final void flush(ChannelHandlerContext ctx, I msg) throws Exception {
MessageBuf<Object> out = encoderOutput.get(); OutputMessageBuf out = encoderOutput.get();
assert out.isEmpty(); assert out.isEmpty();
@ -71,14 +70,18 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageH
throw new EncoderException(cause); throw new EncoderException(cause);
} }
} finally { } finally {
for (;;) { if (out.containsByteBuf()) {
Object encoded = out.poll(); for (;;) {
if (encoded == null) { Object encoded = out.poll();
break; if (encoded == null) {
break;
}
// Handle special case when the encoded output is a ByteBuf and the next handler in the pipeline
// accept bytes. Related to #1222
ChannelHandlerUtil.addToNextOutboundBuffer(ctx, encoded);
} }
// Handle special case when the encoded output is a ByteBuf and the next handler in the pipeline } else {
// accept bytes. Related to #1222 out.drainTo(ctx.nextOutboundMessageBuffer());
ChannelHandlerUtil.addToNextOutboundBuffer(ctx, encoded);
} }
} }
} }

View File

@ -0,0 +1,71 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DefaultMessageBuf;
final class OutputMessageBuf extends DefaultMessageBuf<Object> {
private int byteBufs;
public OutputMessageBuf() {
}
public OutputMessageBuf(int initialCapacity) {
super(initialCapacity);
}
public OutputMessageBuf(int initialCapacity, int maxCapacity) {
super(initialCapacity, maxCapacity);
}
@Override
public boolean offer(Object e) {
boolean added = super.offer(e);
if (added && e instanceof ByteBuf) {
byteBufs++;
}
return added;
}
@Override
public boolean remove(Object o) {
boolean removed = super.remove(o);
if (removed && o instanceof ByteBuf) {
byteBufs--;
}
return removed;
}
@Override
public Object poll() {
Object o = super.poll();
if (o instanceof ByteBuf) {
byteBufs--;
}
return o;
}
@Override
public void clear() {
super.clear();
byteBufs = 0;
}
public boolean containsByteBuf() {
return byteBufs > 0;
}
}

View File

@ -16,7 +16,6 @@
package io.netty.handler.codec; package io.netty.handler.codec;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelHandlerUtil;
@ -365,7 +364,7 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
MessageBuf<Object> out = decoderOutput(); OutputMessageBuf out = decoderOutput();
try { try {
replayable.terminate(); replayable.terminate();
@ -387,13 +386,19 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
} finally { } finally {
boolean decoded = false; boolean decoded = false;
for (;;) { if (out.containsByteBuf()) {
Object msg = out.poll(); for (;;) {
if (msg == null) { Object msg = out.poll();
break; if (msg == null) {
break;
}
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
} else {
if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) {
decoded = true;
} }
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
} }
if (decoded) { if (decoded) {
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
@ -407,7 +412,7 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
protected void callDecode(ChannelHandlerContext ctx, ByteBuf buf) { protected void callDecode(ChannelHandlerContext ctx, ByteBuf buf) {
boolean wasNull = false; boolean wasNull = false;
ByteBuf in = cumulation; ByteBuf in = cumulation;
MessageBuf<Object> out = decoderOutput(); OutputMessageBuf out = decoderOutput();
boolean decoded = false; boolean decoded = false;
assert out.isEmpty(); assert out.isEmpty();
@ -460,13 +465,19 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
} }
} }
} finally { } finally {
for (;;) { if (out.containsByteBuf()) {
Object msg = out.poll(); for (;;) {
if (msg == null) { Object msg = out.poll();
break; if (msg == null) {
break;
}
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
} else {
if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) {
decoded = true;
} }
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
} }
if (decoded) { if (decoded) {
decodeWasNull = false; decodeWasNull = false;