From e55a1f11b54333697d0bf72047695bcb914c2333 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 28 Aug 2012 15:55:51 +0900 Subject: [PATCH] [#559] Fix SocketSuspendTest.testSuspendAccept() - Reimplemented the test - Fixed various bugs related with read/accept suspension found while testing - defaultInterestOps of NioServerSocketChannel should be OP_ACCEPT - There's no need do deregister and re-register to suspend/resume accept() - Occational infinite loop with 100% CPU consumption in OioEventLoop, caused by OioSocketChannel - Even if read/accept is suspended, what's read or accepted should be notified to a user --- .../socket/AbstractServerSocketTest.java | 66 +++++++++ .../socket/ServerSocketSuspendTest.java | 111 +++++++++++++++ .../transport/socket/SocketSuspendTest.java | 128 ------------------ .../socket/SocketTestPermutation.java | 64 +++++---- .../socket/aio/AioServerSocketChannel.java | 6 +- .../channel/socket/aio/AioSocketChannel.java | 8 +- .../socket/nio/AbstractNioByteChannel.java | 6 +- .../socket/nio/AbstractNioChannel.java | 46 ++++++- .../socket/nio/AbstractNioMessageChannel.java | 10 +- .../socket/nio/NioDatagramChannel.java | 18 --- .../socket/nio/NioServerSocketChannel.java | 26 +--- .../channel/socket/nio/NioSocketChannel.java | 18 --- .../socket/oio/OioServerSocketChannel.java | 5 +- .../channel/socket/oio/OioSocketChannel.java | 8 +- 14 files changed, 273 insertions(+), 247 deletions(-) create mode 100644 testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractServerSocketTest.java create mode 100644 testsuite/src/test/java/io/netty/testsuite/transport/socket/ServerSocketSuspendTest.java delete mode 100644 testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSuspendTest.java diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractServerSocketTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractServerSocketTest.java new file mode 100644 index 0000000000..5e3cc1db1b --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractServerSocketTest.java @@ -0,0 +1,66 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; +import io.netty.testsuite.transport.socket.SocketTestPermutation.Factory; +import io.netty.testsuite.util.TestUtils; +import io.netty.util.NetworkConstants; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.util.List; + +import org.junit.Rule; +import org.junit.rules.TestName; + +public abstract class AbstractServerSocketTest { + + private static final List> COMBO = SocketTestPermutation.serverSocket(); + + @Rule + public final TestName testName = new TestName(); + + protected final InternalLogger logger = InternalLoggerFactory.getInstance(getClass()); + + protected volatile ServerBootstrap sb; + protected volatile InetSocketAddress addr; + + protected void run() throws Throwable { + int i = 0; + for (Factory e: COMBO) { + sb = e.newInstance(); + addr = new InetSocketAddress( + NetworkConstants.LOCALHOST, TestUtils.getFreePort()); + sb.localAddress(addr); + + logger.info(String.format( + "Running: %s %d of %d", testName.getMethodName(), ++ i, COMBO.size())); + try { + Method m = getClass().getDeclaredMethod( + testName.getMethodName(), ServerBootstrap.class); + m.invoke(this, sb); + } catch (InvocationTargetException ex) { + throw ex.getCause(); + } finally { + sb.shutdown(); + } + } + } +} diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/ServerSocketSuspendTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/ServerSocketSuspendTest.java new file mode 100644 index 0000000000..60d6427b7e --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/ServerSocketSuspendTest.java @@ -0,0 +1,111 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundByteHandlerAdapter; +import io.netty.channel.ChannelOption; +import io.netty.util.NetworkConstants; + +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import junit.framework.Assert; + +import org.junit.Test; + +public class ServerSocketSuspendTest extends AbstractServerSocketTest { + + private static final int NUM_CHANNELS = 10; + private static final long TIMEOUT = 3000000000L; + + @Test + public void testSuspendAndResumeAccept() throws Throwable { + run(); + } + + public void testSuspendAndResumeAccept(ServerBootstrap sb) throws Throwable { + AcceptedChannelCounter counter = new AcceptedChannelCounter(NUM_CHANNELS); + + sb.option(ChannelOption.SO_BACKLOG, 1); + sb.childHandler(counter); + + Channel sc = sb.bind().sync().channel(); + sc.pipeline().firstContext().readable(false); + + List sockets = new ArrayList(); + + try { + long startTime = System.nanoTime(); + for (int i = 0; i < NUM_CHANNELS; i ++) { + sockets.add(new Socket( + NetworkConstants.LOCALHOST, ((InetSocketAddress) sc.localAddress()).getPort())); + } + + sc.pipeline().firstContext().readable(true); + counter.latch.await(); + + long endTime = System.nanoTime(); + Assert.assertTrue(endTime - startTime > TIMEOUT); + } finally { + for (Socket s: sockets) { + s.close(); + } + } + + try { + long startTime = System.nanoTime(); + for (int i = 0; i < NUM_CHANNELS; i ++) { + sockets.add(new Socket( + NetworkConstants.LOCALHOST, ((InetSocketAddress) sc.localAddress()).getPort())); + } + long endTime = System.nanoTime(); + + Assert.assertTrue(endTime - startTime < TIMEOUT); + } finally { + for (Socket s: sockets) { + s.close(); + } + } + } + + @ChannelHandler.Sharable + private final class AcceptedChannelCounter extends ChannelInboundByteHandlerAdapter { + + final CountDownLatch latch; + + AcceptedChannelCounter(int nChannels) { + latch = new CountDownLatch(nChannels); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + latch.countDown(); + } + + @Override + public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + // Unused + } + } +} diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSuspendTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSuspendTest.java deleted file mode 100644 index e39d9809f6..0000000000 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSuspendTest.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.testsuite.transport.socket; - -import static org.junit.Assert.*; -import io.netty.bootstrap.Bootstrap; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundByteHandlerAdapter; -import io.netty.channel.ChannelInboundMessageHandlerAdapter; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.group.DefaultChannelGroup; -import io.netty.channel.socket.SocketChannel; - -import java.io.IOException; -import java.util.Random; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import org.junit.Ignore; -import org.junit.Test; - -public class SocketSuspendTest extends AbstractSocketTest { - - private static final Random random = new Random(); - static final byte[] data = new byte[1048576]; - - static { - random.nextBytes(data); - } - - @Ignore - @Test - public void testSuspendAccept() throws Throwable { - run(); - } - - public void testSuspendAccept(ServerBootstrap sb, Bootstrap cb) throws Throwable { - ServerHandler handler = new ServerHandler(); - GroupHandler sh = new GroupHandler(); - GroupHandler ch = new GroupHandler(); - - sb.handler(handler); - sb.childHandler(sh); - Channel sc = sb.bind().sync().channel(); - - cb.handler(ch); - cb.connect().sync(); - Thread.sleep(1000); - - Bootstrap cb2 = currentBootstrap.newInstance(); - cb2.handler(ch); - - cb2.remoteAddress(addr); - - ChannelFuture cf = cb2.connect(); - assertFalse(cf.await(2, TimeUnit.SECONDS)); - sc.pipeline().context(handler).readable(true); - assertTrue(cf.await(2, TimeUnit.SECONDS)); - sh.group.close().awaitUninterruptibly(); - ch.group.close().awaitUninterruptibly(); - sc.close().sync(); - - if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { - throw sh.exception.get(); - } - if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { - throw ch.exception.get(); - } - if (sh.exception.get() != null) { - throw sh.exception.get(); - } - if (ch.exception.get() != null) { - throw ch.exception.get(); - } - } - private static class ServerHandler extends ChannelInboundMessageHandlerAdapter { - - @Override - public void messageReceived(ChannelHandlerContext ctx, SocketChannel msg) throws Exception { - ctx.nextInboundMessageBuffer().add(msg); - ctx.readable(false); - } - - } - - @ChannelHandler.Sharable - private static class GroupHandler extends ChannelInboundByteHandlerAdapter { - final ChannelGroup group = new DefaultChannelGroup(); - final AtomicReference exception = new AtomicReference(); - - @Override - public void channelActive(ChannelHandlerContext ctx) - throws Exception { - group.add(ctx.channel()); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, - Throwable cause) throws Exception { - if (exception.compareAndSet(null, cause)) { - ctx.close(); - } - } - - @Override - public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - in.clear(); - } - } -} diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java index 1891372eb9..fb6c3770b3 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketTestPermutation.java @@ -41,34 +41,7 @@ final class SocketTestPermutation { new ArrayList, Factory>>(); // Make the list of ServerBootstrap factories. - List> sbfs = - new ArrayList>(); - sbfs.add(new Factory() { - @Override - public ServerBootstrap newInstance() { - return new ServerBootstrap(). - group(new NioEventLoopGroup(), new NioEventLoopGroup()). - channel(new NioServerSocketChannel()); - } - }); - sbfs.add(new Factory() { - @Override - public ServerBootstrap newInstance() { - AioEventLoopGroup parentGroup = new AioEventLoopGroup(); - AioEventLoopGroup childGroup = new AioEventLoopGroup(); - return new ServerBootstrap(). - group(parentGroup, childGroup). - channel(new AioServerSocketChannel(parentGroup, childGroup)); - } - }); - sbfs.add(new Factory() { - @Override - public ServerBootstrap newInstance() { - return new ServerBootstrap(). - group(new OioEventLoopGroup(), new OioEventLoopGroup()). - channel(new OioServerSocketChannel()); - } - }); + List> sbfs = serverSocket(); // Make the list of Bootstrap factories. List> cbfs = @@ -170,6 +143,41 @@ final class SocketTestPermutation { return list; } + + static List> serverSocket() { + List> list = new ArrayList>(); + + // Make the list of ServerBootstrap factories. + list.add(new Factory() { + @Override + public ServerBootstrap newInstance() { + return new ServerBootstrap(). + group(new NioEventLoopGroup(), new NioEventLoopGroup()). + channel(new NioServerSocketChannel()); + } + }); + list.add(new Factory() { + @Override + public ServerBootstrap newInstance() { + AioEventLoopGroup parentGroup = new AioEventLoopGroup(); + AioEventLoopGroup childGroup = new AioEventLoopGroup(); + return new ServerBootstrap(). + group(parentGroup, childGroup). + channel(new AioServerSocketChannel(parentGroup, childGroup)); + } + }); + list.add(new Factory() { + @Override + public ServerBootstrap newInstance() { + return new ServerBootstrap(). + group(new OioEventLoopGroup(), new OioEventLoopGroup()). + channel(new OioServerSocketChannel()); + } + }); + + return list; + } + private SocketTestPermutation() {} interface Factory { diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java index 147dff177a..4867179e9c 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java @@ -103,7 +103,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server @Override protected void doBind(SocketAddress localAddress) throws Exception { AsynchronousServerSocketChannel ch = javaChannel(); - ch.bind(localAddress); + ch.bind(localAddress, config.getBacklog()); doAccept(); } @@ -154,9 +154,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server // create the socket add it to the buffer and fire the event channel.pipeline().inboundMessageBuffer().add( new AioSocketChannel(channel, null, channel.childGroup, ch)); - if (!channel.readSuspended.get()) { - channel.pipeline().fireInboundBufferUpdated(); - } + channel.pipeline().fireInboundBufferUpdated(); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index f47c329dda..bc28cf2180 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -334,9 +334,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } catch (Throwable t) { if (read) { read = false; - if (!channel.readSuspended.get()) { - pipeline.fireInboundBufferUpdated(); - } + pipeline.fireInboundBufferUpdated(); } if (!(t instanceof ClosedChannelException)) { @@ -351,9 +349,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne channel.readInProgress.set(false); if (read) { - if (!channel.readSuspended.get()) { - pipeline.fireInboundBufferUpdated(); - } + pipeline.fireInboundBufferUpdated(); } if (closed && channel.isOpen()) { channel.unsafe().close(channel.unsafe().voidFuture()); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java index 0c34a19995..90ddf6b597 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioByteChannel.java @@ -31,9 +31,11 @@ abstract class AbstractNioByteChannel extends AbstractNioChannel { } @Override - protected abstract AbstractNioByteUnsafe newUnsafe(); + protected NioByteUnsafe newUnsafe() { + return new NioByteUnsafe(); + } - abstract class AbstractNioByteUnsafe extends AbstractNioUnsafe { + final class NioByteUnsafe extends AbstractNioUnsafe { @Override public void read() { assert eventLoop().inEventLoop(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index 0e82baf1be..03083c74e5 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -38,9 +38,26 @@ public abstract class AbstractNioChannel extends AbstractChannel { InternalLoggerFactory.getInstance(AbstractNioChannel.class); private final SelectableChannel ch; - private final int defaultInterestOps; + private final int readInterestOp; private volatile SelectionKey selectionKey; + final Runnable suspendReadTask = new Runnable() { + @Override + public void run() { + selectionKey().interestOps(selectionKey().interestOps() & ~readInterestOp); + } + + }; + + final Runnable resumeReadTask = new Runnable() { + @Override + public void run() { + selectionKey().interestOps(selectionKey().interestOps() | readInterestOp); + } + + }; + + /** * The future of the current connection attempt. If not null, subsequent * connection attempts will fail. @@ -50,10 +67,10 @@ public abstract class AbstractNioChannel extends AbstractChannel { private ConnectException connectTimeoutException; protected AbstractNioChannel( - Channel parent, Integer id, SelectableChannel ch, int defaultInterestOps) { + Channel parent, Integer id, SelectableChannel ch, int readInterestOp) { super(parent, id); this.ch = ch; - this.defaultInterestOps = defaultInterestOps; + this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { @@ -107,6 +124,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { } protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { + @Override public java.nio.channels.Channel ch() { return javaChannel(); @@ -187,6 +205,26 @@ public abstract class AbstractNioChannel extends AbstractChannel { connectFuture = null; } } + + @Override + public void suspendRead() { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + suspendReadTask.run(); + } else { + loop.execute(suspendReadTask); + } + } + + @Override + public void resumeRead() { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + resumeReadTask.run(); + } else { + loop.execute(resumeReadTask); + } + } } @Override @@ -204,7 +242,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { protected Runnable doRegister() throws Exception { NioEventLoop loop = (NioEventLoop) eventLoop(); selectionKey = javaChannel().register( - loop.selector, isActive()? defaultInterestOps : 0, this); + loop.selector, isActive()? readInterestOp : 0, this); return null; } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java index 8408ac02af..4275efc7e3 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java @@ -25,14 +25,16 @@ import java.nio.channels.SelectableChannel; abstract class AbstractNioMessageChannel extends AbstractNioChannel { protected AbstractNioMessageChannel( - Channel parent, Integer id, SelectableChannel ch, int defaultInterestOps) { - super(parent, id, ch, defaultInterestOps); + Channel parent, Integer id, SelectableChannel ch, int readInterestOp) { + super(parent, id, ch, readInterestOp); } @Override - protected abstract AbstractNioMessageUnsafe newUnsafe(); + protected NioMessageUnsafe newUnsafe() { + return new NioMessageUnsafe(); + } - abstract class AbstractNioMessageUnsafe extends AbstractNioUnsafe { + final class NioMessageUnsafe extends AbstractNioUnsafe { @Override public void read() { assert eventLoop().inEventLoop(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index ea99eaa158..19170c9093 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -457,22 +457,4 @@ public final class NioDatagramChannel } return future; } - - @Override - protected AbstractNioMessageUnsafe newUnsafe() { - return new NioDatagramChannelUnsafe(); - } - - private final class NioDatagramChannelUnsafe extends AbstractNioMessageUnsafe { - - @Override - public void suspendRead() { - selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_READ); - } - - @Override - public void resumeRead() { - selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_READ); - } - } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index 36d005faa3..aa881d7c29 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -46,7 +46,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel private final ServerSocketChannelConfig config; public NioServerSocketChannel() { - super(null, null, newSocket(), 0); + super(null, null, newSocket(), SelectionKey.OP_ACCEPT); config = new DefaultServerSocketChannelConfig(javaChannel().socket()); } @@ -82,7 +82,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel @Override protected void doBind(SocketAddress localAddress) throws Exception { - javaChannel().socket().bind(localAddress); + javaChannel().socket().bind(localAddress, config.getBacklog()); SelectionKey selectionKey = selectionKey(); selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT); } @@ -128,26 +128,4 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel protected int doWriteMessages(MessageBuf buf, boolean lastSpin) throws Exception { throw new UnsupportedOperationException(); } - - @Override - protected AbstractNioMessageUnsafe newUnsafe() { - return new NioServerSocketUnsafe(); - } - - private final class NioServerSocketUnsafe extends AbstractNioMessageUnsafe { - @Override - public void suspendRead() { - selectionKey().cancel(); - } - - @Override - public void resumeRead() { - try { - doRegister(); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 406ceee4c7..e7a270d73f 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -191,22 +191,4 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty return writtenBytes; } - - @Override - protected AbstractNioByteUnsafe newUnsafe() { - return new NioSocketChannelUnsafe(); - } - - private final class NioSocketChannelUnsafe extends AbstractNioByteUnsafe { - - @Override - public void suspendRead() { - selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_READ); - } - - @Override - public void resumeRead() { - selectionKey().interestOps(selectionKey().interestOps() | SelectionKey.OP_READ); - } - } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index ec60234f71..be358b789f 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -126,7 +126,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel @Override protected void doBind(SocketAddress localAddress) throws Exception { - socket.bind(localAddress); + socket.bind(localAddress, config.getBacklog()); } @Override @@ -154,9 +154,6 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel s = socket.accept(); if (s != null) { buf.add(new OioSocketChannel(this, null, s)); - if (readSuspended) { - return 0; - } return 1; } } catch (SocketTimeoutException e) { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index 6310af890d..85b80e050c 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -172,13 +172,7 @@ public class OioSocketChannel extends AbstractOioByteChannel } try { - int read = buf.writeBytes(is, buf.writableBytes()); - if (read > 0 && !readSuspended) { - return read; - } else { - // so the read bytes were 0 or the read was suspend - return 0; - } + return buf.writeBytes(is, buf.writableBytes()); } catch (SocketTimeoutException e) { return 0; }