[#559] Fix SocketSuspendTest.testSuspendAccept()

- Reimplemented the test
- Fixed various bugs related with read/accept suspension found while testing
  - defaultInterestOps of NioServerSocketChannel should be OP_ACCEPT
  - There's no need do deregister and re-register to suspend/resume accept()
  - Occational infinite loop with 100% CPU consumption in OioEventLoop, caused by OioSocketChannel
  - Even if read/accept is suspended, what's read or accepted should be notified to a user
This commit is contained in:
Trustin Lee 2012-08-28 15:55:51 +09:00
parent f3c940d208
commit e55a1f11b5
14 changed files with 273 additions and 247 deletions

View File

@ -0,0 +1,66 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.testsuite.transport.socket.SocketTestPermutation.Factory;
import io.netty.testsuite.util.TestUtils;
import io.netty.util.NetworkConstants;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.List;
import org.junit.Rule;
import org.junit.rules.TestName;
public abstract class AbstractServerSocketTest {
private static final List<Factory<ServerBootstrap>> COMBO = SocketTestPermutation.serverSocket();
@Rule
public final TestName testName = new TestName();
protected final InternalLogger logger = InternalLoggerFactory.getInstance(getClass());
protected volatile ServerBootstrap sb;
protected volatile InetSocketAddress addr;
protected void run() throws Throwable {
int i = 0;
for (Factory<ServerBootstrap> e: COMBO) {
sb = e.newInstance();
addr = new InetSocketAddress(
NetworkConstants.LOCALHOST, TestUtils.getFreePort());
sb.localAddress(addr);
logger.info(String.format(
"Running: %s %d of %d", testName.getMethodName(), ++ i, COMBO.size()));
try {
Method m = getClass().getDeclaredMethod(
testName.getMethodName(), ServerBootstrap.class);
m.invoke(this, sb);
} catch (InvocationTargetException ex) {
throw ex.getCause();
} finally {
sb.shutdown();
}
}
}
}

View File

@ -0,0 +1,111 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.util.NetworkConstants;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
import org.junit.Test;
public class ServerSocketSuspendTest extends AbstractServerSocketTest {
private static final int NUM_CHANNELS = 10;
private static final long TIMEOUT = 3000000000L;
@Test
public void testSuspendAndResumeAccept() throws Throwable {
run();
}
public void testSuspendAndResumeAccept(ServerBootstrap sb) throws Throwable {
AcceptedChannelCounter counter = new AcceptedChannelCounter(NUM_CHANNELS);
sb.option(ChannelOption.SO_BACKLOG, 1);
sb.childHandler(counter);
Channel sc = sb.bind().sync().channel();
sc.pipeline().firstContext().readable(false);
List<Socket> sockets = new ArrayList<Socket>();
try {
long startTime = System.nanoTime();
for (int i = 0; i < NUM_CHANNELS; i ++) {
sockets.add(new Socket(
NetworkConstants.LOCALHOST, ((InetSocketAddress) sc.localAddress()).getPort()));
}
sc.pipeline().firstContext().readable(true);
counter.latch.await();
long endTime = System.nanoTime();
Assert.assertTrue(endTime - startTime > TIMEOUT);
} finally {
for (Socket s: sockets) {
s.close();
}
}
try {
long startTime = System.nanoTime();
for (int i = 0; i < NUM_CHANNELS; i ++) {
sockets.add(new Socket(
NetworkConstants.LOCALHOST, ((InetSocketAddress) sc.localAddress()).getPort()));
}
long endTime = System.nanoTime();
Assert.assertTrue(endTime - startTime < TIMEOUT);
} finally {
for (Socket s: sockets) {
s.close();
}
}
}
@ChannelHandler.Sharable
private final class AcceptedChannelCounter extends ChannelInboundByteHandlerAdapter {
final CountDownLatch latch;
AcceptedChannelCounter(int nChannels) {
latch = new CountDownLatch(nChannels);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// Unused
}
}
}

View File

@ -1,128 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Ignore;
import org.junit.Test;
public class SocketSuspendTest extends AbstractSocketTest {
private static final Random random = new Random();
static final byte[] data = new byte[1048576];
static {
random.nextBytes(data);
}
@Ignore
@Test
public void testSuspendAccept() throws Throwable {
run();
}
public void testSuspendAccept(ServerBootstrap sb, Bootstrap cb) throws Throwable {
ServerHandler handler = new ServerHandler();
GroupHandler sh = new GroupHandler();
GroupHandler ch = new GroupHandler();
sb.handler(handler);
sb.childHandler(sh);
Channel sc = sb.bind().sync().channel();
cb.handler(ch);
cb.connect().sync();
Thread.sleep(1000);
Bootstrap cb2 = currentBootstrap.newInstance();
cb2.handler(ch);
cb2.remoteAddress(addr);
ChannelFuture cf = cb2.connect();
assertFalse(cf.await(2, TimeUnit.SECONDS));
sc.pipeline().context(handler).readable(true);
assertTrue(cf.await(2, TimeUnit.SECONDS));
sh.group.close().awaitUninterruptibly();
ch.group.close().awaitUninterruptibly();
sc.close().sync();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();
}
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
throw ch.exception.get();
}
if (sh.exception.get() != null) {
throw sh.exception.get();
}
if (ch.exception.get() != null) {
throw ch.exception.get();
}
}
private static class ServerHandler extends ChannelInboundMessageHandlerAdapter<SocketChannel> {
@Override
public void messageReceived(ChannelHandlerContext ctx, SocketChannel msg) throws Exception {
ctx.nextInboundMessageBuffer().add(msg);
ctx.readable(false);
}
}
@ChannelHandler.Sharable
private static class GroupHandler extends ChannelInboundByteHandlerAdapter {
final ChannelGroup group = new DefaultChannelGroup();
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
group.add(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
if (exception.compareAndSet(null, cause)) {
ctx.close();
}
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
in.clear();
}
}
}

View File

@ -41,34 +41,7 @@ final class SocketTestPermutation {
new ArrayList<Entry<Factory<ServerBootstrap>, Factory<Bootstrap>>>();
// Make the list of ServerBootstrap factories.
List<Factory<ServerBootstrap>> sbfs =
new ArrayList<Factory<ServerBootstrap>>();
sbfs.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().
group(new NioEventLoopGroup(), new NioEventLoopGroup()).
channel(new NioServerSocketChannel());
}
});
sbfs.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
AioEventLoopGroup parentGroup = new AioEventLoopGroup();
AioEventLoopGroup childGroup = new AioEventLoopGroup();
return new ServerBootstrap().
group(parentGroup, childGroup).
channel(new AioServerSocketChannel(parentGroup, childGroup));
}
});
sbfs.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().
group(new OioEventLoopGroup(), new OioEventLoopGroup()).
channel(new OioServerSocketChannel());
}
});
List<Factory<ServerBootstrap>> sbfs = serverSocket();
// Make the list of Bootstrap factories.
List<Factory<Bootstrap>> cbfs =
@ -170,6 +143,41 @@ final class SocketTestPermutation {
return list;
}
static List<Factory<ServerBootstrap>> serverSocket() {
List<Factory<ServerBootstrap>> list = new ArrayList<Factory<ServerBootstrap>>();
// Make the list of ServerBootstrap factories.
list.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().
group(new NioEventLoopGroup(), new NioEventLoopGroup()).
channel(new NioServerSocketChannel());
}
});
list.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
AioEventLoopGroup parentGroup = new AioEventLoopGroup();
AioEventLoopGroup childGroup = new AioEventLoopGroup();
return new ServerBootstrap().
group(parentGroup, childGroup).
channel(new AioServerSocketChannel(parentGroup, childGroup));
}
});
list.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().
group(new OioEventLoopGroup(), new OioEventLoopGroup()).
channel(new OioServerSocketChannel());
}
});
return list;
}
private SocketTestPermutation() {}
interface Factory<T> {

View File

@ -103,7 +103,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
AsynchronousServerSocketChannel ch = javaChannel();
ch.bind(localAddress);
ch.bind(localAddress, config.getBacklog());
doAccept();
}
@ -154,9 +154,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
// create the socket add it to the buffer and fire the event
channel.pipeline().inboundMessageBuffer().add(
new AioSocketChannel(channel, null, channel.childGroup, ch));
if (!channel.readSuspended.get()) {
channel.pipeline().fireInboundBufferUpdated();
}
channel.pipeline().fireInboundBufferUpdated();
}
@Override

View File

@ -334,9 +334,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} catch (Throwable t) {
if (read) {
read = false;
if (!channel.readSuspended.get()) {
pipeline.fireInboundBufferUpdated();
}
pipeline.fireInboundBufferUpdated();
}
if (!(t instanceof ClosedChannelException)) {
@ -351,9 +349,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
channel.readInProgress.set(false);
if (read) {
if (!channel.readSuspended.get()) {
pipeline.fireInboundBufferUpdated();
}
pipeline.fireInboundBufferUpdated();
}
if (closed && channel.isOpen()) {
channel.unsafe().close(channel.unsafe().voidFuture());

View File

@ -31,9 +31,11 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel {
}
@Override
protected abstract AbstractNioByteUnsafe newUnsafe();
protected NioByteUnsafe newUnsafe() {
return new NioByteUnsafe();
}
abstract class AbstractNioByteUnsafe extends AbstractNioUnsafe {
final class NioByteUnsafe extends AbstractNioUnsafe {
@Override
public void read() {
assert eventLoop().inEventLoop();

View File

@ -38,9 +38,26 @@ public abstract class AbstractNioChannel extends AbstractChannel {
InternalLoggerFactory.getInstance(AbstractNioChannel.class);
private final SelectableChannel ch;
private final int defaultInterestOps;
private final int readInterestOp;
private volatile SelectionKey selectionKey;
final Runnable suspendReadTask = new Runnable() {
@Override
public void run() {
selectionKey().interestOps(selectionKey().interestOps() & ~readInterestOp);
}
};
final Runnable resumeReadTask = new Runnable() {
@Override
public void run() {
selectionKey().interestOps(selectionKey().interestOps() | readInterestOp);
}
};
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
@ -50,10 +67,10 @@ public abstract class AbstractNioChannel extends AbstractChannel {
private ConnectException connectTimeoutException;
protected AbstractNioChannel(
Channel parent, Integer id, SelectableChannel ch, int defaultInterestOps) {
Channel parent, Integer id, SelectableChannel ch, int readInterestOp) {
super(parent, id);
this.ch = ch;
this.defaultInterestOps = defaultInterestOps;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
@ -107,6 +124,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
}
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
@Override
public java.nio.channels.Channel ch() {
return javaChannel();
@ -187,6 +205,26 @@ public abstract class AbstractNioChannel extends AbstractChannel {
connectFuture = null;
}
}
@Override
public void suspendRead() {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
suspendReadTask.run();
} else {
loop.execute(suspendReadTask);
}
}
@Override
public void resumeRead() {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
resumeReadTask.run();
} else {
loop.execute(resumeReadTask);
}
}
}
@Override
@ -204,7 +242,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
protected Runnable doRegister() throws Exception {
NioEventLoop loop = (NioEventLoop) eventLoop();
selectionKey = javaChannel().register(
loop.selector, isActive()? defaultInterestOps : 0, this);
loop.selector, isActive()? readInterestOp : 0, this);
return null;
}

View File

@ -25,14 +25,16 @@ import java.nio.channels.SelectableChannel;
abstract class AbstractNioMessageChannel extends AbstractNioChannel {
protected AbstractNioMessageChannel(
Channel parent, Integer id, SelectableChannel ch, int defaultInterestOps) {
super(parent, id, ch, defaultInterestOps);
Channel parent, Integer id, SelectableChannel ch, int readInterestOp) {
super(parent, id, ch, readInterestOp);
}
@Override
protected abstract AbstractNioMessageUnsafe newUnsafe();
protected NioMessageUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
abstract class AbstractNioMessageUnsafe extends AbstractNioUnsafe {
final class NioMessageUnsafe extends AbstractNioUnsafe {
@Override
public void read() {
assert eventLoop().inEventLoop();

View File

@ -457,22 +457,4 @@ public final class NioDatagramChannel
}
return future;
}
@Override
protected AbstractNioMessageUnsafe newUnsafe() {
return new NioDatagramChannelUnsafe();
}
private final class NioDatagramChannelUnsafe extends AbstractNioMessageUnsafe {
@Override
public void suspendRead() {
selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_READ);
}
@Override
public void resumeRead() {
selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_READ);
}
}
}

View File

@ -46,7 +46,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
private final ServerSocketChannelConfig config;
public NioServerSocketChannel() {
super(null, null, newSocket(), 0);
super(null, null, newSocket(), SelectionKey.OP_ACCEPT);
config = new DefaultServerSocketChannelConfig(javaChannel().socket());
}
@ -82,7 +82,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress);
javaChannel().socket().bind(localAddress, config.getBacklog());
SelectionKey selectionKey = selectionKey();
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT);
}
@ -128,26 +128,4 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
protected int doWriteMessages(MessageBuf<Object> buf, boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected AbstractNioMessageUnsafe newUnsafe() {
return new NioServerSocketUnsafe();
}
private final class NioServerSocketUnsafe extends AbstractNioMessageUnsafe {
@Override
public void suspendRead() {
selectionKey().cancel();
}
@Override
public void resumeRead() {
try {
doRegister();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

View File

@ -191,22 +191,4 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
return writtenBytes;
}
@Override
protected AbstractNioByteUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
private final class NioSocketChannelUnsafe extends AbstractNioByteUnsafe {
@Override
public void suspendRead() {
selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_READ);
}
@Override
public void resumeRead() {
selectionKey().interestOps(selectionKey().interestOps() | SelectionKey.OP_READ);
}
}
}

View File

@ -126,7 +126,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
socket.bind(localAddress);
socket.bind(localAddress, config.getBacklog());
}
@Override
@ -154,9 +154,6 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
s = socket.accept();
if (s != null) {
buf.add(new OioSocketChannel(this, null, s));
if (readSuspended) {
return 0;
}
return 1;
}
} catch (SocketTimeoutException e) {

View File

@ -172,13 +172,7 @@ public class OioSocketChannel extends AbstractOioByteChannel
}
try {
int read = buf.writeBytes(is, buf.writableBytes());
if (read > 0 && !readSuspended) {
return read;
} else {
// so the read bytes were 0 or the read was suspend
return 0;
}
return buf.writeBytes(is, buf.writableBytes());
} catch (SocketTimeoutException e) {
return 0;
}