* Fixed issue: NETTY-46 Excessive exception creation overhead when there are many pending write requests in a closed channel
* Added NioWorker.cleanUpWriteBuffer() * Fixed issue: NETTY-47 Channel is not closed immediately when write operation fails due to closure. * Made sure to call NioWorker.close() in NioWorker.write(Un)Fair() and to call NioWorker.cleanUpWriteBuffer() in NioWorker.close();
This commit is contained in:
parent
0caeb69ae6
commit
d52b238d62
@ -27,6 +27,7 @@ import static org.jboss.netty.channel.Channels.*;
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.AsynchronousCloseException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.NotYetConnectedException;
|
||||
import java.nio.channels.ScatteringByteChannel;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
@ -298,6 +299,11 @@ class NioWorker implements Runnable {
|
||||
}
|
||||
|
||||
static void write(NioSocketChannel channel, boolean mightNeedWakeup) {
|
||||
if (!channel.isConnected()) {
|
||||
cleanUpWriteBuffer(channel);
|
||||
return;
|
||||
}
|
||||
|
||||
final NioSocketChannelConfig cfg = channel.getConfig();
|
||||
final int writeSpinCount = cfg.getWriteSpinCount();
|
||||
final int maxWrittenBytes;
|
||||
@ -318,6 +324,7 @@ class NioWorker implements Runnable {
|
||||
private static void writeUnfair(NioSocketChannel channel,
|
||||
boolean mightNeedWakeup, final int writeSpinCount) {
|
||||
|
||||
boolean open = true;
|
||||
boolean addOpWrite = false;
|
||||
boolean removeOpWrite = false;
|
||||
|
||||
@ -370,14 +377,20 @@ class NioWorker implements Runnable {
|
||||
evt.getFuture().setFailure(t);
|
||||
evt = null;
|
||||
fireExceptionCaught(channel, t);
|
||||
if (t instanceof IOException) {
|
||||
open = false;
|
||||
close(channel, channel.getSucceededFuture());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (addOpWrite) {
|
||||
setOpWrite(channel, true, mightNeedWakeup);
|
||||
} else if (removeOpWrite) {
|
||||
setOpWrite(channel, false, mightNeedWakeup);
|
||||
if (open) {
|
||||
if (addOpWrite) {
|
||||
setOpWrite(channel, true, mightNeedWakeup);
|
||||
} else if (removeOpWrite) {
|
||||
setOpWrite(channel, false, mightNeedWakeup);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -385,6 +398,7 @@ class NioWorker implements Runnable {
|
||||
boolean mightNeedWakeup, final int writeSpinCount,
|
||||
final int maxWrittenBytes) {
|
||||
|
||||
boolean open = true;
|
||||
boolean addOpWrite = false;
|
||||
boolean removeOpWrite = false;
|
||||
|
||||
@ -441,14 +455,20 @@ class NioWorker implements Runnable {
|
||||
evt.getFuture().setFailure(t);
|
||||
evt = null;
|
||||
fireExceptionCaught(channel, t);
|
||||
if (t instanceof IOException) {
|
||||
open = false;
|
||||
close(channel, channel.getSucceededFuture());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (addOpWrite) {
|
||||
setOpWrite(channel, true, mightNeedWakeup);
|
||||
} else if (removeOpWrite) {
|
||||
setOpWrite(channel, false, mightNeedWakeup);
|
||||
if (open) {
|
||||
if (addOpWrite) {
|
||||
setOpWrite(channel, true, mightNeedWakeup);
|
||||
} else if (removeOpWrite) {
|
||||
setOpWrite(channel, false, mightNeedWakeup);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -605,6 +625,8 @@ class NioWorker implements Runnable {
|
||||
if (bound) {
|
||||
fireChannelUnbound(channel);
|
||||
}
|
||||
|
||||
cleanUpWriteBuffer(channel);
|
||||
fireChannelClosed(channel);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
@ -613,6 +635,38 @@ class NioWorker implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
private static void cleanUpWriteBuffer(NioSocketChannel channel) {
|
||||
// Create the exception only once to avoid the excessive overhead
|
||||
// caused by fillStackTrace.
|
||||
Exception cause;
|
||||
if (channel.isOpen()) {
|
||||
cause = new NotYetConnectedException();
|
||||
} else {
|
||||
cause = new ClosedChannelException();
|
||||
}
|
||||
|
||||
// Clean up the stale messages in the write buffer.
|
||||
synchronized (channel.writeBuffer) {
|
||||
MessageEvent evt = channel.currentWriteEvent;
|
||||
if (evt != null) {
|
||||
channel.currentWriteEvent = null;
|
||||
channel.currentWriteIndex = 0;
|
||||
evt.getFuture().setFailure(cause);
|
||||
fireExceptionCaught(channel, cause);
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
evt = channel.writeBuffer.poll();
|
||||
if (evt == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
evt.getFuture().setFailure(cause);
|
||||
fireExceptionCaught(channel, cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void setInterestOps(
|
||||
NioSocketChannel channel, ChannelFuture future, int interestOps) {
|
||||
NioWorker worker = channel.getWorker();
|
||||
|
Loading…
x
Reference in New Issue
Block a user