Move drain logic to OutputMessageBuf and optimize it as far as possible

This commit is contained in:
Norman Maurer 2013-04-03 18:52:57 +02:00
parent 1675e61f5b
commit 7ee2adb587
5 changed files with 103 additions and 100 deletions

View File

@ -18,7 +18,6 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelInboundByteHandlerAdapter; import io.netty.channel.ChannelInboundByteHandlerAdapter;
@ -98,22 +97,7 @@ public abstract class ByteToMessageDecoder
throw new DecoderException(t); throw new DecoderException(t);
} }
} finally { } finally {
boolean decoded = false; if (out.drainToNextInbound(ctx)) {
if (out.containsByteBuf()) {
for (;;) {
Object msg = out.poll();
if (msg == null) {
break;
}
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
} else {
if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) {
decoded = true;
}
}
if (decoded) {
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
} }
ctx.fireChannelInactive(); ctx.fireChannelInactive();
@ -122,7 +106,6 @@ public abstract class ByteToMessageDecoder
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in) { protected void callDecode(ChannelHandlerContext ctx, ByteBuf in) {
boolean wasNull = false; boolean wasNull = false;
boolean decoded = false;
OutputMessageBuf out = OutputMessageBuf.get(); OutputMessageBuf out = OutputMessageBuf.get();
assert out.isEmpty(); assert out.isEmpty();
@ -160,22 +143,7 @@ public abstract class ByteToMessageDecoder
} }
} }
} finally { } finally {
if (out.containsByteBuf()) { if (out.drainToNextInbound(ctx)) {
for (;;) {
Object msg = out.poll();
if (msg == null) {
break;
}
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
} else {
if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) {
decoded = true;
}
}
if (decoded) {
decodeWasNull = false; decodeWasNull = false;
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
} else { } else {

View File

@ -17,7 +17,6 @@ package io.netty.handler.codec;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInboundMessageHandlerAdapter;
@ -54,17 +53,7 @@ public abstract class MessageToMessageDecoder<I> extends ChannelInboundMessageHa
try { try {
decode(ctx, msg, out); decode(ctx, msg, out);
} finally { } finally {
if (out.containsByteBuf()) { out.drainToNextInbound(ctx);
for (;;) {
Object decoded = out.poll();
if (decoded == null) {
break;
}
ChannelHandlerUtil.addToNextInboundBuffer(ctx, decoded);
}
} else {
out.drainTo(ctx.nextInboundMessageBuffer());
}
} }
} }

View File

@ -17,7 +17,6 @@ package io.netty.handler.codec;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter; import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
/** /**
@ -61,19 +60,7 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageH
throw new EncoderException(cause); throw new EncoderException(cause);
} }
} finally { } finally {
if (out.containsByteBuf()) { out.drainToNextOutbound(ctx);
for (;;) {
Object encoded = out.poll();
if (encoded == null) {
break;
}
// Handle special case when the encoded output is a ByteBuf and the next handler in the pipeline
// accept bytes. Related to #1222
ChannelHandlerUtil.addToNextOutboundBuffer(ctx, encoded);
}
} else {
out.drainTo(ctx.nextOutboundMessageBuffer());
}
} }
} }

View File

@ -15,11 +15,15 @@
*/ */
package io.netty.handler.codec; package io.netty.handler.codec;
import io.netty.buffer.BufType;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.DefaultMessageBuf; import io.netty.buffer.DefaultMessageBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
final class OutputMessageBuf extends DefaultMessageBuf<Object> { final class OutputMessageBuf extends DefaultMessageBuf<Object> {
private int byteBufs; private int byteBufs;
private int nonByteBufs;
private static final ThreadLocal<OutputMessageBuf> output = private static final ThreadLocal<OutputMessageBuf> output =
new ThreadLocal<OutputMessageBuf>() { new ThreadLocal<OutputMessageBuf>() {
@ -47,8 +51,12 @@ final class OutputMessageBuf extends DefaultMessageBuf<Object> {
@Override @Override
public boolean offer(Object e) { public boolean offer(Object e) {
boolean added = super.offer(e); boolean added = super.offer(e);
if (added && e instanceof ByteBuf) { if (added) {
if (e instanceof ByteBuf) {
byteBufs++; byteBufs++;
} else {
nonByteBufs++;
}
} }
return added; return added;
} }
@ -57,8 +65,12 @@ final class OutputMessageBuf extends DefaultMessageBuf<Object> {
public boolean remove(Object o) { public boolean remove(Object o) {
boolean removed = super.remove(o); boolean removed = super.remove(o);
if (removed && o instanceof ByteBuf) { if (removed) {
if (o instanceof ByteBuf) {
byteBufs--; byteBufs--;
} else {
nonByteBufs--;
}
} }
return removed; return removed;
} }
@ -66,8 +78,13 @@ final class OutputMessageBuf extends DefaultMessageBuf<Object> {
@Override @Override
public Object poll() { public Object poll() {
Object o = super.poll(); Object o = super.poll();
if (o == null) {
return o;
}
if (o instanceof ByteBuf) { if (o instanceof ByteBuf) {
byteBufs--; byteBufs--;
} else {
nonByteBufs--;
} }
return o; return o;
} }
@ -76,9 +93,84 @@ final class OutputMessageBuf extends DefaultMessageBuf<Object> {
public void clear() { public void clear() {
super.clear(); super.clear();
byteBufs = 0; byteBufs = 0;
nonByteBufs = 0;
} }
public boolean containsByteBuf() { private boolean containsByteBuf() {
return byteBufs > 0; return byteBufs > 0;
} }
private boolean containsNonByteBuf() {
return nonByteBufs > 0;
}
public boolean drainToNextInbound(ChannelHandlerContext ctx) {
if (containsByteBuf() && ctx.nextInboundBufferType() == BufType.BYTE) {
ByteBuf buf = ctx.nextInboundByteBuffer();
boolean drained = false;
if (!containsNonByteBuf()) {
for (;;) {
Object o = poll();
if (o == null) {
break;
}
buf.writeBytes((ByteBuf) o);
drained = true;
}
} else {
// mixed case
MessageBuf<Object> msgBuf = ctx.nextInboundMessageBuffer();
for (;;) {
Object o = poll();
if (o == null) {
break;
}
if (o instanceof ByteBuf) {
buf.writeBytes((ByteBuf) o);
} else {
msgBuf.add(o);
}
drained = true;
}
}
return drained;
} else {
return drainTo(ctx.nextInboundMessageBuffer()) > 0;
}
}
public boolean drainToNextOutbound(ChannelHandlerContext ctx) {
if (containsByteBuf() && ctx.nextOutboundBufferType() == BufType.BYTE) {
ByteBuf buf = ctx.nextOutboundByteBuffer();
boolean drained = false;
if (!containsNonByteBuf()) {
for (;;) {
Object o = poll();
if (o == null) {
break;
}
buf.writeBytes((ByteBuf) o);
drained = true;
}
} else {
// mixed case
MessageBuf<Object> msgBuf = ctx.nextOutboundMessageBuffer();
for (;;) {
Object o = poll();
if (o == null) {
break;
}
if (o instanceof ByteBuf) {
buf.writeBytes((ByteBuf) o);
} else {
msgBuf.add(o);
}
drained = true;
}
}
return drained;
} else {
return drainTo(ctx.nextOutboundMessageBuffer()) > 0;
}
}
} }

View File

@ -18,7 +18,6 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.util.Signal; import io.netty.util.Signal;
@ -384,23 +383,7 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
throw new DecoderException(t); throw new DecoderException(t);
} }
} finally { } finally {
if (out.drainToNextInbound(ctx)) {
boolean decoded = false;
if (out.containsByteBuf()) {
for (;;) {
Object msg = out.poll();
if (msg == null) {
break;
}
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
} else {
if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) {
decoded = true;
}
}
if (decoded) {
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
} }
@ -413,8 +396,6 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
boolean wasNull = false; boolean wasNull = false;
ByteBuf in = cumulation; ByteBuf in = cumulation;
OutputMessageBuf out = OutputMessageBuf.get(); OutputMessageBuf out = OutputMessageBuf.get();
boolean decoded = false;
assert out.isEmpty(); assert out.isEmpty();
try { try {
@ -465,21 +446,7 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
} }
} }
} finally { } finally {
if (out.containsByteBuf()) { if (out.drainToNextInbound(ctx)) {
for (;;) {
Object msg = out.poll();
if (msg == null) {
break;
}
decoded = true;
ChannelHandlerUtil.addToNextInboundBuffer(ctx, msg);
}
} else {
if (out.drainTo(ctx.nextInboundMessageBuffer()) > 0) {
decoded = true;
}
}
if (decoded) {
decodeWasNull = false; decodeWasNull = false;
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
} else { } else {