FlushConsolidationHandler may suppress flushes by mistake (#9931)
Motivation: When `consolidatedWhenNoReadInProgress` is true, `channel.writeAndFlush (data) .addListener (f-> channel.writeAndFlush (data2))` Will cause data2 to never be flushed. Because the flush operation will synchronously execute the `channel.writeAndFlush (data2))` in the `listener`, and at this time, since the current execution thread is still an `eventloop`(`executor.inEventLoop()` was true), all handlers will be executed synchronously. At this time, since `nextScheduledFlush` is still not null, the `flush` operation of `data2` will be ignored in `FlushConsolidationHandler#scheduleFlush`. Modification: - reset `nextScheduledFlush` before `ctx.flush` - use `ObjectUtil` to polish code Result: Fixes https://github.com/netty/netty/issues/9923
This commit is contained in:
parent
06a5173e8d
commit
2be3d888e7
@ -23,6 +23,7 @@ import io.netty.channel.ChannelOutboundHandler;
|
|||||||
import io.netty.channel.ChannelOutboundInvoker;
|
import io.netty.channel.ChannelOutboundInvoker;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
|
import io.netty.util.internal.ObjectUtil;
|
||||||
|
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
@ -95,20 +96,17 @@ public class FlushConsolidationHandler extends ChannelDuplexHandler {
|
|||||||
* ongoing.
|
* ongoing.
|
||||||
*/
|
*/
|
||||||
public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
|
public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
|
||||||
if (explicitFlushAfterFlushes <= 0) {
|
this.explicitFlushAfterFlushes =
|
||||||
throw new IllegalArgumentException("explicitFlushAfterFlushes: "
|
ObjectUtil.checkPositive(explicitFlushAfterFlushes, "explicitFlushAfterFlushes");
|
||||||
+ explicitFlushAfterFlushes + " (expected: > 0)");
|
|
||||||
}
|
|
||||||
this.explicitFlushAfterFlushes = explicitFlushAfterFlushes;
|
|
||||||
this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
|
this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
|
||||||
flushTask = consolidateWhenNoReadInProgress ?
|
this.flushTask = consolidateWhenNoReadInProgress ?
|
||||||
new Runnable() {
|
new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
if (flushPendingCount > 0 && !readInProgress) {
|
if (flushPendingCount > 0 && !readInProgress) {
|
||||||
flushPendingCount = 0;
|
flushPendingCount = 0;
|
||||||
ctx.flush();
|
|
||||||
nextScheduledFlush = null;
|
nextScheduledFlush = null;
|
||||||
|
ctx.flush();
|
||||||
} // else we'll flush when the read completes
|
} // else we'll flush when the read completes
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,8 @@ import io.netty.channel.ChannelHandlerContext;
|
|||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
||||||
import io.netty.channel.embedded.EmbeddedChannel;
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
|
import io.netty.util.concurrent.Future;
|
||||||
|
import io.netty.util.concurrent.GenericFutureListener;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
@ -152,6 +154,26 @@ public class FlushConsolidationHandlerTest {
|
|||||||
assertFalse(channel.finish());
|
assertFalse(channel.finish());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* See https://github.com/netty/netty/issues/9923
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testResend() throws Exception {
|
||||||
|
final AtomicInteger flushCount = new AtomicInteger();
|
||||||
|
final EmbeddedChannel channel = newChannel(flushCount, true);
|
||||||
|
channel.writeAndFlush(1L).addListener(new GenericFutureListener<Future<? super Void>>() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(Future<? super Void> future) throws Exception {
|
||||||
|
channel.writeAndFlush(1L);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
channel.flushOutbound();
|
||||||
|
assertEquals(1L, channel.readOutbound());
|
||||||
|
assertEquals(1L, channel.readOutbound());
|
||||||
|
assertNull(channel.readOutbound());
|
||||||
|
assertFalse(channel.finish());
|
||||||
|
}
|
||||||
|
|
||||||
private static EmbeddedChannel newChannel(final AtomicInteger flushCount, boolean consolidateWhenNoReadInProgress) {
|
private static EmbeddedChannel newChannel(final AtomicInteger flushCount, boolean consolidateWhenNoReadInProgress) {
|
||||||
return new EmbeddedChannel(
|
return new EmbeddedChannel(
|
||||||
new ChannelOutboundHandlerAdapter() {
|
new ChannelOutboundHandlerAdapter() {
|
||||||
|
Loading…
Reference in New Issue
Block a user