Fix a race condition where flush() can be triggered before write()
.. when a handler overrides write() but not flush().
This commit is contained in:
parent
94d6e44bba
commit
3e8a1ed611
@ -157,10 +157,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
"write", ChannelHandlerContext.class,
|
"write", ChannelHandlerContext.class,
|
||||||
Object.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) {
|
Object.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) {
|
||||||
flags |= MASK_WRITE;
|
flags |= MASK_WRITE;
|
||||||
}
|
|
||||||
if (handlerType.getMethod(
|
// flush() is skipped only when write() is also skipped to avoid the situation where
|
||||||
"flush", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
|
// flush() is handled by the event loop before write() in staged execution.
|
||||||
flags |= MASK_FLUSH;
|
if (handlerType.getMethod(
|
||||||
|
"flush", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
|
||||||
|
flags |= MASK_FLUSH;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// Should never reach here.
|
// Should never reach here.
|
||||||
|
@ -87,7 +87,7 @@ public class LocalTransportThreadModelTest {
|
|||||||
EventLoopGroup e2 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e2"));
|
EventLoopGroup e2 = new DefaultEventLoopGroup(4, new DefaultThreadFactory("e2"));
|
||||||
ThreadNameAuditor h1 = new ThreadNameAuditor();
|
ThreadNameAuditor h1 = new ThreadNameAuditor();
|
||||||
ThreadNameAuditor h2 = new ThreadNameAuditor();
|
ThreadNameAuditor h2 = new ThreadNameAuditor();
|
||||||
ThreadNameAuditor h3 = new ThreadNameAuditor();
|
ThreadNameAuditor h3 = new ThreadNameAuditor(true);
|
||||||
|
|
||||||
Channel ch = new LocalChannel(l.next());
|
Channel ch = new LocalChannel(l.next());
|
||||||
// With no EventExecutor specified, h1 will be always invoked by EventLoop 'l'.
|
// With no EventExecutor specified, h1 will be always invoked by EventLoop 'l'.
|
||||||
@ -362,6 +362,15 @@ public class LocalTransportThreadModelTest {
|
|||||||
private final Queue<String> inboundThreadNames = new ConcurrentLinkedQueue<String>();
|
private final Queue<String> inboundThreadNames = new ConcurrentLinkedQueue<String>();
|
||||||
private final Queue<String> outboundThreadNames = new ConcurrentLinkedQueue<String>();
|
private final Queue<String> outboundThreadNames = new ConcurrentLinkedQueue<String>();
|
||||||
private final Queue<String> removalThreadNames = new ConcurrentLinkedQueue<String>();
|
private final Queue<String> removalThreadNames = new ConcurrentLinkedQueue<String>();
|
||||||
|
private final boolean discard;
|
||||||
|
|
||||||
|
ThreadNameAuditor() {
|
||||||
|
this(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
ThreadNameAuditor(boolean discard) {
|
||||||
|
this.discard = discard;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||||
@ -371,7 +380,9 @@ public class LocalTransportThreadModelTest {
|
|||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
inboundThreadNames.add(Thread.currentThread().getName());
|
inboundThreadNames.add(Thread.currentThread().getName());
|
||||||
ctx.fireChannelRead(msg);
|
if (!discard) {
|
||||||
|
ctx.fireChannelRead(msg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user