Made the AIO transport adhere to Netty thread model strictly

- Fixed data races
- Simplified channel creation using dummy AsyncChannelGroup
This commit is contained in:
Trustin Lee 2012-07-08 00:53:56 +09:00
parent 613834f326
commit cef7dfc02f
12 changed files with 176 additions and 270 deletions

View File

@ -32,8 +32,6 @@ import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DetectionUtil;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
@ -721,7 +719,6 @@ public class SslHandler
"SSLEngine.closeInbound() raised an exception after " +
"a handshake failure.", e);
}
}
for (;;) {
@ -757,7 +754,7 @@ public class SslHandler
@Override
public void run() {
if (future.setSuccess()) {
logger.debug("close_notify write attempt timed out. Force-closing the connection.");
logger.warn("close_notify write attempt timed out. Force-closing the connection.");
ctx.close(ctx.newFuture());
}
}
@ -768,8 +765,9 @@ public class SslHandler
@Override
public void operationComplete(ChannelFuture f)
throws Exception {
timeoutFuture.cancel(false);
ctx.close(future);
if (timeoutFuture.cancel(false)) {
ctx.close(future);
}
}
});
}

View File

@ -19,7 +19,6 @@ import java.util.concurrent.ThreadFactory;
public abstract class MultithreadEventLoop extends MultithreadEventExecutor implements EventLoop {
protected MultithreadEventLoop(int nThreads, ThreadFactory threadFactory,
Object... args) {
super(nThreads, threadFactory, args);

View File

@ -36,15 +36,19 @@ final class LocalChildEventLoop extends SingleThreadEventLoop {
// Waken up by interruptThread()
}
if (isShutdown() && peekTask() == null) {
break;
if (isShutdown()) {
task = pollTask();
if (task == null) {
break;
}
task.run();
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop) {
if (!inEventLoop && isShutdown()) {
interruptThread();
}
}

View File

@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
public abstract class AbstractAioChannel extends AbstractChannel {
protected volatile AsynchronousChannel ch;
private final AsynchronousChannel ch;
/**
* The future of the current connection attempt. If not null, subsequent
@ -39,23 +39,18 @@ public abstract class AbstractAioChannel extends AbstractChannel {
protected ScheduledFuture<?> connectTimeoutFuture;
private ConnectException connectTimeoutException;
protected AbstractAioChannel(Channel parent, Integer id) {
protected AbstractAioChannel(Channel parent, Integer id, AsynchronousChannel ch) {
super(parent, id);
this.ch = ch;
}
@Override
public InetSocketAddress localAddress() {
if (ch == null) {
return null;
}
return (InetSocketAddress) super.localAddress();
}
@Override
public InetSocketAddress remoteAddress() {
if (ch == null) {
return null;
}
return (InetSocketAddress) super.remoteAddress();
}
@ -65,7 +60,7 @@ public abstract class AbstractAioChannel extends AbstractChannel {
@Override
public boolean isOpen() {
return ch == null || ch.isOpen();
return ch.isOpen();
}
@Override

View File

@ -36,15 +36,19 @@ final class AioChildEventLoop extends SingleThreadEventLoop {
// Waken up by interruptThread()
}
if (isShutdown() && peekTask() == null) {
break;
if (isShutdown()) {
task = pollTask();
if (task == null) {
break;
}
task.run();
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop) {
if (!inEventLoop && isShutdown()) {
interruptThread();
}
}

View File

@ -37,31 +37,59 @@ abstract class AioCompletionHandler<V, A extends Channel> implements CompletionH
*/
protected abstract void failed0(Throwable exc, A channel);
private static final int MAX_STACK_DEPTH = 4;
private static final ThreadLocal<Integer> STACK_DEPTH = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};
@Override
public final void completed(final V result, final A channel) {
if (channel.eventLoop().inEventLoop()) {
completed0(result, channel);
} else {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
EventLoop loop = channel.eventLoop();
if (loop.inEventLoop()) {
Integer d = STACK_DEPTH.get();
if (d < MAX_STACK_DEPTH) {
STACK_DEPTH.set(d + 1);
try {
completed0(result, channel);
} finally {
STACK_DEPTH.set(d);
}
});
return;
}
}
loop.execute(new Runnable() {
@Override
public void run() {
completed0(result, channel);
}
});
}
@Override
public final void failed(final Throwable exc, final A channel) {
if (channel.eventLoop().inEventLoop()) {
failed0(exc, channel);
} else {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
EventLoop loop = channel.eventLoop();
if (loop.inEventLoop()) {
Integer d = STACK_DEPTH.get();
if (d < MAX_STACK_DEPTH) {
STACK_DEPTH.set(d + 1);
try {
failed0(exc, channel);
} finally {
STACK_DEPTH.set(d);
}
});
return;
}
}
loop.execute(new Runnable() {
@Override
public void run() {
failed0(exc, channel);
}
});
}
}

View File

@ -0,0 +1,61 @@
package io.netty.channel.socket.aio;
import java.nio.channels.AsynchronousChannelGroup;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;
final class AioGroup {
static final AsynchronousChannelGroup GROUP;
static {
AsynchronousChannelGroup group;
try {
group = AsynchronousChannelGroup.withThreadPool(new AioGroupExecutor());
} catch (Exception e) {
throw new Error(e);
}
GROUP = group;
}
private AioGroup() {
// Unused
}
static final class AioGroupExecutor extends AbstractExecutorService {
@Override
public void shutdown() {
// Unstoppable
}
@Override
public List<Runnable> shutdownNow() {
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
Thread.sleep(unit.toMillis(timeout));
return false;
}
@Override
public void execute(Runnable command) {
command.run();
}
}
}

View File

@ -26,7 +26,6 @@ import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
@ -39,11 +38,20 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AioServerSocketChannel.class);
private final AioServerSocketChannelConfig config = new AioServerSocketChannelConfig();
private final AioServerSocketChannelConfig config;
private boolean closed;
private static AsynchronousServerSocketChannel newSocket() {
try {
return AsynchronousServerSocketChannel.open(AioGroup.GROUP);
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
public AioServerSocketChannel() {
super(null, null);
super(null, null, newSocket());
config = new AioServerSocketChannelConfig(javaChannel());
}
@Override
@ -53,15 +61,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
@Override
public boolean isActive() {
AsynchronousServerSocketChannel channel = javaChannel();
try {
if (channel != null && channel.getLocalAddress() != null) {
return true;
}
} catch (IOException e) {
return true;
}
return false;
return javaChannel().isOpen() && localAddress0() != null;
}
@Override
@ -85,9 +85,9 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress);
javaChannel().accept(this, ACCEPT_HANDLER);
AsynchronousServerSocketChannel ch = javaChannel();
ch.bind(localAddress);
ch.accept(this, ACCEPT_HANDLER);
}
@Override
@ -116,9 +116,6 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
@Override
protected Runnable doRegister() throws Exception {
ch = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop()));
config.setChannel(javaChannel());
return null;
}

View File

@ -33,26 +33,11 @@ import java.util.Map;
final class AioServerSocketChannelConfig extends DefaultChannelConfig
implements ServerSocketChannelConfig {
private volatile AsynchronousServerSocketChannel channel;
private volatile Integer receiveBufferSize;
private volatile Boolean reuseAddress;
private final AsynchronousServerSocketChannel channel;
private volatile int backlog = NetworkConstants.SOMAXCONN;
void setChannel(AsynchronousServerSocketChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (this.channel != null) {
throw new IllegalStateException();
}
AioServerSocketChannelConfig(AsynchronousServerSocketChannel channel) {
this.channel = channel;
if (receiveBufferSize != null) {
setReceiveBufferSize(receiveBufferSize);
}
if (reuseAddress != null) {
setReuseAddress(reuseAddress);
}
}
@Override
@ -94,14 +79,6 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig
@Override
public boolean isReuseAddress() {
AsynchronousServerSocketChannel channel = this.channel;
if (channel == null) {
if (reuseAddress == null) {
return false;
} else {
return reuseAddress;
}
}
try {
return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
} catch (IOException e) {
@ -111,10 +88,6 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig
@Override
public void setReuseAddress(boolean reuseAddress) {
AsynchronousServerSocketChannel channel = this.channel;
if (channel == null) {
this.reuseAddress = reuseAddress;
}
try {
channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
} catch (IOException e) {
@ -124,14 +97,6 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig
@Override
public int getReceiveBufferSize() {
AsynchronousServerSocketChannel channel = this.channel;
if (channel == null) {
if (receiveBufferSize == null) {
return 0;
} else {
return receiveBufferSize;
}
}
try {
return channel.getOption(StandardSocketOptions.SO_RCVBUF);
} catch (IOException e) {
@ -141,12 +106,6 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
AsynchronousServerSocketChannel channel = this.channel;
if (channel == null) {
this.receiveBufferSize = receiveBufferSize;
return;
}
try {
channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
} catch (IOException e) {

View File

@ -17,6 +17,7 @@ package io.netty.channel.socket.aio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
@ -26,12 +27,10 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.NetworkChannel;
public class AioSocketChannel extends AbstractAioChannel implements SocketChannel {
@ -42,29 +41,30 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
private static final CompletionHandler<Integer, AioSocketChannel> READ_HANDLER = new ReadHandler();
private static final CompletionHandler<Integer, AioSocketChannel> WRITE_HANDLER = new WriteHandler();
private final AioSocketChannelConfig config = new AioSocketChannelConfig();
private final AioSocketChannelConfig config;
private boolean closed;
private boolean flushing;
public AioSocketChannel() {
this(null, null, null);
private static AsynchronousSocketChannel newSocket() {
try {
return AsynchronousSocketChannel.open(AioGroup.GROUP);
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
public AioSocketChannel(AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel channel) {
super(parent, id);
ch = channel;
if (ch != null) {
config.setChannel(channel);
}
public AioSocketChannel() {
this(null, null, newSocket());
}
AioSocketChannel(AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel ch) {
super(parent, id, ch);
config = new AioSocketChannelConfig(ch);
}
@Override
public boolean isActive() {
if (ch == null) {
return false;
}
AsynchronousSocketChannel ch = javaChannel();
return ch.isOpen() && remoteAddress() != null;
return javaChannel().isOpen() && remoteAddress0() != null;
}
@Override
@ -79,7 +79,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, final ChannelFuture future) {
assert ch != null;
if (localAddress != null) {
try {
javaChannel().bind(localAddress);
@ -112,19 +111,16 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
protected Runnable doRegister() throws Exception {
if (ch == null) {
ch = AsynchronousSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop()));
config.setChannel((NetworkChannel) ch);
if (remoteAddress() == null) {
return null;
} else if (remoteAddress() != null) {
return new Runnable() {
@Override
public void run() {
read();
}
};
}
return null;
return new Runnable() {
@Override
public void run() {
read();
}
};
}
/**
@ -132,7 +128,11 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
*/
void read() {
ByteBuf byteBuf = pipeline().inboundByteBuffer();
expandReadBuffer(byteBuf);
if (!byteBuf.readable()) {
byteBuf.clear();
} else {
expandReadBuffer(byteBuf);
}
// Get a ByteBuffer view on the ByteBuf
ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes());
javaChannel().read(buffer, this, READ_HANDLER);
@ -264,6 +264,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
// This is needed as the ByteBuffer and the ByteBuf does not share
// each others index
byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount);
expandReadBuffer(byteBuf);
read = true;
} else if (localReadAmount < 0) {
@ -338,11 +339,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
public AioSocketChannelConfig config() {
if (config == null) {
throw new IllegalStateException("Channel not open yet");
}
return config;
}
}

View File

@ -32,48 +32,17 @@ import java.util.Map;
final class AioSocketChannelConfig extends DefaultChannelConfig
implements SocketChannelConfig {
private volatile NetworkChannel channel;
private volatile Integer receiveBufferSize;
private volatile Integer sendBufferSize;
private volatile Boolean tcpNoDelay;
private volatile Boolean keepAlive;
private volatile Boolean reuseAddress;
private volatile Integer soLinger;
private volatile Integer trafficClass;
private final NetworkChannel channel;
/**
* Creates a new instance.
*/
void setChannel(NetworkChannel channel) {
AioSocketChannelConfig(NetworkChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (this.channel != null) {
throw new IllegalStateException();
}
this.channel = channel;
if (receiveBufferSize != null) {
setReceiveBufferSize(receiveBufferSize);
}
if (sendBufferSize != null) {
setSendBufferSize(sendBufferSize);
}
if (reuseAddress != null) {
setReuseAddress(reuseAddress);
}
if (tcpNoDelay != null) {
setTcpNoDelay(tcpNoDelay);
}
if (keepAlive != null) {
setKeepAlive(keepAlive);
}
if (soLinger != null) {
setSoLinger(soLinger);
}
if (trafficClass != null) {
setTrafficClass(trafficClass);
}
this.channel = channel;
}
@Override
@ -137,15 +106,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override
public int getReceiveBufferSize() {
NetworkChannel channel = this.channel;
if (channel == null) {
if (receiveBufferSize == null) {
return 0;
} else {
return receiveBufferSize;
}
}
try {
return channel.getOption(StandardSocketOptions.SO_RCVBUF);
} catch (IOException e) {
@ -155,15 +115,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override
public int getSendBufferSize() {
NetworkChannel channel = this.channel;
if (channel == null) {
if (sendBufferSize == null) {
return 0;
} else {
return sendBufferSize;
}
}
try {
return channel.getOption(StandardSocketOptions.SO_SNDBUF);
} catch (IOException e) {
@ -173,15 +124,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override
public int getSoLinger() {
NetworkChannel channel = this.channel;
if (channel == null) {
if (soLinger == null) {
return 1;
} else {
return soLinger;
}
}
try {
return channel.getOption(StandardSocketOptions.SO_LINGER);
} catch (IOException e) {
@ -191,15 +133,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override
public int getTrafficClass() {
NetworkChannel channel = this.channel;
if (channel == null) {
if (trafficClass == null) {
return 0;
} else {
return trafficClass;
}
}
try {
return channel.getOption(StandardSocketOptions.IP_TOS);
} catch (IOException e) {
@ -209,15 +142,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override
public boolean isKeepAlive() {
NetworkChannel channel = this.channel;
if (channel == null) {
if (keepAlive == null) {
return false;
} else {
return keepAlive;
}
}
try {
return channel.getOption(StandardSocketOptions.SO_KEEPALIVE);
} catch (IOException e) {
@ -227,15 +151,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override
public boolean isReuseAddress() {
NetworkChannel channel = this.channel;
if (channel == null) {
if (reuseAddress == null) {
return false;
} else {
return reuseAddress;
}
}
try {
return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
} catch (IOException e) {
@ -245,15 +160,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override
public boolean isTcpNoDelay() {
NetworkChannel channel = this.channel;
if (channel == null) {
if (tcpNoDelay == null) {
return false;
} else {
return tcpNoDelay;
}
}
try {
return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
} catch (IOException e) {
@ -263,12 +169,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override
public void setKeepAlive(boolean keepAlive) {
NetworkChannel channel = this.channel;
if (channel == null) {
this.keepAlive = keepAlive;
return;
}
try {
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, keepAlive);
} catch (IOException e) {
@ -284,12 +184,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
NetworkChannel channel = this.channel;
if (channel == null) {
this.receiveBufferSize = receiveBufferSize;
return;
}
try {
channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
} catch (IOException e) {
@ -299,12 +193,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override
public void setReuseAddress(boolean reuseAddress) {
NetworkChannel channel = this.channel;
if (channel == null) {
this.reuseAddress = reuseAddress;
return;
}
try {
channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
} catch (IOException e) {
@ -314,12 +202,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override
public void setSendBufferSize(int sendBufferSize) {
NetworkChannel channel = this.channel;
if (channel == null) {
this.sendBufferSize = sendBufferSize;
return;
}
try {
channel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
} catch (IOException e) {
@ -329,12 +211,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override
public void setSoLinger(int soLinger) {
NetworkChannel channel = this.channel;
if (channel == null) {
this.soLinger = soLinger;
return;
}
try {
channel.setOption(StandardSocketOptions.SO_LINGER, soLinger);
} catch (IOException e) {
@ -344,12 +220,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override
public void setTcpNoDelay(boolean tcpNoDelay) {
NetworkChannel channel = this.channel;
if (channel == null) {
this.tcpNoDelay = tcpNoDelay;
return;
}
try {
channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay);
} catch (IOException e) {
@ -359,11 +229,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override
public void setTrafficClass(int trafficClass) {
NetworkChannel channel = this.channel;
if (channel == null) {
this.trafficClass = trafficClass;
}
try {
channel.setOption(StandardSocketOptions.IP_TOS, trafficClass);
} catch (IOException e) {

View File

@ -290,7 +290,7 @@ public class SingleThreadEventLoopTest {
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop) {
if (!inEventLoop && isShutdown()) {
interruptThread();
}
}