Resolved issue: NETTY-256 (A race condition during the recommended server shutdown procedure)

* Fixed a bug in the socket transport implementations where a new connection can be accepted after the ChannelFuture of ServerSocketChannel.close() is complete.
* Introduced a lock to ensure that the boss thread is terminated before notifying the future
This commit is contained in:
Trustin Lee 2009-11-30 11:59:00 +00:00
parent 1688569758
commit 42d2f79239
4 changed files with 42 additions and 14 deletions

View File

@ -20,6 +20,8 @@ import static org.jboss.netty.channel.Channels.*;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.netty.channel.AbstractServerChannel; import org.jboss.netty.channel.AbstractServerChannel;
import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelException;
@ -46,6 +48,7 @@ class NioServerSocketChannel extends AbstractServerChannel
InternalLoggerFactory.getInstance(NioServerSocketChannel.class); InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
final ServerSocketChannel socket; final ServerSocketChannel socket;
final Lock shutdownLock = new ReentrantLock();
private final ServerSocketChannelConfig config; private final ServerSocketChannelConfig config;
NioServerSocketChannel( NioServerSocketChannel(

View File

@ -175,14 +175,23 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
boolean bound = channel.isBound(); boolean bound = channel.isBound();
try { try {
channel.socket.close(); channel.socket.close();
if (channel.setClosed()) {
future.setSuccess(); // Make sure the boss thread is not running so that that the future
if (bound) { // is notified after a new connection cannot be accepted anymore.
fireChannelUnbound(channel); // See NETTY-256 for more information.
channel.shutdownLock.lock();
try {
if (channel.setClosed()) {
future.setSuccess();
if (bound) {
fireChannelUnbound(channel);
}
fireChannelClosed(channel);
} else {
future.setSuccess();
} }
fireChannelClosed(channel); } finally {
} else { channel.shutdownLock.unlock();
future.setSuccess();
} }
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);
@ -218,6 +227,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
public void run() { public void run() {
final Thread currentThread = Thread.currentThread(); final Thread currentThread = Thread.currentThread();
channel.shutdownLock.lock();
for (;;) { for (;;) {
try { try {
if (selector.select(1000) > 0) { if (selector.select(1000) > 0) {
@ -249,6 +259,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
} }
} }
channel.shutdownLock.unlock();
closeSelector(); closeSelector();
} }

View File

@ -20,6 +20,8 @@ import static org.jboss.netty.channel.Channels.*;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.netty.channel.AbstractServerChannel; import org.jboss.netty.channel.AbstractServerChannel;
import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelException;
@ -47,6 +49,7 @@ class OioServerSocketChannel extends AbstractServerChannel
InternalLoggerFactory.getInstance(OioServerSocketChannel.class); InternalLoggerFactory.getInstance(OioServerSocketChannel.class);
final ServerSocket socket; final ServerSocket socket;
final Lock shutdownLock = new ReentrantLock();
private final ServerSocketChannelConfig config; private final ServerSocketChannelConfig config;
OioServerSocketChannel( OioServerSocketChannel(

View File

@ -164,14 +164,23 @@ class OioServerSocketPipelineSink extends AbstractChannelSink {
boolean bound = channel.isBound(); boolean bound = channel.isBound();
try { try {
channel.socket.close(); channel.socket.close();
if (channel.setClosed()) {
future.setSuccess(); // Make sure the boss thread is not running so that that the future
if (bound) { // is notified after a new connection cannot be accepted anymore.
fireChannelUnbound(channel); // See NETTY-256 for more information.
channel.shutdownLock.lock();
try {
if (channel.setClosed()) {
future.setSuccess();
if (bound) {
fireChannelUnbound(channel);
}
fireChannelClosed(channel);
} else {
future.setSuccess();
} }
fireChannelClosed(channel); } finally {
} else { channel.shutdownLock.unlock();
future.setSuccess();
} }
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);
@ -187,6 +196,7 @@ class OioServerSocketPipelineSink extends AbstractChannelSink {
} }
public void run() { public void run() {
channel.shutdownLock.lock();
while (channel.isBound()) { while (channel.isBound()) {
try { try {
Socket acceptedSocket = channel.socket.accept(); Socket acceptedSocket = channel.socket.accept();
@ -238,6 +248,7 @@ class OioServerSocketPipelineSink extends AbstractChannelSink {
} }
} }
} }
channel.shutdownLock.unlock();
} }
} }
} }