NIO read spin event loop spin when half closed (#7801)

Motivation:
AbstractNioByteChannel will detect that the remote end of the socket has
been closed and propagate a user event through the pipeline. However if
the user has auto read on, or calls read again, we may propagate the
same user events again. If the underlying transport continuously
notifies us that there is read activity this will happen in a spin loop
which consumes unnecessary CPU.

Modifications:
- AbstractNioByteChannel's unsafe read() should check if the input side
of the socket has been shutdown before processing the event. This is
consistent with EPOLL and KQUEUE transports.
- add unit test with @normanmaurer's help, and make transports consistent with respect to user events

Result:
No more read spin loop in NIO when the channel is half closed.
This commit is contained in:
Scott Mitchell 2018-03-28 11:02:57 -07:00 committed by Norman Maurer
parent b309271e49
commit ed0668384b
5 changed files with 97 additions and 6 deletions

View File

@ -40,10 +40,82 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class SocketHalfClosedTest extends AbstractSocketTest {
@Test(timeout = 10000)
public void testHalfClosureOnlyOneEventWhenAutoRead() throws Throwable {
run();
}
public void testHalfClosureOnlyOneEventWhenAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
Channel serverChannel = null;
try {
cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
.option(ChannelOption.AUTO_READ, true);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
((DuplexChannel) ctx).shutdownOutput();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
});
}
});
final AtomicInteger shutdownEventReceivedCounter = new AtomicInteger();
final AtomicInteger shutdownReadCompleteEventReceivedCounter = new AtomicInteger();
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) {
if (evt == ChannelInputShutdownEvent.INSTANCE) {
shutdownEventReceivedCounter.incrementAndGet();
} else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
shutdownReadCompleteEventReceivedCounter.incrementAndGet();
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.close();
}
}, 100, MILLISECONDS);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
});
}
});
serverChannel = sb.bind().sync().channel();
Channel clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
clientChannel.closeFuture().await();
assertEquals(1, shutdownEventReceivedCounter.get());
assertEquals(1, shutdownReadCompleteEventReceivedCounter.get());
} finally {
if (serverChannel != null) {
serverChannel.close().sync();
}
}
}
@Test
public void testAllDataReadAfterHalfClosure() throws Throwable {
run();

View File

@ -33,6 +33,7 @@ import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket;
import io.netty.channel.unix.UnixChannel;
@ -241,8 +242,8 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
}
private static boolean isAllowHalfClosure(ChannelConfig config) {
return config instanceof EpollSocketChannelConfig &&
((EpollSocketChannelConfig) config).isAllowHalfClosure();
return config instanceof SocketChannelConfig &&
((SocketChannelConfig) config).isAllowHalfClosure();
}
final void clearEpollIn() {

View File

@ -33,6 +33,7 @@ import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.UnixChannel;
import io.netty.util.ReferenceCountUtil;
@ -322,8 +323,8 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
}
private static boolean isAllowHalfClosure(ChannelConfig config) {
return config instanceof KQueueSocketChannelConfig &&
((KQueueSocketChannelConfig) config).isAllowHalfClosure();
return config instanceof SocketChannelConfig &&
((SocketChannelConfig) config).isAllowHalfClosure();
}
final void clearReadFilter() {

View File

@ -21,7 +21,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion;
@ -29,6 +28,7 @@ import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.internal.ChannelUtils;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
@ -54,6 +54,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
((AbstractNioUnsafe) unsafe()).flush0();
}
};
private boolean inputClosedSeenErrorOnRead;
/**
* Create a new instance
@ -84,17 +85,27 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
return METADATA;
}
final boolean shouldBreakReadReady(ChannelConfig config) {
return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
}
private static boolean isAllowHalfClosure(ChannelConfig config) {
return config instanceof SocketChannelConfig &&
((SocketChannelConfig) config).isAllowHalfClosure();
}
protected class NioByteUnsafe extends AbstractNioUnsafe {
private void closeOnRead(ChannelPipeline pipeline) {
if (!isInputShutdown0()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
if (isAllowHalfClosure(config())) {
shutdownInput();
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
} else {
inputClosedSeenErrorOnRead = true;
pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
}
}
@ -120,6 +131,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();

View File

@ -27,6 +27,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
@ -73,6 +74,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
} else {
unsafe().close(unsafe().voidPromise());
}
pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
}
}