diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConditionalWritabilityTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConditionalWritabilityTest.java new file mode 100644 index 0000000000..d5265a1a4c --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConditionalWritabilityTest.java @@ -0,0 +1,124 @@ +/* + * Copyright 2018 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.util.ReferenceCountUtil; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; + +public class SocketConditionalWritabilityTest extends AbstractSocketTest { + @Test(timeout = 30000) + public void testConditionalWritability() throws Throwable { + run(); + } + + public void testConditionalWritability(ServerBootstrap sb, Bootstrap cb) throws Throwable { + Channel serverChannel = null; + Channel clientChannel = null; + try { + final int expectedBytes = 100 * 1024 * 1024; + final int maxWriteChunkSize = 16 * 1024; + final CountDownLatch latch = new CountDownLatch(1); + sb.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 16 * 1024)); + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(new ChannelDuplexHandler() { + private int bytesWritten; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ReferenceCountUtil.release(msg); + writeRemainingBytes(ctx); + } + + @Override + public void flush(ChannelHandlerContext ctx) { + if (ctx.channel().isWritable()) { + writeRemainingBytes(ctx); + } else { + ctx.flush(); + } + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) { + if (ctx.channel().isWritable()) { + writeRemainingBytes(ctx); + } + ctx.fireChannelWritabilityChanged(); + } + + private void writeRemainingBytes(ChannelHandlerContext ctx) { + while (ctx.channel().isWritable() && bytesWritten < expectedBytes) { + int chunkSize = Math.min(expectedBytes - bytesWritten, maxWriteChunkSize); + bytesWritten += chunkSize; + ctx.write(ctx.alloc().buffer(chunkSize).writeZero(chunkSize)); + } + ctx.flush(); + } + }); + } + }); + + serverChannel = sb.bind().syncUninterruptibly().channel(); + + cb.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { + private int totalRead; + @Override + public void channelActive(ChannelHandlerContext ctx) { + ctx.writeAndFlush(ctx.alloc().buffer(1).writeByte(0)); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof ByteBuf) { + totalRead += ((ByteBuf) msg).readableBytes(); + if (totalRead == expectedBytes) { + latch.countDown(); + } + } + ReferenceCountUtil.release(msg); + } + }); + } + }); + clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel(); + latch.await(); + } finally { + if (serverChannel != null) { + serverChannel.close(); + } + if (clientChannel != null) { + clientChannel.close(); + } + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 758052db70..8d3dc5bbcb 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -507,14 +507,13 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } @Override - protected void flush0() { + protected final void flush0() { // Flush immediately only when there's no pending flush. // If there's a pending flush operation, event loop will call forceFlush() later, // and thus there's no need to call it now. - if (isFlagSet(Native.EPOLLOUT)) { - return; + if (!isFlagSet(Native.EPOLLOUT)) { + super.flush0(); } - super.flush0(); } /** diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index e077b2903b..0c0710f8fb 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -445,6 +445,12 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im } while (writeSpinCount > 0); if (writeSpinCount == 0) { + // It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use + // our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still + // writable (as far as we know). We will find out next time we attempt to write if the socket is writable + // and set the EPOLLOUT if necessary. + clearFlag(Native.EPOLLOUT); + // We used our writeSpin quantum, and should try to write again later. eventLoop().execute(flushTask); } else { diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketConditionalWritabilityTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketConditionalWritabilityTest.java new file mode 100644 index 0000000000..773e3d63c8 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketConditionalWritabilityTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2018 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketConditionalWritabilityTest; + +import java.util.List; + +public class EpollETSocketConditionalWritabilityTest extends SocketConditionalWritabilityTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketConditionalWritabilityTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketConditionalWritabilityTest.java new file mode 100644 index 0000000000..31cb81964d --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketConditionalWritabilityTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2018 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketConditionalWritabilityTest; + +import java.util.List; + +public class EpollLTSocketConditionalWritabilityTest extends SocketConditionalWritabilityTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } +} diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index df0cd6e7fa..fbdbc4b71a 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -498,6 +498,16 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan return allocHandle; } + @Override + protected final void flush0() { + // Flush immediately only when there's no pending flush. + // If there's a pending flush operation, event loop will call forceFlush() later, + // and thus there's no need to call it now. + if (!writeFilterEnabled) { + super.flush0(); + } + } + final void executeReadReadyRunnable(ChannelConfig config) { if (readReadyRunnablePending || !isActive() || shouldBreakReadReady(config)) { return; diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java index 50d2cf9725..ef5f48e820 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java @@ -283,6 +283,12 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel } while (writeSpinCount > 0); if (writeSpinCount == 0) { + // It is possible that we have set the write filter, woken up by KQUEUE because the socket is writable, and + // then use our write quantum. In this case we no longer want to set the write filter because the socket is + // still writable (as far as we know). We will find out next time we attempt to write if the socket is + // writable and set the write filter if necessary. + writeFilter(false); + // We used our writeSpin quantum, and should try to write again later. eventLoop().execute(flushTask); } else { diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueETSocketConditionalWritabilityTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueETSocketConditionalWritabilityTest.java new file mode 100644 index 0000000000..1c3da7485e --- /dev/null +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueETSocketConditionalWritabilityTest.java @@ -0,0 +1,30 @@ +/* + * Copyright 2018 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.kqueue; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketConditionalWritabilityTest; + +import java.util.List; + +public class KQueueETSocketConditionalWritabilityTest extends SocketConditionalWritabilityTest { + @Override + protected List> newFactories() { + return KQueueSocketTestPermutation.INSTANCE.socket(); + } +} diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 8570b3d936..5987dff2d7 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -46,7 +46,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + StringUtil.simpleClassName(FileRegion.class) + ')'; - private Runnable flushTask; + private final Runnable flushTask = new Runnable() { + @Override + public void run() { + flush(); + } + }; /** * Create a new instance @@ -266,16 +271,13 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { if (setOpWrite) { setOpWrite(); } else { + // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then + // use our write quantum. In this case we no longer want to set the write OP because the socket is still + // writable (as far as we know). We will find out next time we attempt to write if the socket is writable + // and set the write OP if necessary. + clearOpWrite(); + // Schedule flush again later so other tasks can be picked up in the meantime - Runnable flushTask = this.flushTask; - if (flushTask == null) { - flushTask = this.flushTask = new Runnable() { - @Override - public void run() { - flush(); - } - }; - } eventLoop().execute(flushTask); } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index f9ea574347..01467b9836 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -356,10 +356,9 @@ public abstract class AbstractNioChannel extends AbstractChannel { // Flush immediately only when there's no pending flush. // If there's a pending flush operation, event loop will call forceFlush() later, // and thus there's no need to call it now. - if (isFlushPending()) { - return; + if (!isFlushPending()) { + super.flush0(); } - super.flush0(); } @Override