Provide a way to implement an ChannelInbound/OutboundMessageHandler conveniently without extending an adapter class

- Add ChannelHandlerUtil and move the core logic of ChannelInbound/OutboundMessageHandler to ChannelHandlerUtil
- Add ChannelHandlerUtil.SingleInbound/OutboundMessageHandler and make ChannelInbound/OutboundMessageHandlerAdapter implement them.  This is a backward incompatible change because it forces all handler methods to be public (was protected previously)
- Fixes: #1119
This commit is contained in:
Trustin Lee 2013-03-05 12:52:05 +09:00
parent 307e6c47d8
commit a8a7c4f576
21 changed files with 277 additions and 205 deletions

View File

@ -28,7 +28,7 @@ public class HttpRequestEncoder extends HttpObjectEncoder<HttpRequest> {
private static final char SLASH = '/'; private static final char SLASH = '/';
@Override @Override
protected boolean acceptOutboundMessage(Object msg) throws Exception { public boolean acceptOutboundMessage(Object msg) throws Exception {
return super.acceptOutboundMessage(msg) && !(msg instanceof HttpResponse); return super.acceptOutboundMessage(msg) && !(msg instanceof HttpResponse);
} }

View File

@ -27,7 +27,7 @@ import static io.netty.handler.codec.http.HttpConstants.*;
public class HttpResponseEncoder extends HttpObjectEncoder<HttpResponse> { public class HttpResponseEncoder extends HttpObjectEncoder<HttpResponse> {
@Override @Override
protected boolean acceptOutboundMessage(Object msg) throws Exception { public boolean acceptOutboundMessage(Object msg) throws Exception {
return super.acceptOutboundMessage(msg) && !(msg instanceof HttpRequest); return super.acceptOutboundMessage(msg) && !(msg instanceof HttpRequest);
} }

View File

@ -41,7 +41,7 @@ import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> { public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> {
@Override @Override
protected void flush(ChannelHandlerContext ctx, I msg) throws Exception { public void flush(ChannelHandlerContext ctx, I msg) throws Exception {
try { try {
encode(ctx, msg, ctx.nextOutboundByteBuffer()); encode(ctx, msg, ctx.nextOutboundByteBuffer());
} catch (CodecException e) { } catch (CodecException e) {

View File

@ -45,7 +45,7 @@ import io.netty.channel.ChannelInboundMessageHandlerAdapter;
public abstract class MessageToMessageDecoder<I> extends ChannelInboundMessageHandlerAdapter<I> { public abstract class MessageToMessageDecoder<I> extends ChannelInboundMessageHandlerAdapter<I> {
@Override @Override
protected final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception { public final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception {
ctx.nextInboundMessageBuffer().unfoldAndAdd(decode(ctx, msg)); ctx.nextInboundMessageBuffer().unfoldAndAdd(decode(ctx, msg));
} }

View File

@ -43,7 +43,7 @@ import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> { public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> {
@Override @Override
protected final void flush(ChannelHandlerContext ctx, I msg) throws Exception { public final void flush(ChannelHandlerContext ctx, I msg) throws Exception {
try { try {
ctx.nextOutboundMessageBuffer().unfoldAndAdd(encode(ctx, msg)); ctx.nextOutboundMessageBuffer().unfoldAndAdd(encode(ctx, msg));
} catch (CodecException e) { } catch (CodecException e) {

View File

@ -60,7 +60,7 @@ public class ByteArrayEncoder extends ChannelOutboundMessageHandlerAdapter<byte[
} }
@Override @Override
protected void flush(ChannelHandlerContext ctx, byte[] msg) throws Exception { public void flush(ChannelHandlerContext ctx, byte[] msg) throws Exception {
if (msg.length == 0) { if (msg.length == 0) {
return; return;
} }

View File

@ -76,7 +76,7 @@ public class StringEncoder extends ChannelOutboundMessageHandlerAdapter<CharSequ
} }
@Override @Override
protected void flush(ChannelHandlerContext ctx, CharSequence msg) throws Exception { public void flush(ChannelHandlerContext ctx, CharSequence msg) throws Exception {
ByteBuf encoded = Unpooled.copiedBuffer(msg, charset); ByteBuf encoded = Unpooled.copiedBuffer(msg, charset);
switch (nextBufferType) { switch (nextBufferType) {

View File

@ -190,7 +190,7 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter<
} }
@Override @Override
protected void endMessageReceived(ChannelHandlerContext ctx) throws Exception { public void endMessageReceived(ChannelHandlerContext ctx) throws Exception {
ctx.flush(); ctx.flush();
} }

View File

@ -110,7 +110,7 @@ public class AutobahnServerHandler extends ChannelInboundMessageHandlerAdapter<O
} }
@Override @Override
protected void endMessageReceived(ChannelHandlerContext ctx) throws Exception { public void endMessageReceived(ChannelHandlerContext ctx) throws Exception {
if (handshaker != null) { if (handshaker != null) {
ctx.flush().addListener(ChannelFutureListener.CLOSE_ON_FAILURE); ctx.flush().addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} }

View File

@ -26,7 +26,7 @@ public class RxtxClientHandler extends ChannelInboundMessageHandlerAdapter<Strin
} }
@Override @Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
if ("OK".equals(msg)) { if ("OK".equals(msg)) {
System.out.println("Serial port responded to AT"); System.out.println("Serial port responded to AT");
} else { } else {

View File

@ -68,7 +68,7 @@ public class MsgEchoClientHandler extends
} }
@Override @Override
protected void messageReceived(final ChannelHandlerContext ctx, public void messageReceived(final ChannelHandlerContext ctx,
final UdtMessage message) throws Exception { final UdtMessage message) throws Exception {
final ByteBuf byteBuf = message.data(); final ByteBuf byteBuf = message.data();
meter.mark(byteBuf.readableBytes()); meter.mark(byteBuf.readableBytes());

View File

@ -47,7 +47,7 @@ public class MsgEchoServerHandler extends
} }
@Override @Override
protected void messageReceived(final ChannelHandlerContext ctx, public void messageReceived(final ChannelHandlerContext ctx,
final UdtMessage message) throws Exception { final UdtMessage message) throws Exception {
final MessageBuf<Object> out = ctx.nextOutboundMessageBuffer(); final MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
out.add(message); out.add(message);

View File

@ -68,7 +68,7 @@ public class MsgEchoPeerHandler extends
} }
@Override @Override
protected void messageReceived(final ChannelHandlerContext ctx, public void messageReceived(final ChannelHandlerContext ctx,
final UdtMessage message) throws Exception { final UdtMessage message) throws Exception {
final ByteBuf byteBuf = message.data(); final ByteBuf byteBuf = message.data();
meter.mark(byteBuf.readableBytes()); meter.mark(byteBuf.readableBytes());

View File

@ -160,7 +160,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
} }
@Override @Override
protected void messageReceived(final ChannelHandlerContext ctx, String msg) throws Exception { public void messageReceived(final ChannelHandlerContext ctx, String msg) throws Exception {
if ("StartTlsResponse".equals(msg)) { if ("StartTlsResponse".equals(msg)) {
ctx.pipeline().addAfter("logger", "ssl", sslHandler); ctx.pipeline().addAfter("logger", "ssl", sslHandler);
handshakeFuture = sslHandler.handshake(); handshakeFuture = sslHandler.handshake();
@ -202,7 +202,7 @@ public class SocketStartTlsTest extends AbstractSocketTest {
} }
@Override @Override
protected void messageReceived(final ChannelHandlerContext ctx, String msg) throws Exception { public void messageReceived(final ChannelHandlerContext ctx, String msg) throws Exception {
if ("StartTlsRequest".equals(msg)) { if ("StartTlsRequest".equals(msg)) {
ctx.pipeline().addAfter("logger", "ssl", sslHandler); ctx.pipeline().addAfter("logger", "ssl", sslHandler);
ctx.write("StartTlsResponse\n"); ctx.write("StartTlsResponse\n");

View File

@ -40,7 +40,7 @@ public class SctpInboundByteStreamHandler extends ChannelInboundMessageHandlerAd
} }
@Override @Override
protected final boolean acceptInboundMessage(Object msg) throws Exception { public final boolean acceptInboundMessage(Object msg) throws Exception {
if (super.acceptInboundMessage(msg)) { if (super.acceptInboundMessage(msg)) {
return acceptInboundMessage((SctpMessage) msg); return acceptInboundMessage((SctpMessage) msg);
} }
@ -52,7 +52,7 @@ public class SctpInboundByteStreamHandler extends ChannelInboundMessageHandlerAd
} }
@Override @Override
protected void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception { public void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception {
if (!msg.isComplete()) { if (!msg.isComplete()) {
throw new CodecException(String.format("Received SctpMessage is not complete, please add %s in the " + throw new CodecException(String.format("Received SctpMessage is not complete, please add %s in the " +
"pipeline before this handler", SctpMessageCompletionHandler.class.getSimpleName())); "pipeline before this handler", SctpMessageCompletionHandler.class.getSimpleName()));

View File

@ -36,13 +36,13 @@ public class SctpMessageCompletionHandler extends ChannelInboundMessageHandlerAd
private boolean assembled; private boolean assembled;
@Override @Override
protected boolean beginMessageReceived(ChannelHandlerContext ctx) throws Exception { public boolean beginMessageReceived(ChannelHandlerContext ctx) throws Exception {
assembled = false; assembled = false;
return super.beginMessageReceived(ctx); return super.beginMessageReceived(ctx);
} }
@Override @Override
protected void endMessageReceived(ChannelHandlerContext ctx) throws Exception { public void endMessageReceived(ChannelHandlerContext ctx) throws Exception {
if (assembled) { if (assembled) {
assembled = false; assembled = false;
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
@ -51,7 +51,7 @@ public class SctpMessageCompletionHandler extends ChannelInboundMessageHandlerAd
} }
@Override @Override
protected void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception { public void messageReceived(ChannelHandlerContext ctx, SctpMessage msg) throws Exception {
final ByteBuf byteBuf = msg.data(); final ByteBuf byteBuf = msg.data();
final int protocolIdentifier = msg.protocolIdentifier(); final int protocolIdentifier = msg.protocolIdentifier();

View File

@ -75,7 +75,7 @@ public class EchoMessageHandler extends
} }
@Override @Override
protected void messageReceived(final ChannelHandlerContext ctx, final UdtMessage message) throws Exception { public void messageReceived(final ChannelHandlerContext ctx, final UdtMessage message) throws Exception {
final ByteBuf byteBuf = message.data(); final ByteBuf byteBuf = message.data();

View File

@ -0,0 +1,227 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.buffer.BufUtil;
import io.netty.buffer.MessageBuf;
import io.netty.util.Signal;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
public final class ChannelHandlerUtil {
public static final Signal ABORT = new Signal(ChannelHandlerUtil.class.getName() + ".ABORT");
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelHandlerUtil.class);
public static <T> void handleInboundBufferUpdated(
ChannelHandlerContext ctx, SingleInboundMessageHandler<T> handler) throws Exception {
MessageBuf<Object> in = ctx.inboundMessageBuffer();
if (in.isEmpty() || !handler.beginMessageReceived(ctx)) {
return;
}
MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
int oldOutSize = out.size();
try {
for (;;) {
Object msg = in.poll();
if (msg == null) {
break;
}
if (!handler.acceptInboundMessage(msg)) {
out.add(msg);
continue;
}
@SuppressWarnings("unchecked")
T imsg = (T) msg;
try {
handler.messageReceived(ctx, imsg);
} finally {
BufUtil.release(imsg);
}
}
} catch (Signal abort) {
abort.expect(ABORT);
} finally {
if (oldOutSize != out.size()) {
ctx.fireInboundBufferUpdated();
}
handler.endMessageReceived(ctx);
}
}
public static <T> void handleFlush(
ChannelHandlerContext ctx, ChannelPromise promise,
SingleOutboundMessageHandler<T> handler) throws Exception {
handleFlush(ctx, promise, true, handler);
}
public static <T> void handleFlush(
ChannelHandlerContext ctx, ChannelPromise promise, boolean closeOnFailedFlush,
SingleOutboundMessageHandler<T> handler) throws Exception {
MessageBuf<Object> in = ctx.outboundMessageBuffer();
MessageBuf<Object> out = null;
final int inSize = in.size();
if (inSize == 0) {
ctx.flush(promise);
return;
}
int processed = 0;
try {
handler.beginFlush(ctx);
for (;;) {
Object msg = in.poll();
if (msg == null) {
break;
}
if (!handler.acceptOutboundMessage(msg)) {
if (out == null) {
out = ctx.nextOutboundMessageBuffer();
}
out.add(msg);
processed ++;
continue;
}
@SuppressWarnings("unchecked")
T imsg = (T) msg;
try {
handler.flush(ctx, imsg);
processed ++;
} finally {
BufUtil.release(imsg);
}
}
} catch (Throwable t) {
PartialFlushException pfe;
String msg = processed + " out of " + inSize + " message(s) flushed";
if (t instanceof Signal) {
Signal abort = (Signal) t;
abort.expect(ABORT);
pfe = new PartialFlushException("aborted: " + msg);
} else {
pfe = new PartialFlushException(msg, t);
}
fail(ctx, promise, closeOnFailedFlush, pfe);
}
try {
handler.endFlush(ctx);
} catch (Throwable t) {
if (promise.isDone()) {
logger.warn("endFlush() raised a masked exception due to failed flush().", t);
} else {
fail(ctx, promise, closeOnFailedFlush, t);
}
}
if (!promise.isDone()) {
ctx.flush(promise);
}
}
private static void fail(
ChannelHandlerContext ctx, ChannelPromise promise, boolean closeOnFailedFlush, Throwable cause) {
promise.setFailure(cause);
if (closeOnFailedFlush) {
ctx.close();
}
}
private ChannelHandlerUtil() { }
public interface SingleInboundMessageHandler<T> {
/**
* Returns {@code true} if and only if the specified message can be handled by this handler.
*
* @param msg the message
*/
boolean acceptInboundMessage(Object msg) throws Exception;
/**
* Will get notified once {@link #inboundBufferUpdated(ChannelHandlerContext)} was called.
*
* If this method returns {@code false} no further processing of the {@link MessageBuf}
* will be done until the next call of {@link #inboundBufferUpdated(ChannelHandlerContext)}.
*
* This will return {@code true} by default, and may get overriden by sub-classes for
* special handling.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
*/
boolean beginMessageReceived(ChannelHandlerContext ctx) throws Exception;
/**
* Is called once a message was received.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
* @param msg the message to handle
*/
void messageReceived(ChannelHandlerContext ctx, T msg) throws Exception;
/**
* Is called when {@link #messageReceived(ChannelHandlerContext, Object)} returns.
*
* Super-classes may-override this for special handling.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
*/
void endMessageReceived(ChannelHandlerContext ctx) throws Exception;
}
public interface SingleOutboundMessageHandler<T> {
/**
* Returns {@code true} if and only if the specified message can be handled by this handler.
*
* @param msg the message
*/
boolean acceptOutboundMessage(Object msg) throws Exception;
/**
* Will get notified once {@link #flush(ChannelHandlerContext, ChannelPromise)} was called.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
*/
void beginFlush(ChannelHandlerContext ctx) throws Exception;
/**
* Is called once a message is being flushed.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
* @param msg the message to handle
*/
void flush(ChannelHandlerContext ctx, T msg) throws Exception;
/**
* Is called when {@link #flush(ChannelHandlerContext, ChannelPromise)} returns.
*
* Super-classes may-override this for special handling.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
*/
void endFlush(ChannelHandlerContext ctx) throws Exception;
}
}

View File

@ -15,9 +15,9 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.BufUtil;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerUtil.SingleInboundMessageHandler;
import io.netty.util.Signal; import io.netty.util.Signal;
import io.netty.util.internal.TypeParameterMatcher; import io.netty.util.internal.TypeParameterMatcher;
@ -44,12 +44,13 @@ import io.netty.util.internal.TypeParameterMatcher;
* @param <I> The type of the messages to handle * @param <I> The type of the messages to handle
*/ */
public abstract class ChannelInboundMessageHandlerAdapter<I> public abstract class ChannelInboundMessageHandlerAdapter<I>
extends ChannelStateHandlerAdapter implements ChannelInboundMessageHandler<I> { extends ChannelStateHandlerAdapter
implements ChannelInboundMessageHandler<I>, SingleInboundMessageHandler<I> {
/** /**
* Thrown by {@link #messageReceived(ChannelHandlerContext, Object)} to abort message processing. * Thrown by {@link #messageReceived(ChannelHandlerContext, Object)} to abort message processing.
*/ */
protected static final Signal ABORT = new Signal(ChannelInboundMessageHandlerAdapter.class.getName() + ".ABORT"); protected static final Signal ABORT = ChannelHandlerUtil.ABORT;
private final TypeParameterMatcher msgMatcher; private final TypeParameterMatcher msgMatcher;
@ -69,86 +70,21 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
@Override @Override
public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
MessageBuf<Object> in = ctx.inboundMessageBuffer(); ChannelHandlerUtil.handleInboundBufferUpdated(ctx, this);
if (in.isEmpty() || !beginMessageReceived(ctx)) {
return;
}
MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
int oldOutSize = out.size();
try {
for (;;) {
Object msg = in.poll();
if (msg == null) {
break;
}
if (!acceptInboundMessage(msg)) {
out.add(msg);
continue;
}
@SuppressWarnings("unchecked")
I imsg = (I) msg;
try {
messageReceived(ctx, imsg);
} finally {
BufUtil.release(imsg);
}
}
} catch (Signal abort) {
abort.expect(ABORT);
} finally {
if (oldOutSize != out.size()) {
ctx.fireInboundBufferUpdated();
}
endMessageReceived(ctx);
}
} }
/** @Override
* Returns {@code true} if and only if the specified message can be handled by this handler. public boolean acceptInboundMessage(Object msg) throws Exception {
*
* @param msg the message
*/
protected boolean acceptInboundMessage(Object msg) throws Exception {
return msgMatcher.match(msg); return msgMatcher.match(msg);
} }
/** @Override
* Will get notified once {@link #inboundBufferUpdated(ChannelHandlerContext)} was called. public boolean beginMessageReceived(ChannelHandlerContext ctx) throws Exception {
*
* If this method returns {@code false} no further processing of the {@link MessageBuf}
* will be done until the next call of {@link #inboundBufferUpdated(ChannelHandlerContext)}.
*
* This will return {@code true} by default, and may get overriden by sub-classes for
* special handling.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
*/
protected boolean beginMessageReceived(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception {
return true; return true;
} }
/** @Override
* Is called once a message was received. public void endMessageReceived(ChannelHandlerContext ctx) throws Exception {
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
* @param msg the message to handle
*/
protected abstract void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception;
/**
* Is called when {@link #messageReceived(ChannelHandlerContext, Object)} returns.
*
* Super-classes may-override this for special handling.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
*/
protected void endMessageReceived(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception {
// NOOP // NOOP
} }
} }

View File

@ -15,12 +15,11 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.BufUtil;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerUtil.SingleOutboundMessageHandler;
import io.netty.util.Signal; import io.netty.util.Signal;
import io.netty.util.internal.TypeParameterMatcher; import io.netty.util.internal.TypeParameterMatcher;
import io.netty.util.internal.logging.InternalLoggerFactory;
/** /**
* Abstract base class which handles messages of a specific type. * Abstract base class which handles messages of a specific type.
@ -28,12 +27,13 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
* @param <I> The type of the messages to handle * @param <I> The type of the messages to handle
*/ */
public abstract class ChannelOutboundMessageHandlerAdapter<I> public abstract class ChannelOutboundMessageHandlerAdapter<I>
extends ChannelOperationHandlerAdapter implements ChannelOutboundMessageHandler<I> { extends ChannelOperationHandlerAdapter
implements ChannelOutboundMessageHandler<I>, SingleOutboundMessageHandler<I> {
/** /**
* Thrown by {@link #flush(ChannelHandlerContext, Object)} to abort message processing. * Thrown by {@link #flush(ChannelHandlerContext, Object)} to abort message processing.
*/ */
protected static final Signal ABORT = new Signal(ChannelOutboundMessageHandlerAdapter.class.getName() + ".ABORT"); protected static final Signal ABORT = ChannelHandlerUtil.ABORT;
private final TypeParameterMatcher msgMatcher; private final TypeParameterMatcher msgMatcher;
private boolean closeOnFailedFlush = true; private boolean closeOnFailedFlush = true;
@ -60,110 +60,19 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
ctx.outboundMessageBuffer().release(); ctx.outboundMessageBuffer().release();
} }
/** @Override
* Returns {@code true} if and only if the specified message can be handled by this handler. public final void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
* ChannelHandlerUtil.handleFlush(ctx, promise, isCloseOnFailedFlush(), this);
* @param msg the message }
*/
protected boolean acceptOutboundMessage(Object msg) throws Exception { @Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
return msgMatcher.match(msg); return msgMatcher.match(msg);
} }
@Override @Override
public final void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void beginFlush(ChannelHandlerContext ctx) throws Exception { }
MessageBuf<Object> in = ctx.outboundMessageBuffer();
MessageBuf<Object> out = null;
final int inSize = in.size(); @Override
if (inSize == 0) { public void endFlush(ChannelHandlerContext ctx) throws Exception { }
ctx.flush(promise);
return;
}
int processed = 0;
try {
beginFlush(ctx);
for (;;) {
Object msg = in.poll();
if (msg == null) {
break;
}
if (!acceptOutboundMessage(msg)) {
if (out == null) {
out = ctx.nextOutboundMessageBuffer();
}
out.add(msg);
processed ++;
continue;
}
@SuppressWarnings("unchecked")
I imsg = (I) msg;
try {
flush(ctx, imsg);
processed ++;
} finally {
BufUtil.release(imsg);
}
}
} catch (Throwable t) {
PartialFlushException pfe;
String msg = processed + " out of " + inSize + " message(s) flushed";
if (t instanceof Signal) {
Signal abort = (Signal) t;
abort.expect(ABORT);
pfe = new PartialFlushException("aborted by " + getClass().getSimpleName() + ": " + msg);
} else {
pfe = new PartialFlushException(msg, t);
}
fail(ctx, promise, pfe);
}
try {
endFlush(ctx);
} catch (Throwable t) {
if (promise.isDone()) {
InternalLoggerFactory.getInstance(getClass()).warn(
"endFlush() raised a masked exception due to failed flush().", t);
} else {
fail(ctx, promise, t);
}
}
if (!promise.isDone()) {
ctx.flush(promise);
}
}
private void fail(ChannelHandlerContext ctx, ChannelPromise promise, Throwable cause) {
promise.setFailure(cause);
if (isCloseOnFailedFlush()) {
ctx.close();
}
}
/**
* Will get notified once {@link #flush(ChannelHandlerContext, ChannelPromise)} was called.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
*/
protected void beginFlush(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception { }
/**
* Is called once a message is being flushed.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
* @param msg the message to handle
*/
protected abstract void flush(ChannelHandlerContext ctx, I msg) throws Exception;
/**
* Is called when {@link #flush(ChannelHandlerContext, ChannelPromise)} returns.
*
* Super-classes may-override this for special handling.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
*/
protected void endFlush(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception { }
} }

View File

@ -39,12 +39,12 @@ public class DefaultChannelPipelineTest {
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
pipeline.addLast(new ChannelInboundMessageHandlerAdapter<Object>() { pipeline.addLast(new ChannelInboundMessageHandlerAdapter<Object>() {
@Override @Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
forwarded.set(ctx.nextInboundMessageBuffer().add(msg)); forwarded.set(ctx.nextInboundMessageBuffer().add(msg));
} }
@Override @Override
protected void endMessageReceived(ChannelHandlerContext ctx) throws Exception { public void endMessageReceived(ChannelHandlerContext ctx) throws Exception {
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
} }
}); });
@ -178,13 +178,13 @@ public class DefaultChannelPipelineTest {
boolean called; boolean called;
@Override @Override
protected boolean acceptInboundMessage(Object msg) throws Exception { public boolean acceptInboundMessage(Object msg) throws Exception {
called = true; called = true;
return super.acceptInboundMessage(msg); return super.acceptInboundMessage(msg);
} }
@Override @Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
fail(); fail();
} }
} }