Add begin/endFlush() and closeOnFailedFlush to ChannelOutboundMessageHandlerAdapter / Make ChannelInboundMessageHandlerAdapter stop processing on first exception to avoid excessive exceptionCaught() events against pipelined messages.

This commit is contained in:
Trustin Lee 2013-02-09 17:31:20 +09:00
parent cedcee3f42
commit 139b1b8382
2 changed files with 95 additions and 52 deletions

View File

@ -85,32 +85,30 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
break; break;
} }
if (!acceptInboundMessage(msg)) {
out.add(msg);
unsupportedFound = true;
continue;
}
if (unsupportedFound) {
// the last message were unsupported, but now we received one that is supported.
// So reset the flag and notify the next context
unsupportedFound = false;
ctx.fireInboundBufferUpdated();
oldOutSize = out.size();
}
@SuppressWarnings("unchecked")
I imsg = (I) msg;
try { try {
if (!acceptInboundMessage(msg)) { messageReceived(ctx, imsg);
out.add(msg); } finally {
unsupportedFound = true; freeInboundMessage(imsg);
continue;
}
if (unsupportedFound) {
// the last message were unsupported, but now we received one that is supported.
// So reset the flag and notify the next context
unsupportedFound = false;
ctx.fireInboundBufferUpdated();
oldOutSize = out.size();
}
@SuppressWarnings("unchecked")
I imsg = (I) msg;
try {
messageReceived(ctx, imsg);
} finally {
freeInboundMessage(imsg);
}
} catch (Throwable t) {
exceptionCaught(ctx, t);
} }
} }
} catch (Throwable t) {
exceptionCaught(ctx, t);
} finally { } finally {
if (oldOutSize != out.size()) { if (oldOutSize != out.size()) {
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
@ -140,7 +138,8 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
* *
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
*/ */
protected boolean beginMessageReceived(ChannelHandlerContext ctx) throws Exception { protected boolean beginMessageReceived(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception {
return true; return true;
} }
@ -149,19 +148,18 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
* *
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
* @param msg the message to handle * @param msg the message to handle
* @throws Exception thrown when an error accour
*/ */
protected abstract void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception; protected abstract void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception;
/** /**
* Is called after all messages of the {@link MessageBuf} was consumed. * Is called when {@link #messageReceived(ChannelHandlerContext, Object)} returns.
* *
* Super-classes may-override this for special handling. * Super-classes may-override this for special handling.
* *
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
* @throws Exception thrown when an error accour
*/ */
protected void endMessageReceived(ChannelHandlerContext ctx) throws Exception { protected void endMessageReceived(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception {
// NOOP // NOOP
} }

View File

@ -18,6 +18,7 @@ package io.netty.channel;
import io.netty.buffer.BufUtil; import io.netty.buffer.BufUtil;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.TypeParameterMatcher; import io.netty.util.internal.TypeParameterMatcher;
/** /**
@ -29,6 +30,7 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
extends ChannelOperationHandlerAdapter implements ChannelOutboundMessageHandler<I> { extends ChannelOperationHandlerAdapter implements ChannelOutboundMessageHandler<I> {
private final TypeParameterMatcher msgMatcher; private final TypeParameterMatcher msgMatcher;
private boolean closeOnFailedFlush = true;
protected ChannelOutboundMessageHandlerAdapter() { protected ChannelOutboundMessageHandlerAdapter() {
this(ChannelOutboundMessageHandlerAdapter.class, 0); this(ChannelOutboundMessageHandlerAdapter.class, 0);
@ -41,6 +43,14 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
msgMatcher = TypeParameterMatcher.find(this, parameterizedHandlerType, messageTypeParamIndex); msgMatcher = TypeParameterMatcher.find(this, parameterizedHandlerType, messageTypeParamIndex);
} }
protected final boolean isCloseOnFailedFlush() {
return closeOnFailedFlush;
}
protected final void setCloseOnFailedFlush(boolean closeOnFailedFlush) {
this.closeOnFailedFlush = closeOnFailedFlush;
}
@Override @Override
public MessageBuf<I> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { public MessageBuf<I> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer(); return Unpooled.messageBuffer();
@ -64,52 +74,87 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
public final void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public final void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
MessageBuf<Object> in = ctx.outboundMessageBuffer(); MessageBuf<Object> in = ctx.outboundMessageBuffer();
MessageBuf<Object> out = null; MessageBuf<Object> out = null;
ChannelPromise nextPromise = promise;
final int inSize = in.size();
int processed = 0;
try { try {
beginFlush(ctx);
for (;;) { for (;;) {
Object msg = in.poll(); Object msg = in.poll();
if (msg == null) { if (msg == null) {
break; break;
} }
try { if (!acceptOutboundMessage(msg)) {
if (!acceptOutboundMessage(msg)) { if (out == null) {
if (out == null) { out = ctx.nextOutboundMessageBuffer();
out = ctx.nextOutboundMessageBuffer();
}
out.add(msg);
continue;
} }
out.add(msg);
processed ++;
continue;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
I imsg = (I) msg; I imsg = (I) msg;
try { try {
flush(ctx, imsg); flush(ctx, imsg);
} finally { processed ++;
freeOutboundMessage(imsg); } finally {
} freeOutboundMessage(imsg);
} catch (Throwable t) {
if (!promise.isDone()) {
promise.setFailure(new PartialFlushException(
"faied to encode all messages associated with the future", t));
nextPromise = ctx.newPromise();
}
} }
} }
} finally { } catch (Throwable t) {
ctx.flush(nextPromise); fail(ctx, promise, new PartialFlushException(
processed + " out of " + inSize + " message(s) flushed; " + in.size() + " left", t));
}
try {
endFlush(ctx);
} catch (Throwable t) {
if (promise.isDone()) {
InternalLoggerFactory.getInstance(getClass()).warn(
"endFlush() raised a masked exception due to failed flush().", t);
} else {
fail(ctx, promise, t);
}
return;
}
ctx.flush(promise);
}
private void fail(ChannelHandlerContext ctx, ChannelPromise promise, Throwable cause) {
promise.setFailure(cause);
if (isCloseOnFailedFlush()) {
ctx.close();
} }
} }
/**
* Will get notified once {@link #flush(ChannelHandlerContext, ChannelPromise)} was called.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
*/
protected void beginFlush(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception { }
/** /**
* Is called once a message is being flushed. * Is called once a message is being flushed.
* *
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
* @param msg the message to handle * @param msg the message to handle
* @throws Exception thrown when an error accour
*/ */
protected abstract void flush(ChannelHandlerContext ctx, I msg) throws Exception; protected abstract void flush(ChannelHandlerContext ctx, I msg) throws Exception;
/**
* Is called when {@link #flush(ChannelHandlerContext, ChannelPromise)} returns.
*
* Super-classes may-override this for special handling.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
*/
protected void endFlush(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception { }
/** /**
* Is called after a message was processed via {@link #flush(ChannelHandlerContext, Object)} to free * Is called after a message was processed via {@link #flush(ChannelHandlerContext, Object)} to free
* up any resources that is held by the outbound message. You may want to override this if your implementation * up any resources that is held by the outbound message. You may want to override this if your implementation