diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java index d3cd13c098..cf1333fd82 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java @@ -85,32 +85,30 @@ public abstract class ChannelInboundMessageHandlerAdapter break; } + if (!acceptInboundMessage(msg)) { + out.add(msg); + unsupportedFound = true; + continue; + } + + if (unsupportedFound) { + // the last message were unsupported, but now we received one that is supported. + // So reset the flag and notify the next context + unsupportedFound = false; + ctx.fireInboundBufferUpdated(); + oldOutSize = out.size(); + } + + @SuppressWarnings("unchecked") + I imsg = (I) msg; try { - if (!acceptInboundMessage(msg)) { - out.add(msg); - unsupportedFound = true; - continue; - } - - if (unsupportedFound) { - // the last message were unsupported, but now we received one that is supported. - // So reset the flag and notify the next context - unsupportedFound = false; - ctx.fireInboundBufferUpdated(); - oldOutSize = out.size(); - } - - @SuppressWarnings("unchecked") - I imsg = (I) msg; - try { - messageReceived(ctx, imsg); - } finally { - freeInboundMessage(imsg); - } - } catch (Throwable t) { - exceptionCaught(ctx, t); + messageReceived(ctx, imsg); + } finally { + freeInboundMessage(imsg); } } + } catch (Throwable t) { + exceptionCaught(ctx, t); } finally { if (oldOutSize != out.size()) { ctx.fireInboundBufferUpdated(); @@ -140,7 +138,8 @@ public abstract class ChannelInboundMessageHandlerAdapter * * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to */ - protected boolean beginMessageReceived(ChannelHandlerContext ctx) throws Exception { + protected boolean beginMessageReceived( + @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception { return true; } @@ -149,19 +148,18 @@ public abstract class ChannelInboundMessageHandlerAdapter * * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to * @param msg the message to handle - * @throws Exception thrown when an error accour */ protected abstract void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception; /** - * Is called after all messages of the {@link MessageBuf} was consumed. + * Is called when {@link #messageReceived(ChannelHandlerContext, Object)} returns. * * Super-classes may-override this for special handling. * * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to - * @throws Exception thrown when an error accour */ - protected void endMessageReceived(ChannelHandlerContext ctx) throws Exception { + protected void endMessageReceived( + @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception { // NOOP } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java index ca701134c0..6af7b09654 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java @@ -18,6 +18,7 @@ package io.netty.channel; import io.netty.buffer.BufUtil; import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; +import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.TypeParameterMatcher; /** @@ -29,6 +30,7 @@ public abstract class ChannelOutboundMessageHandlerAdapter extends ChannelOperationHandlerAdapter implements ChannelOutboundMessageHandler { private final TypeParameterMatcher msgMatcher; + private boolean closeOnFailedFlush = true; protected ChannelOutboundMessageHandlerAdapter() { this(ChannelOutboundMessageHandlerAdapter.class, 0); @@ -41,6 +43,14 @@ public abstract class ChannelOutboundMessageHandlerAdapter msgMatcher = TypeParameterMatcher.find(this, parameterizedHandlerType, messageTypeParamIndex); } + protected final boolean isCloseOnFailedFlush() { + return closeOnFailedFlush; + } + + protected final void setCloseOnFailedFlush(boolean closeOnFailedFlush) { + this.closeOnFailedFlush = closeOnFailedFlush; + } + @Override public MessageBuf newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { return Unpooled.messageBuffer(); @@ -64,52 +74,87 @@ public abstract class ChannelOutboundMessageHandlerAdapter public final void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { MessageBuf in = ctx.outboundMessageBuffer(); MessageBuf out = null; - ChannelPromise nextPromise = promise; + + final int inSize = in.size(); + int processed = 0; + try { + beginFlush(ctx); for (;;) { Object msg = in.poll(); if (msg == null) { break; } - try { - if (!acceptOutboundMessage(msg)) { - if (out == null) { - out = ctx.nextOutboundMessageBuffer(); - } - out.add(msg); - continue; + if (!acceptOutboundMessage(msg)) { + if (out == null) { + out = ctx.nextOutboundMessageBuffer(); } + out.add(msg); + processed ++; + continue; + } - @SuppressWarnings("unchecked") - I imsg = (I) msg; - try { - flush(ctx, imsg); - } finally { - freeOutboundMessage(imsg); - } - } catch (Throwable t) { - if (!promise.isDone()) { - promise.setFailure(new PartialFlushException( - "faied to encode all messages associated with the future", t)); - nextPromise = ctx.newPromise(); - } + @SuppressWarnings("unchecked") + I imsg = (I) msg; + try { + flush(ctx, imsg); + processed ++; + } finally { + freeOutboundMessage(imsg); } } - } finally { - ctx.flush(nextPromise); + } catch (Throwable t) { + fail(ctx, promise, new PartialFlushException( + processed + " out of " + inSize + " message(s) flushed; " + in.size() + " left", t)); + } + + try { + endFlush(ctx); + } catch (Throwable t) { + if (promise.isDone()) { + InternalLoggerFactory.getInstance(getClass()).warn( + "endFlush() raised a masked exception due to failed flush().", t); + } else { + fail(ctx, promise, t); + } + return; + } + + ctx.flush(promise); + } + + private void fail(ChannelHandlerContext ctx, ChannelPromise promise, Throwable cause) { + promise.setFailure(cause); + if (isCloseOnFailedFlush()) { + ctx.close(); } } + /** + * Will get notified once {@link #flush(ChannelHandlerContext, ChannelPromise)} was called. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to + */ + protected void beginFlush(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception { } + /** * Is called once a message is being flushed. * * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to * @param msg the message to handle - * @throws Exception thrown when an error accour */ protected abstract void flush(ChannelHandlerContext ctx, I msg) throws Exception; + /** + * Is called when {@link #flush(ChannelHandlerContext, ChannelPromise)} returns. + * + * Super-classes may-override this for special handling. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to + */ + protected void endFlush(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx) throws Exception { } + /** * Is called after a message was processed via {@link #flush(ChannelHandlerContext, Object)} to free * up any resources that is held by the outbound message. You may want to override this if your implementation