Fixed a bug where some ChannelPipelineSinks do not always release its shutdownLock on an Error

This commit is contained in:
Trustin Lee 2010-08-26 03:13:14 +00:00
parent 30d5136973
commit e8fcbd4e75
2 changed files with 78 additions and 73 deletions

View File

@ -234,39 +234,41 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
final Thread currentThread = Thread.currentThread(); final Thread currentThread = Thread.currentThread();
channel.shutdownLock.lock(); channel.shutdownLock.lock();
for (;;) { try {
try { for (;;) {
if (selector.select(1000) > 0) {
selector.selectedKeys().clear();
}
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket != null) {
registerAcceptedChannel(acceptedSocket, currentThread);
}
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedSelectorException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedChannelException e) {
// Closed as requested.
break;
} catch (IOException e) {
logger.warn(
"Failed to accept a connection.", e);
try { try {
Thread.sleep(1000); if (selector.select(1000) > 0) {
} catch (InterruptedException e1) { selector.selectedKeys().clear();
// Ignore }
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket != null) {
registerAcceptedChannel(acceptedSocket, currentThread);
}
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedSelectorException e) {
// Raised by accept() when the server socket was closed.
} catch (ClosedChannelException e) {
// Closed as requested.
break;
} catch (Throwable e) {
logger.warn(
"Failed to accept a connection.", e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
} }
} }
} finally {
channel.shutdownLock.unlock();
closeSelector();
} }
channel.shutdownLock.unlock();
closeSelector();
} }
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) { private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {

View File

@ -201,57 +201,60 @@ class OioServerSocketPipelineSink extends AbstractChannelSink {
public void run() { public void run() {
channel.shutdownLock.lock(); channel.shutdownLock.lock();
while (channel.isBound()) { try {
try { while (channel.isBound()) {
Socket acceptedSocket = channel.socket.accept();
try { try {
ChannelPipeline pipeline = Socket acceptedSocket = channel.socket.accept();
channel.getConfig().getPipelineFactory().getPipeline();
final OioAcceptedSocketChannel acceptedChannel =
new OioAcceptedSocketChannel(
channel,
channel.getFactory(),
pipeline,
OioServerSocketPipelineSink.this,
acceptedSocket);
workerExecutor.execute(
new IoWorkerRunnable(
new ThreadRenamingRunnable(
new OioWorker(acceptedChannel),
"OldIO", "ServerWorker",
String.valueOf(id),
String.valueOf(acceptedChannel.getId()),
acceptedChannel.toString())));
} catch (Exception e) {
logger.warn(
"Failed to initialize an accepted socket.", e);
try { try {
acceptedSocket.close(); ChannelPipeline pipeline =
} catch (IOException e2) { channel.getConfig().getPipelineFactory().getPipeline();
final OioAcceptedSocketChannel acceptedChannel =
new OioAcceptedSocketChannel(
channel,
channel.getFactory(),
pipeline,
OioServerSocketPipelineSink.this,
acceptedSocket);
workerExecutor.execute(
new IoWorkerRunnable(
new ThreadRenamingRunnable(
new OioWorker(acceptedChannel),
"OldIO", "ServerWorker",
String.valueOf(id),
String.valueOf(acceptedChannel.getId()),
acceptedChannel.toString())));
} catch (Exception e) {
logger.warn( logger.warn(
"Failed to close a partially accepted socket.", "Failed to initialize an accepted socket.", e);
e2); try {
acceptedSocket.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially accepted socket.",
e2);
}
}
} catch (SocketTimeoutException e) {
// Thrown every second to stop when requested.
} catch (Throwable e) {
// Do not log the exception if the server socket was closed
// by a user.
if (!channel.socket.isBound() || channel.socket.isClosed()) {
break;
}
logger.warn(
"Failed to accept a connection.", e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
} }
} }
} catch (SocketTimeoutException e) {
// Thrown every second to stop when requested.
} catch (IOException e) {
// Do not log the exception if the server socket was closed
// by a user.
if (!channel.socket.isBound() || channel.socket.isClosed()) {
break;
}
logger.warn(
"Failed to accept a connection.", e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
} }
} finally {
channel.shutdownLock.unlock();
} }
channel.shutdownLock.unlock();
} }
} }
} }