Epoll flush/writabilityChange deadlock

Motivation:
b215794de3 recently introduced a change in behavior where writeSpinCount provided a limit for how many write operations were attempted per flush operation. However when the write quantum was meet the selector write flag was not cleared, and the channel unsafe flush0 method has an optimization which prematurely exits if the write flag is set. This may lead to no write progress being made under the following scenario:
- flush is called, but the socket can't accept all data, we set the write flag
- the selector wakes us up because the socket is writable, we write data and use the writeSpinCount quantum
- we then schedule a flush() on the EventLoop to execute later, however it the flush0 optimization prematurely exits because the write flag is still set

In this scenario the socket is still writable so the EventLoop may never notify us that the socket is writable, and therefore we may never attempt to flush data to the OS.

Modifications:
- When the writeSpinCount quantum is exceeded we should clear the selector write flag

Result:
Fixes https://github.com/netty/netty/issues/7729
This commit is contained in:
Scott Mitchell 2018-02-18 11:49:08 -08:00 committed by Norman Maurer
parent 1e5fafe446
commit ce241bd11e
10 changed files with 271 additions and 17 deletions

View File

@ -0,0 +1,124 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.util.ReferenceCountUtil;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
public class SocketConditionalWritabilityTest extends AbstractSocketTest {
@Test(timeout = 30000)
public void testConditionalWritability() throws Throwable {
run();
}
public void testConditionalWritability(ServerBootstrap sb, Bootstrap cb) throws Throwable {
Channel serverChannel = null;
Channel clientChannel = null;
try {
final int expectedBytes = 100 * 1024 * 1024;
final int maxWriteChunkSize = 16 * 1024;
final CountDownLatch latch = new CountDownLatch(1);
sb.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 16 * 1024));
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new ChannelDuplexHandler() {
private int bytesWritten;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
writeRemainingBytes(ctx);
}
@Override
public void flush(ChannelHandlerContext ctx) {
if (ctx.channel().isWritable()) {
writeRemainingBytes(ctx);
} else {
ctx.flush();
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
if (ctx.channel().isWritable()) {
writeRemainingBytes(ctx);
}
ctx.fireChannelWritabilityChanged();
}
private void writeRemainingBytes(ChannelHandlerContext ctx) {
while (ctx.channel().isWritable() && bytesWritten < expectedBytes) {
int chunkSize = Math.min(expectedBytes - bytesWritten, maxWriteChunkSize);
bytesWritten += chunkSize;
ctx.write(ctx.alloc().buffer(chunkSize).writeZero(chunkSize));
}
ctx.flush();
}
});
}
});
serverChannel = sb.bind().syncUninterruptibly().channel();
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
private int totalRead;
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(ctx.alloc().buffer(1).writeByte(0));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
totalRead += ((ByteBuf) msg).readableBytes();
if (totalRead == expectedBytes) {
latch.countDown();
}
}
ReferenceCountUtil.release(msg);
}
});
}
});
clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
latch.await();
} finally {
if (serverChannel != null) {
serverChannel.close();
}
if (clientChannel != null) {
clientChannel.close();
}
}
}
}

View File

@ -507,15 +507,14 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
}
@Override
protected void flush0() {
protected final void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (isFlagSet(Native.EPOLLOUT)) {
return;
}
if (!isFlagSet(Native.EPOLLOUT)) {
super.flush0();
}
}
/**
* Called once a EPOLLOUT event is ready to be processed

View File

@ -445,6 +445,12 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
} while (writeSpinCount > 0);
if (writeSpinCount == 0) {
// It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
// our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the EPOLLOUT if necessary.
clearFlag(Native.EPOLLOUT);
// We used our writeSpin quantum, and should try to write again later.
eventLoop().execute(flushTask);
} else {

View File

@ -0,0 +1,39 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketConditionalWritabilityTest;
import java.util.List;
public class EpollETSocketConditionalWritabilityTest extends SocketConditionalWritabilityTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
}
@Override
protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) {
super.configure(bootstrap, bootstrap2, allocator);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketConditionalWritabilityTest;
import java.util.List;
public class EpollLTSocketConditionalWritabilityTest extends SocketConditionalWritabilityTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
}
@Override
protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) {
super.configure(bootstrap, bootstrap2, allocator);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}
}

View File

@ -498,6 +498,16 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
return allocHandle;
}
@Override
protected final void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (!writeFilterEnabled) {
super.flush0();
}
}
final void executeReadReadyRunnable(ChannelConfig config) {
if (readReadyRunnablePending || !isActive() || shouldBreakReadReady(config)) {
return;

View File

@ -283,6 +283,12 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
} while (writeSpinCount > 0);
if (writeSpinCount == 0) {
// It is possible that we have set the write filter, woken up by KQUEUE because the socket is writable, and
// then use our write quantum. In this case we no longer want to set the write filter because the socket is
// still writable (as far as we know). We will find out next time we attempt to write if the socket is
// writable and set the write filter if necessary.
writeFilter(false);
// We used our writeSpin quantum, and should try to write again later.
eventLoop().execute(flushTask);
} else {

View File

@ -0,0 +1,30 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.kqueue;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketConditionalWritabilityTest;
import java.util.List;
public class KQueueETSocketConditionalWritabilityTest extends SocketConditionalWritabilityTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return KQueueSocketTestPermutation.INSTANCE.socket();
}
}

View File

@ -46,7 +46,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(FileRegion.class) + ')';
private Runnable flushTask;
private final Runnable flushTask = new Runnable() {
@Override
public void run() {
flush();
}
};
/**
* Create a new instance
@ -266,16 +271,13 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
if (setOpWrite) {
setOpWrite();
} else {
// It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
// use our write quantum. In this case we no longer want to set the write OP because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the write OP if necessary.
clearOpWrite();
// Schedule flush again later so other tasks can be picked up in the meantime
Runnable flushTask = this.flushTask;
if (flushTask == null) {
flushTask = this.flushTask = new Runnable() {
@Override
public void run() {
flush();
}
};
}
eventLoop().execute(flushTask);
}
}

View File

@ -356,11 +356,10 @@ public abstract class AbstractNioChannel extends AbstractChannel {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (isFlushPending()) {
return;
}
if (!isFlushPending()) {
super.flush0();
}
}
@Override
public final void forceFlush() {