From 51daf2a6a2575a4114a3f4bd926f708a853e3d9d Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sat, 9 Feb 2013 20:11:16 +0900 Subject: [PATCH] Add ABORT signal to Channel*MessageHandlerAdapter - Related: #1030 --- .../ChannelInboundMessageHandlerAdapter.java | 13 ++++++++-- .../ChannelOutboundMessageHandlerAdapter.java | 24 ++++++++++++++++--- .../netty/channel/PartialFlushException.java | 8 +++++-- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java index cf1333fd82..b20edd8e0b 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java @@ -18,6 +18,7 @@ package io.netty.channel; import io.netty.buffer.BufUtil; import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; +import io.netty.util.internal.Signal; import io.netty.util.internal.TypeParameterMatcher; /** @@ -45,6 +46,11 @@ import io.netty.util.internal.TypeParameterMatcher; public abstract class ChannelInboundMessageHandlerAdapter extends ChannelStateHandlerAdapter implements ChannelInboundMessageHandler { + /** + * Thrown by {@link #messageReceived(ChannelHandlerContext, Object)} to abort message processing. + */ + protected static final Signal ABORT = new Signal(ChannelInboundMessageHandlerAdapter.class.getName() + ".ABORT"); + private final TypeParameterMatcher msgMatcher; protected ChannelInboundMessageHandlerAdapter() { @@ -70,11 +76,11 @@ public abstract class ChannelInboundMessageHandlerAdapter @Override public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { - if (!beginMessageReceived(ctx)) { + MessageBuf in = ctx.inboundMessageBuffer(); + if (in.isEmpty() || !beginMessageReceived(ctx)) { return; } - MessageBuf in = ctx.inboundMessageBuffer(); MessageBuf out = ctx.nextInboundMessageBuffer(); int oldOutSize = out.size(); try { @@ -103,6 +109,9 @@ public abstract class ChannelInboundMessageHandlerAdapter I imsg = (I) msg; try { messageReceived(ctx, imsg); + } catch (Signal abort) { + abort.expect(ABORT); + break; } finally { freeInboundMessage(imsg); } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java index 6af7b09654..1c249303f4 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java @@ -19,6 +19,7 @@ import io.netty.buffer.BufUtil; import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.logging.InternalLoggerFactory; +import io.netty.util.internal.Signal; import io.netty.util.internal.TypeParameterMatcher; /** @@ -29,6 +30,11 @@ import io.netty.util.internal.TypeParameterMatcher; public abstract class ChannelOutboundMessageHandlerAdapter extends ChannelOperationHandlerAdapter implements ChannelOutboundMessageHandler { + /** + * Thrown by {@link #flush(ChannelHandlerContext, Object)} to abort message processing. + */ + protected static final Signal ABORT = new Signal(ChannelOutboundMessageHandlerAdapter.class.getName() + ".ABORT"); + private final TypeParameterMatcher msgMatcher; private boolean closeOnFailedFlush = true; @@ -76,8 +82,12 @@ public abstract class ChannelOutboundMessageHandlerAdapter MessageBuf out = null; final int inSize = in.size(); - int processed = 0; + if (inSize == 0) { + ctx.flush(promise); + return; + } + int processed = 0; try { beginFlush(ctx); for (;;) { @@ -105,8 +115,16 @@ public abstract class ChannelOutboundMessageHandlerAdapter } } } catch (Throwable t) { - fail(ctx, promise, new PartialFlushException( - processed + " out of " + inSize + " message(s) flushed; " + in.size() + " left", t)); + PartialFlushException pfe; + String msg = processed + " out of " + inSize + " message(s) flushed"; + if (t instanceof Signal) { + Signal abort = (Signal) t; + abort.expect(ABORT); + pfe = new PartialFlushException("aborted by " + getClass().getSimpleName() + ": " + msg); + } else { + pfe = new PartialFlushException(msg, t); + } + fail(ctx, promise, pfe); } try { diff --git a/transport/src/main/java/io/netty/channel/PartialFlushException.java b/transport/src/main/java/io/netty/channel/PartialFlushException.java index f70e16e75e..84db69e669 100644 --- a/transport/src/main/java/io/netty/channel/PartialFlushException.java +++ b/transport/src/main/java/io/netty/channel/PartialFlushException.java @@ -23,8 +23,12 @@ package io.netty.channel; public class PartialFlushException extends RuntimeException { private static final long serialVersionUID = 990261865971015004L; - public PartialFlushException(String msg, Throwable cause) { - super(msg, cause); + public PartialFlushException(String message) { + super(message); + } + + public PartialFlushException(String message, Throwable cause) { + super(message, cause); } public PartialFlushException(Throwable cause) {