Optimize OutputMessageBuf.drainToNextInbound/Outbound()
- Return early when the buffer is empty - Keep only the number of byte buffers - Remove unnecessary null check in the loop (because we know buffer is not empty at certain point)
This commit is contained in:
parent
8a672c2800
commit
46540578fc
@ -22,8 +22,7 @@ import io.netty.buffer.MessageBuf;
|
|||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
|
||||||
final class OutputMessageBuf extends DefaultMessageBuf<Object> {
|
final class OutputMessageBuf extends DefaultMessageBuf<Object> {
|
||||||
private int byteBufs;
|
private int byteBufCnt;
|
||||||
private int nonByteBufs;
|
|
||||||
|
|
||||||
private static final ThreadLocal<OutputMessageBuf> output =
|
private static final ThreadLocal<OutputMessageBuf> output =
|
||||||
new ThreadLocal<OutputMessageBuf>() {
|
new ThreadLocal<OutputMessageBuf>() {
|
||||||
@ -53,9 +52,7 @@ final class OutputMessageBuf extends DefaultMessageBuf<Object> {
|
|||||||
boolean added = super.offer(e);
|
boolean added = super.offer(e);
|
||||||
if (added) {
|
if (added) {
|
||||||
if (e instanceof ByteBuf) {
|
if (e instanceof ByteBuf) {
|
||||||
byteBufs++;
|
byteBufCnt ++;
|
||||||
} else {
|
|
||||||
nonByteBufs++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return added;
|
return added;
|
||||||
@ -64,12 +61,9 @@ final class OutputMessageBuf extends DefaultMessageBuf<Object> {
|
|||||||
@Override
|
@Override
|
||||||
public boolean remove(Object o) {
|
public boolean remove(Object o) {
|
||||||
boolean removed = super.remove(o);
|
boolean removed = super.remove(o);
|
||||||
|
|
||||||
if (removed) {
|
if (removed) {
|
||||||
if (o instanceof ByteBuf) {
|
if (o instanceof ByteBuf) {
|
||||||
byteBufs--;
|
byteBufCnt --;
|
||||||
} else {
|
|
||||||
nonByteBufs--;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return removed;
|
return removed;
|
||||||
@ -82,9 +76,7 @@ final class OutputMessageBuf extends DefaultMessageBuf<Object> {
|
|||||||
return o;
|
return o;
|
||||||
}
|
}
|
||||||
if (o instanceof ByteBuf) {
|
if (o instanceof ByteBuf) {
|
||||||
byteBufs--;
|
byteBufCnt --;
|
||||||
} else {
|
|
||||||
nonByteBufs--;
|
|
||||||
}
|
}
|
||||||
return o;
|
return o;
|
||||||
}
|
}
|
||||||
@ -92,85 +84,84 @@ final class OutputMessageBuf extends DefaultMessageBuf<Object> {
|
|||||||
@Override
|
@Override
|
||||||
public void clear() {
|
public void clear() {
|
||||||
super.clear();
|
super.clear();
|
||||||
byteBufs = 0;
|
byteBufCnt = 0;
|
||||||
nonByteBufs = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean containsByteBuf() {
|
|
||||||
return byteBufs > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean containsNonByteBuf() {
|
|
||||||
return nonByteBufs > 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean drainToNextInbound(ChannelHandlerContext ctx) {
|
public boolean drainToNextInbound(ChannelHandlerContext ctx) {
|
||||||
if (containsByteBuf() && ctx.nextInboundBufferType() == BufType.BYTE) {
|
final int size = size();
|
||||||
ByteBuf buf = ctx.nextInboundByteBuffer();
|
if (size == 0) {
|
||||||
boolean drained = false;
|
return false;
|
||||||
if (!containsNonByteBuf()) {
|
}
|
||||||
for (;;) {
|
|
||||||
Object o = poll();
|
final int byteBufCnt = this.byteBufCnt;
|
||||||
if (o == null) {
|
if (byteBufCnt == 0 || ctx.nextInboundBufferType() != BufType.BYTE) {
|
||||||
break;
|
|
||||||
}
|
|
||||||
buf.writeBytes((ByteBuf) o);
|
|
||||||
drained = true;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// mixed case
|
|
||||||
MessageBuf<Object> msgBuf = ctx.nextInboundMessageBuffer();
|
|
||||||
for (;;) {
|
|
||||||
Object o = poll();
|
|
||||||
if (o == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (o instanceof ByteBuf) {
|
|
||||||
buf.writeBytes((ByteBuf) o);
|
|
||||||
} else {
|
|
||||||
msgBuf.add(o);
|
|
||||||
}
|
|
||||||
drained = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return drained;
|
|
||||||
} else {
|
|
||||||
return drainTo(ctx.nextInboundMessageBuffer()) > 0;
|
return drainTo(ctx.nextInboundMessageBuffer()) > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final ByteBuf nextByteBuf = ctx.nextInboundByteBuffer();
|
||||||
|
if (byteBufCnt == size) {
|
||||||
|
// Contains only ByteBufs
|
||||||
|
for (Object o = poll();;) {
|
||||||
|
nextByteBuf.writeBytes((ByteBuf) o);
|
||||||
|
if ((o = poll()) == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Contains both ByteBufs and non-ByteBufs (0 < byteBufCnt < size())
|
||||||
|
final MessageBuf<Object> nextMsgBuf = ctx.nextInboundMessageBuffer();
|
||||||
|
for (Object o = poll();;) {
|
||||||
|
if (o instanceof ByteBuf) {
|
||||||
|
nextByteBuf.writeBytes((ByteBuf) o);
|
||||||
|
} else {
|
||||||
|
nextMsgBuf.add(o);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((o = poll()) == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean drainToNextOutbound(ChannelHandlerContext ctx) {
|
public boolean drainToNextOutbound(ChannelHandlerContext ctx) {
|
||||||
if (containsByteBuf() && ctx.nextOutboundBufferType() == BufType.BYTE) {
|
final int size = size();
|
||||||
ByteBuf buf = ctx.nextOutboundByteBuffer();
|
if (size == 0) {
|
||||||
boolean drained = false;
|
return false;
|
||||||
if (!containsNonByteBuf()) {
|
}
|
||||||
for (;;) {
|
|
||||||
Object o = poll();
|
final int byteBufCnt = this.byteBufCnt;
|
||||||
if (o == null) {
|
if (byteBufCnt == 0 || ctx.nextOutboundBufferType() != BufType.BYTE) {
|
||||||
break;
|
|
||||||
}
|
|
||||||
buf.writeBytes((ByteBuf) o);
|
|
||||||
drained = true;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// mixed case
|
|
||||||
MessageBuf<Object> msgBuf = ctx.nextOutboundMessageBuffer();
|
|
||||||
for (;;) {
|
|
||||||
Object o = poll();
|
|
||||||
if (o == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (o instanceof ByteBuf) {
|
|
||||||
buf.writeBytes((ByteBuf) o);
|
|
||||||
} else {
|
|
||||||
msgBuf.add(o);
|
|
||||||
}
|
|
||||||
drained = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return drained;
|
|
||||||
} else {
|
|
||||||
return drainTo(ctx.nextOutboundMessageBuffer()) > 0;
|
return drainTo(ctx.nextOutboundMessageBuffer()) > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final ByteBuf nextByteBuf = ctx.nextOutboundByteBuffer();
|
||||||
|
if (byteBufCnt == size) {
|
||||||
|
// Contains only ByteBufs
|
||||||
|
for (Object o = poll();;) {
|
||||||
|
nextByteBuf.writeBytes((ByteBuf) o);
|
||||||
|
if ((o = poll()) == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Contains both ByteBufs and non-ByteBufs (0 < byteBufCnt < size())
|
||||||
|
final MessageBuf<Object> nextMsgBuf = ctx.nextOutboundMessageBuffer();
|
||||||
|
for (Object o = poll();;) {
|
||||||
|
if (o instanceof ByteBuf) {
|
||||||
|
nextByteBuf.writeBytes((ByteBuf) o);
|
||||||
|
} else {
|
||||||
|
nextMsgBuf.add(o);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((o = poll()) == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user