Add ABORT signal to Channel*MessageHandlerAdapter

- Related: #1030
This commit is contained in:
Trustin Lee 2013-02-09 20:11:16 +09:00
parent 779870321c
commit 51daf2a6a2
3 changed files with 38 additions and 7 deletions

View File

@ -18,6 +18,7 @@ package io.netty.channel;
import io.netty.buffer.BufUtil; import io.netty.buffer.BufUtil;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.util.internal.Signal;
import io.netty.util.internal.TypeParameterMatcher; import io.netty.util.internal.TypeParameterMatcher;
/** /**
@ -45,6 +46,11 @@ import io.netty.util.internal.TypeParameterMatcher;
public abstract class ChannelInboundMessageHandlerAdapter<I> public abstract class ChannelInboundMessageHandlerAdapter<I>
extends ChannelStateHandlerAdapter implements ChannelInboundMessageHandler<I> { extends ChannelStateHandlerAdapter implements ChannelInboundMessageHandler<I> {
/**
* Thrown by {@link #messageReceived(ChannelHandlerContext, Object)} to abort message processing.
*/
protected static final Signal ABORT = new Signal(ChannelInboundMessageHandlerAdapter.class.getName() + ".ABORT");
private final TypeParameterMatcher msgMatcher; private final TypeParameterMatcher msgMatcher;
protected ChannelInboundMessageHandlerAdapter() { protected ChannelInboundMessageHandlerAdapter() {
@ -70,11 +76,11 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
@Override @Override
public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
if (!beginMessageReceived(ctx)) { MessageBuf<Object> in = ctx.inboundMessageBuffer();
if (in.isEmpty() || !beginMessageReceived(ctx)) {
return; return;
} }
MessageBuf<Object> in = ctx.inboundMessageBuffer();
MessageBuf<Object> out = ctx.nextInboundMessageBuffer(); MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
int oldOutSize = out.size(); int oldOutSize = out.size();
try { try {
@ -103,6 +109,9 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
I imsg = (I) msg; I imsg = (I) msg;
try { try {
messageReceived(ctx, imsg); messageReceived(ctx, imsg);
} catch (Signal abort) {
abort.expect(ABORT);
break;
} finally { } finally {
freeInboundMessage(imsg); freeInboundMessage(imsg);
} }

View File

@ -19,6 +19,7 @@ import io.netty.buffer.BufUtil;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.Signal;
import io.netty.util.internal.TypeParameterMatcher; import io.netty.util.internal.TypeParameterMatcher;
/** /**
@ -29,6 +30,11 @@ import io.netty.util.internal.TypeParameterMatcher;
public abstract class ChannelOutboundMessageHandlerAdapter<I> public abstract class ChannelOutboundMessageHandlerAdapter<I>
extends ChannelOperationHandlerAdapter implements ChannelOutboundMessageHandler<I> { extends ChannelOperationHandlerAdapter implements ChannelOutboundMessageHandler<I> {
/**
* Thrown by {@link #flush(ChannelHandlerContext, Object)} to abort message processing.
*/
protected static final Signal ABORT = new Signal(ChannelOutboundMessageHandlerAdapter.class.getName() + ".ABORT");
private final TypeParameterMatcher msgMatcher; private final TypeParameterMatcher msgMatcher;
private boolean closeOnFailedFlush = true; private boolean closeOnFailedFlush = true;
@ -76,8 +82,12 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
MessageBuf<Object> out = null; MessageBuf<Object> out = null;
final int inSize = in.size(); final int inSize = in.size();
int processed = 0; if (inSize == 0) {
ctx.flush(promise);
return;
}
int processed = 0;
try { try {
beginFlush(ctx); beginFlush(ctx);
for (;;) { for (;;) {
@ -105,8 +115,16 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
fail(ctx, promise, new PartialFlushException( PartialFlushException pfe;
processed + " out of " + inSize + " message(s) flushed; " + in.size() + " left", t)); String msg = processed + " out of " + inSize + " message(s) flushed";
if (t instanceof Signal) {
Signal abort = (Signal) t;
abort.expect(ABORT);
pfe = new PartialFlushException("aborted by " + getClass().getSimpleName() + ": " + msg);
} else {
pfe = new PartialFlushException(msg, t);
}
fail(ctx, promise, pfe);
} }
try { try {

View File

@ -23,8 +23,12 @@ package io.netty.channel;
public class PartialFlushException extends RuntimeException { public class PartialFlushException extends RuntimeException {
private static final long serialVersionUID = 990261865971015004L; private static final long serialVersionUID = 990261865971015004L;
public PartialFlushException(String msg, Throwable cause) { public PartialFlushException(String message) {
super(msg, cause); super(message);
}
public PartialFlushException(String message, Throwable cause) {
super(message, cause);
} }
public PartialFlushException(Throwable cause) { public PartialFlushException(Throwable cause) {