Ensure ChannelInboundMessageHandlerAdapter.endMessageReceived() is always called after handling the inbound message queue. / Call fireInboundBufferUpdated() automatically if the next inbound message buffer was changed.
This commit is contained in:
parent
9319e3ebd0
commit
ac72c3512e
@ -69,30 +69,30 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean unsupportedFound = false;
|
|
||||||
|
|
||||||
try {
|
|
||||||
MessageBuf<I> in = ctx.inboundMessageBuffer();
|
MessageBuf<I> in = ctx.inboundMessageBuffer();
|
||||||
MessageBuf<Object> out = null;
|
MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
|
||||||
|
int oldOutSize = out.size();
|
||||||
|
try {
|
||||||
|
boolean unsupportedFound = false;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
Object msg = in.poll();
|
Object msg = in.poll();
|
||||||
if (msg == null) {
|
if (msg == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (!isSupported(msg)) {
|
if (!isSupported(msg)) {
|
||||||
if (out == null) {
|
|
||||||
out = ctx.nextInboundMessageBuffer();
|
|
||||||
}
|
|
||||||
out.add(msg);
|
out.add(msg);
|
||||||
unsupportedFound = true;
|
unsupportedFound = true;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (unsupportedFound) {
|
if (unsupportedFound) {
|
||||||
// the last message were unsupported, but now we received one that is supported.
|
// the last message were unsupported, but now we received one that is supported.
|
||||||
// So reset the flag and notify the next context
|
// So reset the flag and notify the next context
|
||||||
unsupportedFound = false;
|
unsupportedFound = false;
|
||||||
ctx.fireInboundBufferUpdated();
|
ctx.fireInboundBufferUpdated();
|
||||||
|
oldOutSize = out.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@ -107,13 +107,13 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (unsupportedFound) {
|
if (oldOutSize != out.size()) {
|
||||||
ctx.fireInboundBufferUpdated();
|
ctx.fireInboundBufferUpdated();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
endMessageReceived(ctx);
|
endMessageReceived(ctx);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns {@code true} if and only if the specified message can be handled by this handler.
|
* Returns {@code true} if and only if the specified message can be handled by this handler.
|
||||||
@ -132,8 +132,9 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
|
|||||||
*
|
*
|
||||||
* This will return {@code true} by default, and may get overriden by sub-classes for
|
* This will return {@code true} by default, and may get overriden by sub-classes for
|
||||||
* special handling.
|
* special handling.
|
||||||
|
*
|
||||||
|
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unused")
|
|
||||||
protected boolean beginMessageReceived(ChannelHandlerContext ctx) throws Exception {
|
protected boolean beginMessageReceived(ChannelHandlerContext ctx) throws Exception {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -155,7 +156,6 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
|
|||||||
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
|
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
|
||||||
* @throws Exception thrown when an error accour
|
* @throws Exception thrown when an error accour
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unused")
|
|
||||||
protected void endMessageReceived(ChannelHandlerContext ctx) throws Exception {
|
protected void endMessageReceived(ChannelHandlerContext ctx) throws Exception {
|
||||||
// NOOP
|
// NOOP
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user