Epoll and Kqueue shouldn't read by default (#8024)

Motivation:
Epoll and Kqueue channels have internal state which forces
a single read operation after channel construction. This
violates the Channel#read() interface which indicates that
data shouldn't be delivered until this method is called.
The behavior is also inconsistent with the NIO transport.

Modifications:
- Epoll and Kqueue shouldn't unconditionally read upon
initialization, and instead should rely upon Channel#read()
or auto_read.

Result:
Epoll and Kqueue are more consistent with NIO.
This commit is contained in:
Scott Mitchell 2018-06-15 01:28:50 -07:00 committed by Norman Maurer
parent c7c8e6a3ec
commit 12f6500a4f
11 changed files with 378 additions and 22 deletions

View File

@ -0,0 +1,184 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static io.netty.channel.ChannelOption.AUTO_READ;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class SocketDataReadInitialStateTest extends AbstractSocketTest {
@Test(timeout = 10000)
public void testAutoReadOffNoDataReadUntilReadCalled() throws Throwable {
run();
}
public void testAutoReadOffNoDataReadUntilReadCalled(ServerBootstrap sb, Bootstrap cb) throws Throwable {
Channel serverChannel = null;
Channel clientChannel = null;
final int sleepMs = 100;
try {
sb.option(AUTO_READ, false);
sb.childOption(AUTO_READ, false);
cb.option(AUTO_READ, false);
final CountDownLatch serverReadyLatch = new CountDownLatch(1);
final CountDownLatch acceptorReadLatch = new CountDownLatch(1);
final CountDownLatch serverReadLatch = new CountDownLatch(1);
final CountDownLatch clientReadLatch = new CountDownLatch(1);
final AtomicReference<Channel> serverConnectedChannelRef = new AtomicReference<Channel>();
sb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
acceptorReadLatch.countDown();
ctx.fireChannelRead(msg);
}
});
}
});
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
serverConnectedChannelRef.set(ch);
ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
ctx.writeAndFlush(msg.retainedDuplicate());
serverReadLatch.countDown();
}
});
serverReadyLatch.countDown();
}
});
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
clientReadLatch.countDown();
}
});
}
});
serverChannel = sb.bind().sync().channel();
clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
clientChannel.writeAndFlush(clientChannel.alloc().buffer().writeZero(1)).syncUninterruptibly();
// The acceptor shouldn't read any data until we call read() below, but give it some time to see if it will.
Thread.sleep(sleepMs);
assertEquals(1, acceptorReadLatch.getCount());
serverChannel.read();
serverReadyLatch.await();
Channel serverConnectedChannel = serverConnectedChannelRef.get();
assertNotNull(serverConnectedChannel);
// Allow some amount of time for the server peer to receive the message (which isn't expected to happen
// until we call read() below).
Thread.sleep(sleepMs);
assertEquals(1, serverReadLatch.getCount());
serverConnectedChannel.read();
serverReadLatch.await();
// Allow some amount of time for the client to read the echo.
Thread.sleep(sleepMs);
assertEquals(1, clientReadLatch.getCount());
clientChannel.read();
clientReadLatch.await();
} finally {
if (serverChannel != null) {
serverChannel.close().sync();
}
if (clientChannel != null) {
clientChannel.close().sync();
}
}
}
@Test(timeout = 10000)
public void testAutoReadOnDataReadImmediately() throws Throwable {
run();
}
public void testAutoReadOnDataReadImmediately(ServerBootstrap sb, Bootstrap cb) throws Throwable {
Channel serverChannel = null;
Channel clientChannel = null;
try {
sb.option(AUTO_READ, true);
sb.childOption(AUTO_READ, true);
cb.option(AUTO_READ, true);
final CountDownLatch serverReadLatch = new CountDownLatch(1);
final CountDownLatch clientReadLatch = new CountDownLatch(1);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
ctx.writeAndFlush(msg.retainedDuplicate());
serverReadLatch.countDown();
}
});
}
});
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
clientReadLatch.countDown();
}
});
}
});
serverChannel = sb.bind().sync().channel();
clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
clientChannel.writeAndFlush(clientChannel.alloc().buffer().writeZero(1)).syncUninterruptibly();
serverReadLatch.await();
clientReadLatch.await();
} finally {
if (serverChannel != null) {
serverChannel.close().sync();
}
if (clientChannel != null) {
clientChannel.close().sync();
}
}
}
}

View File

@ -60,7 +60,6 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractEpollChannel.class, "doClose()");
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private final int readFlag;
final LinuxSocket socket;
/**
* The future of the current connection attempt. If not null, subsequent
@ -79,15 +78,13 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
protected volatile boolean active;
AbstractEpollChannel(LinuxSocket fd, int flag) {
this(null, fd, flag, false);
AbstractEpollChannel(LinuxSocket fd) {
this(null, fd, false);
}
AbstractEpollChannel(Channel parent, LinuxSocket fd, int flag, boolean active) {
AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active) {
super(parent);
socket = checkNotNull(fd, "fd");
readFlag = flag;
flags |= flag;
this.active = active;
if (active) {
// Directly cache the remote and local addresses
@ -97,11 +94,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
}
}
AbstractEpollChannel(Channel parent, LinuxSocket fd, int flag, SocketAddress remote) {
AbstractEpollChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
super(parent);
socket = checkNotNull(fd, "fd");
readFlag = flag;
flags |= flag;
active = true;
// Directly cache the remote and local addresses
// See https://github.com/netty/netty/issues/2359
@ -228,7 +223,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
// We must set the read flag here as it is possible the user didn't read in the last read loop, the
// executeEpollInReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
// never get data after this.
setFlag(readFlag);
setFlag(Native.EPOLLIN);
// If EPOLL ET mode is enabled and auto read was toggled off on the last read loop then we may not be notified
// again if we didn't consume all the data. So we force a read operation here if there maybe more data.
@ -268,7 +263,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
} else {
// The EventLoop is not registered atm so just update the flags so the correct value
// will be used once the channel is registered
flags &= ~readFlag;
flags &= ~Native.EPOLLIN;
}
}
@ -535,7 +530,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
assert eventLoop().inEventLoop();
try {
readPending = false;
clearFlag(readFlag);
clearFlag(Native.EPOLLIN);
} catch (IOException e) {
// When this happens there is something completely wrong with either the filedescriptor or epoll,
// so fire the exception through the pipeline and close the Channel.

View File

@ -39,7 +39,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
}
AbstractEpollServerChannel(LinuxSocket fd, boolean active) {
super(null, fd, Native.EPOLLIN, active);
super(null, fd, active);
}
@Override

View File

@ -99,19 +99,19 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
}
AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
super(parent, fd, Native.EPOLLIN, true);
super(parent, fd, true);
// Add EPOLLRDHUP so we are notified once the remote peer close the connection.
flags |= Native.EPOLLRDHUP;
}
AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
super(parent, fd, Native.EPOLLIN, remote);
super(parent, fd, remote);
// Add EPOLLRDHUP so we are notified once the remote peer close the connection.
flags |= Native.EPOLLRDHUP;
}
protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
super(null, fd, Native.EPOLLIN, active);
super(null, fd, active);
// Add EPOLLRDHUP so we are notified once the remote peer close the connection.
flags |= Native.EPOLLRDHUP;
}

View File

@ -59,7 +59,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
private volatile boolean connected;
public EpollDatagramChannel() {
super(newSocketDgram(), Native.EPOLLIN);
super(newSocketDgram());
config = new EpollDatagramChannelConfig(this);
}
@ -68,7 +68,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
}
EpollDatagramChannel(LinuxSocket fd) {
super(null, fd, Native.EPOLLIN, true);
super(null, fd, true);
config = new EpollDatagramChannelConfig(this);
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketDataReadInitialStateTest;
import java.net.SocketAddress;
import java.util.List;
public class EpollDomainSocketDataReadInitialStateTest extends SocketDataReadInitialStateTest {
@Override
protected SocketAddress newSocketAddress() {
return EpollSocketTestPermutation.newSocketAddress();
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.domainSocket();
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketDataReadInitialStateTest;
import java.util.List;
public class EpollETSocketDataReadInitialStateTest extends SocketDataReadInitialStateTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
}
@Override
protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) {
super.configure(bootstrap, bootstrap2, allocator);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketDataReadInitialStateTest;
import java.util.List;
public class EpollLTSocketDataReadInitialStateTest extends SocketDataReadInitialStateTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
}
@Override
protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) {
super.configure(bootstrap, bootstrap2, allocator);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}
}

View File

@ -65,7 +65,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
private SocketAddress requestedRemoteAddress;
final BsdSocket socket;
private boolean readFilterEnabled = true;
private boolean readFilterEnabled;
private boolean writeFilterEnabled;
boolean readReadyRunnablePending;
boolean inputClosedSeenErrorOnRead;
@ -187,9 +187,6 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
evSet0(Native.EVFILT_SOCK, Native.EV_DELETE, 0);
((KQueueEventLoop) eventLoop()).remove(this);
// Set the filters back to the initial state in case this channel is registered with another event loop.
readFilterEnabled = true;
}
@Override

View File

@ -0,0 +1,36 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.kqueue;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketDataReadInitialStateTest;
import java.net.SocketAddress;
import java.util.List;
public class KQueueDomainSocketDataReadInitialStateTest extends SocketDataReadInitialStateTest {
@Override
protected SocketAddress newSocketAddress() {
return KQueueSocketTestPermutation.newSocketAddress();
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return KQueueSocketTestPermutation.INSTANCE.domainSocket();
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.kqueue;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketDataReadInitialStateTest;
import java.util.List;
public class KQueueETSocketDataReadInitialStateTest extends SocketDataReadInitialStateTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return KQueueSocketTestPermutation.INSTANCE.socket();
}
}