From cef7dfc02f8949c25d60565c3a7689a1ee3aacc5 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sun, 8 Jul 2012 00:53:56 +0900 Subject: [PATCH] Made the AIO transport adhere to Netty thread model strictly - Fixed data races - Simplified channel creation using dummy AsyncChannelGroup --- .../java/io/netty/handler/ssl/SslHandler.java | 10 +- .../netty/channel/MultithreadEventLoop.java | 1 - .../channel/local/LocalChildEventLoop.java | 10 +- .../socket/aio/AbstractAioChannel.java | 13 +- .../channel/socket/aio/AioChildEventLoop.java | 10 +- .../socket/aio/AioCompletionHandler.java | 56 +++++-- .../io/netty/channel/socket/aio/AioGroup.java | 61 ++++++++ .../socket/aio/AioServerSocketChannel.java | 33 ++-- .../aio/AioServerSocketChannelConfig.java | 45 +----- .../channel/socket/aio/AioSocketChannel.java | 64 ++++---- .../socket/aio/AioSocketChannelConfig.java | 141 +----------------- .../channel/SingleThreadEventLoopTest.java | 2 +- 12 files changed, 176 insertions(+), 270 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/socket/aio/AioGroup.java diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index b0ded28cc6..1ea275690e 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -32,8 +32,6 @@ import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.DetectionUtil; import java.io.IOException; -import java.net.DatagramSocket; -import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.DatagramChannel; @@ -721,7 +719,6 @@ public class SslHandler "SSLEngine.closeInbound() raised an exception after " + "a handshake failure.", e); } - } for (;;) { @@ -757,7 +754,7 @@ public class SslHandler @Override public void run() { if (future.setSuccess()) { - logger.debug("close_notify write attempt timed out. Force-closing the connection."); + logger.warn("close_notify write attempt timed out. Force-closing the connection."); ctx.close(ctx.newFuture()); } } @@ -768,8 +765,9 @@ public class SslHandler @Override public void operationComplete(ChannelFuture f) throws Exception { - timeoutFuture.cancel(false); - ctx.close(future); + if (timeoutFuture.cancel(false)) { + ctx.close(future); + } } }); } diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java b/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java index 30ba36134c..2af04ba366 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java @@ -19,7 +19,6 @@ import java.util.concurrent.ThreadFactory; public abstract class MultithreadEventLoop extends MultithreadEventExecutor implements EventLoop { - protected MultithreadEventLoop(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads, threadFactory, args); diff --git a/transport/src/main/java/io/netty/channel/local/LocalChildEventLoop.java b/transport/src/main/java/io/netty/channel/local/LocalChildEventLoop.java index cba470b10e..31d0145a94 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChildEventLoop.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChildEventLoop.java @@ -36,15 +36,19 @@ final class LocalChildEventLoop extends SingleThreadEventLoop { // Waken up by interruptThread() } - if (isShutdown() && peekTask() == null) { - break; + if (isShutdown()) { + task = pollTask(); + if (task == null) { + break; + } + task.run(); } } } @Override protected void wakeup(boolean inEventLoop) { - if (!inEventLoop) { + if (!inEventLoop && isShutdown()) { interruptThread(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java index 8a16d6366d..21ae6f7a9b 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java @@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit; public abstract class AbstractAioChannel extends AbstractChannel { - protected volatile AsynchronousChannel ch; + private final AsynchronousChannel ch; /** * The future of the current connection attempt. If not null, subsequent @@ -39,23 +39,18 @@ public abstract class AbstractAioChannel extends AbstractChannel { protected ScheduledFuture connectTimeoutFuture; private ConnectException connectTimeoutException; - protected AbstractAioChannel(Channel parent, Integer id) { + protected AbstractAioChannel(Channel parent, Integer id, AsynchronousChannel ch) { super(parent, id); + this.ch = ch; } @Override public InetSocketAddress localAddress() { - if (ch == null) { - return null; - } return (InetSocketAddress) super.localAddress(); } @Override public InetSocketAddress remoteAddress() { - if (ch == null) { - return null; - } return (InetSocketAddress) super.remoteAddress(); } @@ -65,7 +60,7 @@ public abstract class AbstractAioChannel extends AbstractChannel { @Override public boolean isOpen() { - return ch == null || ch.isOpen(); + return ch.isOpen(); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java index 9b53267910..ee61b1e4ee 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java @@ -36,15 +36,19 @@ final class AioChildEventLoop extends SingleThreadEventLoop { // Waken up by interruptThread() } - if (isShutdown() && peekTask() == null) { - break; + if (isShutdown()) { + task = pollTask(); + if (task == null) { + break; + } + task.run(); } } } @Override protected void wakeup(boolean inEventLoop) { - if (!inEventLoop) { + if (!inEventLoop && isShutdown()) { interruptThread(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioCompletionHandler.java b/transport/src/main/java/io/netty/channel/socket/aio/AioCompletionHandler.java index 42201f7d43..d87318787e 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioCompletionHandler.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioCompletionHandler.java @@ -37,31 +37,59 @@ abstract class AioCompletionHandler implements CompletionH */ protected abstract void failed0(Throwable exc, A channel); + private static final int MAX_STACK_DEPTH = 4; + private static final ThreadLocal STACK_DEPTH = new ThreadLocal() { + @Override + protected Integer initialValue() { + return 0; + } + }; + @Override public final void completed(final V result, final A channel) { - if (channel.eventLoop().inEventLoop()) { - completed0(result, channel); - } else { - channel.eventLoop().execute(new Runnable() { - @Override - public void run() { + EventLoop loop = channel.eventLoop(); + if (loop.inEventLoop()) { + Integer d = STACK_DEPTH.get(); + if (d < MAX_STACK_DEPTH) { + STACK_DEPTH.set(d + 1); + try { completed0(result, channel); + } finally { + STACK_DEPTH.set(d); } - }); + return; + } } + + loop.execute(new Runnable() { + @Override + public void run() { + completed0(result, channel); + } + }); } @Override public final void failed(final Throwable exc, final A channel) { - if (channel.eventLoop().inEventLoop()) { - failed0(exc, channel); - } else { - channel.eventLoop().execute(new Runnable() { - @Override - public void run() { + EventLoop loop = channel.eventLoop(); + if (loop.inEventLoop()) { + Integer d = STACK_DEPTH.get(); + if (d < MAX_STACK_DEPTH) { + STACK_DEPTH.set(d + 1); + try { failed0(exc, channel); + } finally { + STACK_DEPTH.set(d); } - }); + return; + } } + + loop.execute(new Runnable() { + @Override + public void run() { + failed0(exc, channel); + } + }); } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioGroup.java b/transport/src/main/java/io/netty/channel/socket/aio/AioGroup.java new file mode 100644 index 0000000000..f45db8433e --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioGroup.java @@ -0,0 +1,61 @@ +package io.netty.channel.socket.aio; + +import java.nio.channels.AsynchronousChannelGroup; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.TimeUnit; + +final class AioGroup { + + static final AsynchronousChannelGroup GROUP; + + static { + AsynchronousChannelGroup group; + try { + group = AsynchronousChannelGroup.withThreadPool(new AioGroupExecutor()); + } catch (Exception e) { + throw new Error(e); + } + + GROUP = group; + } + + private AioGroup() { + // Unused + } + + static final class AioGroupExecutor extends AbstractExecutorService { + + @Override + public void shutdown() { + // Unstoppable + } + + @Override + public List shutdownNow() { + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + Thread.sleep(unit.toMillis(timeout)); + return false; + } + + @Override + public void execute(Runnable command) { + command.run(); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java index b45c375b52..9d27d5dc00 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java @@ -26,7 +26,6 @@ import io.netty.logging.InternalLoggerFactory; import java.io.IOException; import java.net.SocketAddress; -import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; @@ -39,11 +38,20 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server private static final InternalLogger logger = InternalLoggerFactory.getInstance(AioServerSocketChannel.class); - private final AioServerSocketChannelConfig config = new AioServerSocketChannelConfig(); + private final AioServerSocketChannelConfig config; private boolean closed; + private static AsynchronousServerSocketChannel newSocket() { + try { + return AsynchronousServerSocketChannel.open(AioGroup.GROUP); + } catch (IOException e) { + throw new ChannelException("Failed to open a socket.", e); + } + } + public AioServerSocketChannel() { - super(null, null); + super(null, null, newSocket()); + config = new AioServerSocketChannelConfig(javaChannel()); } @Override @@ -53,15 +61,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server @Override public boolean isActive() { - AsynchronousServerSocketChannel channel = javaChannel(); - try { - if (channel != null && channel.getLocalAddress() != null) { - return true; - } - } catch (IOException e) { - return true; - } - return false; + return javaChannel().isOpen() && localAddress0() != null; } @Override @@ -85,9 +85,9 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server @Override protected void doBind(SocketAddress localAddress) throws Exception { - javaChannel().bind(localAddress); - javaChannel().accept(this, ACCEPT_HANDLER); - + AsynchronousServerSocketChannel ch = javaChannel(); + ch.bind(localAddress); + ch.accept(this, ACCEPT_HANDLER); } @Override @@ -116,9 +116,6 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server @Override protected Runnable doRegister() throws Exception { - ch = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop())); - config.setChannel(javaChannel()); - return null; } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java index cd74cddc17..990e8172b5 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java @@ -33,26 +33,11 @@ import java.util.Map; final class AioServerSocketChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig { - private volatile AsynchronousServerSocketChannel channel; - private volatile Integer receiveBufferSize; - private volatile Boolean reuseAddress; + private final AsynchronousServerSocketChannel channel; private volatile int backlog = NetworkConstants.SOMAXCONN; - void setChannel(AsynchronousServerSocketChannel channel) { - if (channel == null) { - throw new NullPointerException("channel"); - } - if (this.channel != null) { - throw new IllegalStateException(); - } + AioServerSocketChannelConfig(AsynchronousServerSocketChannel channel) { this.channel = channel; - - if (receiveBufferSize != null) { - setReceiveBufferSize(receiveBufferSize); - } - if (reuseAddress != null) { - setReuseAddress(reuseAddress); - } } @Override @@ -94,14 +79,6 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig @Override public boolean isReuseAddress() { - AsynchronousServerSocketChannel channel = this.channel; - if (channel == null) { - if (reuseAddress == null) { - return false; - } else { - return reuseAddress; - } - } try { return channel.getOption(StandardSocketOptions.SO_REUSEADDR); } catch (IOException e) { @@ -111,10 +88,6 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig @Override public void setReuseAddress(boolean reuseAddress) { - AsynchronousServerSocketChannel channel = this.channel; - if (channel == null) { - this.reuseAddress = reuseAddress; - } try { channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress); } catch (IOException e) { @@ -124,14 +97,6 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig @Override public int getReceiveBufferSize() { - AsynchronousServerSocketChannel channel = this.channel; - if (channel == null) { - if (receiveBufferSize == null) { - return 0; - } else { - return receiveBufferSize; - } - } try { return channel.getOption(StandardSocketOptions.SO_RCVBUF); } catch (IOException e) { @@ -141,12 +106,6 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig @Override public void setReceiveBufferSize(int receiveBufferSize) { - AsynchronousServerSocketChannel channel = this.channel; - if (channel == null) { - this.receiveBufferSize = receiveBufferSize; - return; - } - try { channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); } catch (IOException e) { diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index 3d33d55d5d..64b0e06cff 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.aio; import io.netty.buffer.ByteBuf; import io.netty.buffer.ChannelBufType; +import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelPipeline; @@ -26,12 +27,10 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.ClosedChannelException; import java.nio.channels.CompletionHandler; -import java.nio.channels.NetworkChannel; public class AioSocketChannel extends AbstractAioChannel implements SocketChannel { @@ -42,29 +41,30 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne private static final CompletionHandler READ_HANDLER = new ReadHandler(); private static final CompletionHandler WRITE_HANDLER = new WriteHandler(); - private final AioSocketChannelConfig config = new AioSocketChannelConfig(); + private final AioSocketChannelConfig config; private boolean closed; private boolean flushing; - public AioSocketChannel() { - this(null, null, null); + private static AsynchronousSocketChannel newSocket() { + try { + return AsynchronousSocketChannel.open(AioGroup.GROUP); + } catch (IOException e) { + throw new ChannelException("Failed to open a socket.", e); + } } - public AioSocketChannel(AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel channel) { - super(parent, id); - ch = channel; - if (ch != null) { - config.setChannel(channel); - } + public AioSocketChannel() { + this(null, null, newSocket()); + } + + AioSocketChannel(AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel ch) { + super(parent, id, ch); + config = new AioSocketChannelConfig(ch); } @Override public boolean isActive() { - if (ch == null) { - return false; - } - AsynchronousSocketChannel ch = javaChannel(); - return ch.isOpen() && remoteAddress() != null; + return javaChannel().isOpen() && remoteAddress0() != null; } @Override @@ -79,7 +79,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, final ChannelFuture future) { - assert ch != null; if (localAddress != null) { try { javaChannel().bind(localAddress); @@ -112,19 +111,16 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected Runnable doRegister() throws Exception { - if (ch == null) { - ch = AsynchronousSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop())); - config.setChannel((NetworkChannel) ch); + if (remoteAddress() == null) { return null; - } else if (remoteAddress() != null) { - return new Runnable() { - @Override - public void run() { - read(); - } - }; } - return null; + + return new Runnable() { + @Override + public void run() { + read(); + } + }; } /** @@ -132,7 +128,11 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne */ void read() { ByteBuf byteBuf = pipeline().inboundByteBuffer(); - expandReadBuffer(byteBuf); + if (!byteBuf.readable()) { + byteBuf.clear(); + } else { + expandReadBuffer(byteBuf); + } // Get a ByteBuffer view on the ByteBuf ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()); javaChannel().read(buffer, this, READ_HANDLER); @@ -264,6 +264,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne // This is needed as the ByteBuffer and the ByteBuf does not share // each others index byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount); + expandReadBuffer(byteBuf); read = true; } else if (localReadAmount < 0) { @@ -338,11 +339,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override public AioSocketChannelConfig config() { - if (config == null) { - throw new IllegalStateException("Channel not open yet"); - } return config; } - - } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java index 5cb4f6057f..9b2bf14f51 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java @@ -32,48 +32,17 @@ import java.util.Map; final class AioSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { - private volatile NetworkChannel channel; - private volatile Integer receiveBufferSize; - private volatile Integer sendBufferSize; - private volatile Boolean tcpNoDelay; - private volatile Boolean keepAlive; - private volatile Boolean reuseAddress; - private volatile Integer soLinger; - private volatile Integer trafficClass; + private final NetworkChannel channel; /** * Creates a new instance. */ - void setChannel(NetworkChannel channel) { + AioSocketChannelConfig(NetworkChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } - if (this.channel != null) { - throw new IllegalStateException(); - } - this.channel = channel; - if (receiveBufferSize != null) { - setReceiveBufferSize(receiveBufferSize); - } - if (sendBufferSize != null) { - setSendBufferSize(sendBufferSize); - } - if (reuseAddress != null) { - setReuseAddress(reuseAddress); - } - if (tcpNoDelay != null) { - setTcpNoDelay(tcpNoDelay); - } - if (keepAlive != null) { - setKeepAlive(keepAlive); - } - if (soLinger != null) { - setSoLinger(soLinger); - } - if (trafficClass != null) { - setTrafficClass(trafficClass); - } + this.channel = channel; } @Override @@ -137,15 +106,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public int getReceiveBufferSize() { - NetworkChannel channel = this.channel; - if (channel == null) { - if (receiveBufferSize == null) { - return 0; - } else { - return receiveBufferSize; - } - } - try { return channel.getOption(StandardSocketOptions.SO_RCVBUF); } catch (IOException e) { @@ -155,15 +115,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public int getSendBufferSize() { - NetworkChannel channel = this.channel; - if (channel == null) { - if (sendBufferSize == null) { - return 0; - } else { - return sendBufferSize; - } - } - try { return channel.getOption(StandardSocketOptions.SO_SNDBUF); } catch (IOException e) { @@ -173,15 +124,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public int getSoLinger() { - NetworkChannel channel = this.channel; - if (channel == null) { - if (soLinger == null) { - return 1; - } else { - return soLinger; - } - } - try { return channel.getOption(StandardSocketOptions.SO_LINGER); } catch (IOException e) { @@ -191,15 +133,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public int getTrafficClass() { - NetworkChannel channel = this.channel; - if (channel == null) { - if (trafficClass == null) { - return 0; - } else { - return trafficClass; - } - } - try { return channel.getOption(StandardSocketOptions.IP_TOS); } catch (IOException e) { @@ -209,15 +142,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public boolean isKeepAlive() { - NetworkChannel channel = this.channel; - if (channel == null) { - if (keepAlive == null) { - return false; - } else { - return keepAlive; - } - } - try { return channel.getOption(StandardSocketOptions.SO_KEEPALIVE); } catch (IOException e) { @@ -227,15 +151,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public boolean isReuseAddress() { - NetworkChannel channel = this.channel; - if (channel == null) { - if (reuseAddress == null) { - return false; - } else { - return reuseAddress; - } - } - try { return channel.getOption(StandardSocketOptions.SO_REUSEADDR); } catch (IOException e) { @@ -245,15 +160,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public boolean isTcpNoDelay() { - NetworkChannel channel = this.channel; - if (channel == null) { - if (tcpNoDelay == null) { - return false; - } else { - return tcpNoDelay; - } - } - try { return channel.getOption(StandardSocketOptions.SO_REUSEADDR); } catch (IOException e) { @@ -263,12 +169,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setKeepAlive(boolean keepAlive) { - NetworkChannel channel = this.channel; - if (channel == null) { - this.keepAlive = keepAlive; - return; - } - try { channel.setOption(StandardSocketOptions.SO_KEEPALIVE, keepAlive); } catch (IOException e) { @@ -284,12 +184,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setReceiveBufferSize(int receiveBufferSize) { - NetworkChannel channel = this.channel; - if (channel == null) { - this.receiveBufferSize = receiveBufferSize; - return; - } - try { channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); } catch (IOException e) { @@ -299,12 +193,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setReuseAddress(boolean reuseAddress) { - NetworkChannel channel = this.channel; - if (channel == null) { - this.reuseAddress = reuseAddress; - return; - } - try { channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress); } catch (IOException e) { @@ -314,12 +202,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setSendBufferSize(int sendBufferSize) { - NetworkChannel channel = this.channel; - if (channel == null) { - this.sendBufferSize = sendBufferSize; - return; - } - try { channel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); } catch (IOException e) { @@ -329,12 +211,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setSoLinger(int soLinger) { - NetworkChannel channel = this.channel; - if (channel == null) { - this.soLinger = soLinger; - return; - } - try { channel.setOption(StandardSocketOptions.SO_LINGER, soLinger); } catch (IOException e) { @@ -344,12 +220,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setTcpNoDelay(boolean tcpNoDelay) { - NetworkChannel channel = this.channel; - if (channel == null) { - this.tcpNoDelay = tcpNoDelay; - return; - } - try { channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay); } catch (IOException e) { @@ -359,11 +229,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setTrafficClass(int trafficClass) { - NetworkChannel channel = this.channel; - if (channel == null) { - this.trafficClass = trafficClass; - } - try { channel.setOption(StandardSocketOptions.IP_TOS, trafficClass); } catch (IOException e) { diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index e115fdeb9c..87dce9a70a 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -290,7 +290,7 @@ public class SingleThreadEventLoopTest { @Override protected void wakeup(boolean inEventLoop) { - if (!inEventLoop) { + if (!inEventLoop && isShutdown()) { interruptThread(); } }