Throw NoSuchBufferException instead of returning null

- Exception in this case makes a user less confusing
- To reduce the overhead of filling the stack trace,
  NoSuchBufferException has a public pre-constructed instance.
  - This is necessary because codec framework sometimes need to support
    both type of outbound buffers.
- Fixed a bug where SpdyFrameEncoder did not handle ping messages
- Reduced memory copy in codec embedder (EmbeddedChannel)
This commit is contained in:
Trustin Lee 2012-05-29 17:25:09 -07:00
parent 8237afff64
commit a9948d681e
6 changed files with 87 additions and 34 deletions

View File

@ -84,7 +84,8 @@ public class SpdyFrameEncoder extends MessageToStreamEncoder<Object> {
msg instanceof SpdySettingsFrame || msg instanceof SpdySettingsFrame ||
msg instanceof SpdyNoOpFrame || msg instanceof SpdyNoOpFrame ||
msg instanceof SpdyGoAwayFrame || msg instanceof SpdyGoAwayFrame ||
msg instanceof SpdyHeadersFrame; msg instanceof SpdyHeadersFrame ||
msg instanceof SpdyPingFrame;
} }
@Override @Override

View File

@ -2,8 +2,7 @@ package io.netty.handler.codec;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.NoSuchBufferException;
import java.util.Queue;
class CodecUtil { class CodecUtil {
@ -33,34 +32,32 @@ class CodecUtil {
} }
if (inbound) { if (inbound) {
Queue<Object> dst = ctx.nextInboundMessageBuffer(); try {
if (dst != null) { ctx.nextInboundMessageBuffer().add(msg);
dst.add(msg);
return true; return true;
} else if (msg instanceof ChannelBuffer) { } catch (NoSuchBufferException e) {
ChannelBuffer altDst = ctx.nextInboundByteBuffer(); if (msg instanceof ChannelBuffer) {
ChannelBuffer src = (ChannelBuffer) msg; ChannelBuffer altDst = ctx.nextInboundByteBuffer();
if (altDst != null) { ChannelBuffer src = (ChannelBuffer) msg;
altDst.writeBytes(src, src.readerIndex(), src.readableBytes()); altDst.writeBytes(src, src.readerIndex(), src.readableBytes());
return true; return true;
} }
} }
} else { } else {
Queue<Object> dst = ctx.nextOutboundMessageBuffer(); try {
if (dst != null) { ctx.nextOutboundMessageBuffer().add(msg);
dst.add(msg);
return true; return true;
} else if (msg instanceof ChannelBuffer) { } catch (NoSuchBufferException e) {
ChannelBuffer altDst = ctx.nextOutboundByteBuffer(); if (msg instanceof ChannelBuffer) {
ChannelBuffer src = (ChannelBuffer) msg; ChannelBuffer altDst = ctx.nextOutboundByteBuffer();
if (altDst != null) { ChannelBuffer src = (ChannelBuffer) msg;
altDst.writeBytes(src, src.readerIndex(), src.readableBytes()); altDst.writeBytes(src, src.readerIndex(), src.readableBytes());
return true; return true;
} }
} }
} }
throw new IllegalStateException("no suitable destination buffer found"); throw new NoSuchBufferException();
} }
private CodecUtil() { private CodecUtil() {

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.embedder; package io.netty.handler.codec.embedder;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.AbstractChannel; import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelBufferHolders;
@ -39,7 +40,7 @@ class EmbeddedChannel extends AbstractChannel {
EmbeddedChannel(Queue<Object> productQueue) { EmbeddedChannel(Queue<Object> productQueue) {
super(null, null); super(null, null);
this.productQueue = productQueue; this.productQueue = productQueue;
firstOut = ChannelBufferHolders.catchAllBuffer(); firstOut = ChannelBufferHolders.catchAllBuffer(productQueue, ChannelBuffers.dynamicBuffer());
} }
@Override @Override
@ -111,11 +112,7 @@ class EmbeddedChannel extends AbstractChannel {
productQueue.add(byteBuf.readBytes(byteBufLen)); productQueue.add(byteBuf.readBytes(byteBufLen));
byteBuf.clear(); byteBuf.clear();
} }
Queue<Object> msgBuf = buf.messageBuffer(); // We do nothing for message buffer because it's actually productQueue.
if (!msgBuf.isEmpty()) {
productQueue.addAll(msgBuf);
msgBuf.clear();
}
} }
@Override @Override

View File

@ -59,9 +59,21 @@ public final class ChannelBufferHolder<E> {
case 0: case 0:
return msgBuf != null; return msgBuf != null;
case 1: case 1:
return ctx.nextInboundMessageBuffer() != null; // XXX: Should we introduce hasNext(Inbound|Outbound)(Message|Byte)Buffer()
// just because of this?
try {
ctx.nextInboundMessageBuffer();
return true;
} catch (NoSuchBufferException e) {
return false;
}
case 2: case 2:
return ctx.nextOutboundMessageBuffer() != null; try {
ctx.nextOutboundMessageBuffer();
return true;
} catch (NoSuchBufferException e) {
return false;
}
default: default:
throw new Error(); throw new Error();
} }
@ -72,9 +84,19 @@ public final class ChannelBufferHolder<E> {
case 0: case 0:
return byteBuf != null; return byteBuf != null;
case 1: case 1:
return ctx.nextInboundByteBuffer() != null; try {
ctx.nextInboundByteBuffer();
return true;
} catch (NoSuchBufferException e) {
return false;
}
case 2: case 2:
return ctx.nextOutboundByteBuffer() != null; try {
ctx.nextOutboundByteBuffer();
return true;
} catch (NoSuchBufferException e) {
return false;
}
default: default:
throw new Error(); throw new Error();
} }
@ -84,7 +106,7 @@ public final class ChannelBufferHolder<E> {
switch (bypassDirection) { switch (bypassDirection) {
case 0: case 0:
if (msgBuf == null) { if (msgBuf == null) {
throw new IllegalStateException("does not have a message buffer"); throw new NoSuchBufferException();
} }
return msgBuf; return msgBuf;
case 1: case 1:
@ -100,7 +122,7 @@ public final class ChannelBufferHolder<E> {
switch (bypassDirection) { switch (bypassDirection) {
case 0: case 0:
if (byteBuf == null) { if (byteBuf == null) {
throw new IllegalStateException("does not have a byte buffer"); throw new NoSuchBufferException();
} }
return byteBuf; return byteBuf;
case 1: case 1:

View File

@ -1157,7 +1157,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
for (;;) { for (;;) {
ctx = nextInboundContext(ctx.next); ctx = nextInboundContext(ctx.next);
if (ctx == null) { if (ctx == null) {
return null; throw NoSuchBufferException.INSTANCE;
} }
ChannelBufferHolder<Object> nextIn = ctx.inbound(); ChannelBufferHolder<Object> nextIn = ctx.inbound();
if (nextIn.hasByteBuffer()) { if (nextIn.hasByteBuffer()) {
@ -1172,7 +1172,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
for (;;) { for (;;) {
ctx = nextInboundContext(ctx.next); ctx = nextInboundContext(ctx.next);
if (ctx == null) { if (ctx == null) {
return null; throw NoSuchBufferException.INSTANCE;
} }
ChannelBufferHolder<Object> nextIn = ctx.inbound(); ChannelBufferHolder<Object> nextIn = ctx.inbound();
if (nextIn.hasMessageBuffer()) { if (nextIn.hasMessageBuffer()) {
@ -1191,7 +1191,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
if (lastOut.hasByteBuffer()) { if (lastOut.hasByteBuffer()) {
return lastOut.byteBuffer(); return lastOut.byteBuffer();
} else { } else {
return null; throw NoSuchBufferException.INSTANCE;
} }
} }
ChannelBufferHolder<Object> nextOut = ctx.outbound(); ChannelBufferHolder<Object> nextOut = ctx.outbound();
@ -1211,7 +1211,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
if (lastOut.hasMessageBuffer()) { if (lastOut.hasMessageBuffer()) {
return lastOut.messageBuffer(); return lastOut.messageBuffer();
} else { } else {
return null; throw NoSuchBufferException.INSTANCE;
} }
} }
ChannelBufferHolder<Object> nextOut = ctx.outbound(); ChannelBufferHolder<Object> nextOut = ctx.outbound();

View File

@ -0,0 +1,36 @@
package io.netty.channel;
/**
* A {@link ChannelPipelineException} which is raised if an inbound or outbound buffer of
* the expected type is not found while transferring data between {@link ChannelHandler}s.
* This exception is usually triggered by an incorrectly configured {@link ChannelPipeline}.
*/
public class NoSuchBufferException extends ChannelPipelineException {
private static final String DEFAULT_MESSAGE =
"Could not find a suitable destination buffer. Double-check if the pipeline is " +
"configured correctly and its handlers works as expected.";
public static final NoSuchBufferException INSTANCE = new NoSuchBufferException();
static {
INSTANCE.setStackTrace(new StackTraceElement[0]);
}
private static final long serialVersionUID = -131650785896627090L;
public NoSuchBufferException() {
this(DEFAULT_MESSAGE);
}
public NoSuchBufferException(String message, Throwable cause) {
super(message, cause);
}
public NoSuchBufferException(String message) {
super(message);
}
public NoSuchBufferException(Throwable cause) {
super(cause);
}
}