System call needs to be made at least once so that it can fail when a user attempts to write on a closed channel

This commit is contained in:
Trustin Lee 2010-06-16 03:33:29 +00:00
parent 752e60a723
commit de90cd6a3c
4 changed files with 32 additions and 27 deletions

View File

@ -530,20 +530,26 @@ class NioDatagramWorker implements Runnable {
long localWrittenBytes = 0; long localWrittenBytes = 0;
SocketAddress raddr = evt.getRemoteAddress(); SocketAddress raddr = evt.getRemoteAddress();
if (raddr == null) { if (raddr == null) {
for (int i = writeSpinCount; i > 0 && !buf.finished(); i --) { for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch); localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) { if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes; writtenBytes += localWrittenBytes;
break; break;
} }
if (buf.finished()) {
break;
}
} }
} else { } else {
for (int i = writeSpinCount; i > 0 && !buf.finished(); i --) { for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch, raddr); localWrittenBytes = buf.transferTo(ch, raddr);
if (localWrittenBytes != 0) { if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes; writtenBytes += localWrittenBytes;
break; break;
} }
if (buf.finished()) {
break;
}
} }
} }

View File

@ -465,12 +465,15 @@ class NioWorker implements Runnable {
ChannelFuture future = evt.getFuture(); ChannelFuture future = evt.getFuture();
try { try {
long localWrittenBytes = 0; long localWrittenBytes = 0;
for (int i = writeSpinCount; i > 0 && !buf.finished(); i --) { for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch); localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) { if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes; writtenBytes += localWrittenBytes;
break; break;
} }
if (buf.finished()) {
break;
}
} }
if (buf.finished()) { if (buf.finished()) {

View File

@ -101,26 +101,24 @@ class OioDatagramWorker implements Runnable {
try { try {
ChannelBuffer buf = (ChannelBuffer) message; ChannelBuffer buf = (ChannelBuffer) message;
int length = buf.readableBytes(); int length = buf.readableBytes();
if (length > 0) { ByteBuffer nioBuf = buf.toByteBuffer();
ByteBuffer nioBuf = buf.toByteBuffer(); DatagramPacket packet;
DatagramPacket packet; if (nioBuf.hasArray()) {
if (nioBuf.hasArray()) { // Avoid copy if the buffer is backed by an array.
// Avoid copy if the buffer is backed by an array. packet = new DatagramPacket(
packet = new DatagramPacket( nioBuf.array(), nioBuf.arrayOffset(), length);
nioBuf.array(), nioBuf.arrayOffset(), length); } else {
} else { // Otherwise it will be expensive.
// Otherwise it will be expensive. byte[] arrayBuf = new byte[length];
byte[] arrayBuf = new byte[length]; buf.getBytes(0, arrayBuf);
buf.getBytes(0, arrayBuf); packet = new DatagramPacket(arrayBuf, length);
packet = new DatagramPacket(arrayBuf, length);
}
if (remoteAddress != null) {
packet.setSocketAddress(remoteAddress);
}
channel.socket.send(packet);
fireWriteComplete(channel, length);
} }
if (remoteAddress != null) {
packet.setSocketAddress(remoteAddress);
}
channel.socket.send(packet);
fireWriteComplete(channel, length);
future.setSuccess(); future.setSuccess();
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);

View File

@ -115,12 +115,10 @@ class OioWorker implements Runnable {
try { try {
ChannelBuffer a = (ChannelBuffer) message; ChannelBuffer a = (ChannelBuffer) message;
int length = a.readableBytes(); int length = a.readableBytes();
if (length > 0) { synchronized (out) {
synchronized (out) { a.getBytes(a.readerIndex(), out, length);
a.getBytes(a.readerIndex(), out, length);
}
fireWriteComplete(channel, length);
} }
fireWriteComplete(channel, length);
future.setSuccess(); future.setSuccess();
} catch (Throwable t) { } catch (Throwable t) {
// Convert 'SocketException: Socket closed' to // Convert 'SocketException: Socket closed' to