Reduced code duplication of AbstractXnioChannelHandler

This commit is contained in:
Trustin Lee 2009-03-16 04:48:36 +00:00
parent 880241366b
commit 4fa8e3adfc

View File

@ -109,38 +109,37 @@ public abstract class AbstractXnioChannelHandler implements IoHandler<java.nio.c
} }
public void handleWritable(java.nio.channels.Channel channel) { public void handleWritable(java.nio.channels.Channel channel) {
// TODO Code cleanup & optimization
BaseXnioChannel c = XnioChannelRegistry.getChannel(channel); BaseXnioChannel c = XnioChannelRegistry.getChannel(channel);
if (channel instanceof GatheringByteChannel) { int writtenBytes = 0;
boolean open = true; boolean open = true;
boolean addOpWrite = false; boolean addOpWrite = false;
MessageEvent evt;
ChannelBuffer buf;
int bufIdx;
MessageEvent evt; Queue<MessageEvent> writeBuffer = c.writeBuffer;
ChannelBuffer buf; synchronized (c.writeLock) {
int bufIdx; evt = c.currentWriteEvent;
int writtenBytes = 0; for (;;) {
if (evt == null) {
Queue<MessageEvent> writeBuffer = c.writeBuffer; evt = writeBuffer.poll();
synchronized (c.writeLock) {
evt = c.currentWriteEvent;
for (;;) {
if (evt == null) { if (evt == null) {
evt = writeBuffer.poll(); c.currentWriteEvent = null;
if (evt == null) { break;
c.currentWriteEvent = null;
break;
}
buf = (ChannelBuffer) evt.getMessage();
bufIdx = buf.readerIndex();
} else {
buf = (ChannelBuffer) evt.getMessage();
bufIdx = c.currentWriteIndex;
} }
try { buf = (ChannelBuffer) evt.getMessage();
final int writeSpinCount = c.getConfig().getWriteSpinCount(); bufIdx = buf.readerIndex();
for (int i = writeSpinCount; i > 0; i --) { } else {
buf = (ChannelBuffer) evt.getMessage();
bufIdx = c.currentWriteIndex;
}
try {
final int writeSpinCount = c.getConfig().getWriteSpinCount();
boolean sent = false;
for (int i = writeSpinCount; i > 0; i --) {
if (channel instanceof GatheringByteChannel) {
int localWrittenBytes = buf.getBytes( int localWrittenBytes = buf.getBytes(
bufIdx, bufIdx,
(GatheringByteChannel) channel, (GatheringByteChannel) channel,
@ -151,148 +150,20 @@ public abstract class AbstractXnioChannelHandler implements IoHandler<java.nio.c
writtenBytes += localWrittenBytes; writtenBytes += localWrittenBytes;
break; break;
} }
} } else if (channel instanceof MultipointWritableMessageChannel) {
if (bufIdx == buf.writerIndex()) {
// Successful write - proceed to the next message.
evt.getFuture().setSuccess();
evt = null;
} else {
// Not written fully - perhaps the kernel buffer is full.
c.currentWriteEvent = evt;
c.currentWriteIndex = bufIdx;
addOpWrite = true;
break;
}
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (Throwable t) {
evt.getFuture().setFailure(t);
evt = null;
fireExceptionCaught(c, t);
if (t instanceof IOException) {
open = false;
c.closeNow(succeededFuture(c));
}
}
}
}
fireWriteComplete(c, writtenBytes);
if (open) {
if (addOpWrite && channel instanceof SuspendableWriteChannel) {
((SuspendableWriteChannel) channel).resumeWrites();
}
}
} else if (channel instanceof MultipointWritableMessageChannel) {
boolean open = true;
boolean addOpWrite = false;
MessageEvent evt;
ChannelBuffer buf;
int bufIdx;
int writtenBytes = 0;
SocketAddress remoteAddress;
Queue<MessageEvent> writeBuffer = c.writeBuffer;
synchronized (c.writeLock) {
evt = c.currentWriteEvent;
for (;;) {
if (evt == null) {
evt = writeBuffer.poll();
if (evt == null) {
c.currentWriteEvent = null;
break;
}
buf = (ChannelBuffer) evt.getMessage();
bufIdx = buf.readerIndex();
} else {
buf = (ChannelBuffer) evt.getMessage();
bufIdx = c.currentWriteIndex;
}
remoteAddress = evt.getRemoteAddress();
if (remoteAddress == null) {
remoteAddress = c.getRemoteAddress();
}
try {
final int writeSpinCount = c.getConfig().getWriteSpinCount();
boolean sent = false;
for (int i = writeSpinCount; i > 0; i --) {
ByteBuffer nioBuf = buf.toByteBuffer(bufIdx, buf.writerIndex() - bufIdx); ByteBuffer nioBuf = buf.toByteBuffer(bufIdx, buf.writerIndex() - bufIdx);
int nioBufSize = nioBuf.remaining(); int nioBufSize = nioBuf.remaining();
SocketAddress remoteAddress = evt.getRemoteAddress();
if (remoteAddress == null) {
remoteAddress = c.getRemoteAddress();
}
sent = ((MultipointWritableMessageChannel) channel).send(remoteAddress, nioBuf); sent = ((MultipointWritableMessageChannel) channel).send(remoteAddress, nioBuf);
if (sent) { if (sent) {
bufIdx += nioBufSize; bufIdx += nioBufSize;
writtenBytes += nioBufSize; writtenBytes += nioBufSize;
break; break;
} }
} } else if (channel instanceof WritableMessageChannel) {
if (sent) {
// Successful write - proceed to the next message.
evt.getFuture().setSuccess();
evt = null;
} else {
// Not written fully - perhaps the kernel buffer is full.
c.currentWriteEvent = evt;
c.currentWriteIndex = bufIdx;
addOpWrite = true;
break;
}
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (Throwable t) {
evt.getFuture().setFailure(t);
evt = null;
fireExceptionCaught(c, t);
if (t instanceof IOException) {
open = false;
c.closeNow(succeededFuture(c));
}
}
}
}
fireWriteComplete(c, writtenBytes);
if (open && addOpWrite) {
((SuspendableWriteChannel) channel).resumeWrites();
}
} else if (channel instanceof WritableMessageChannel) {
boolean open = true;
boolean addOpWrite = false;
MessageEvent evt;
ChannelBuffer buf;
int bufIdx;
int writtenBytes = 0;
Queue<MessageEvent> writeBuffer = c.writeBuffer;
synchronized (c.writeLock) {
evt = c.currentWriteEvent;
for (;;) {
if (evt == null) {
evt = writeBuffer.poll();
if (evt == null) {
c.currentWriteEvent = null;
break;
}
buf = (ChannelBuffer) evt.getMessage();
bufIdx = buf.readerIndex();
} else {
buf = (ChannelBuffer) evt.getMessage();
bufIdx = c.currentWriteIndex;
}
try {
final int writeSpinCount = c.getConfig().getWriteSpinCount();
boolean sent = false;
for (int i = writeSpinCount; i > 0; i --) {
ByteBuffer nioBuf = buf.toByteBuffer(bufIdx, buf.writerIndex() - bufIdx); ByteBuffer nioBuf = buf.toByteBuffer(bufIdx, buf.writerIndex() - bufIdx);
int nioBufSize = nioBuf.remaining(); int nioBufSize = nioBuf.remaining();
sent = ((WritableMessageChannel) channel).send(nioBuf); sent = ((WritableMessageChannel) channel).send(nioBuf);
@ -301,36 +172,42 @@ public abstract class AbstractXnioChannelHandler implements IoHandler<java.nio.c
writtenBytes += nioBufSize; writtenBytes += nioBufSize;
break; break;
} }
}
if (sent) {
// Successful write - proceed to the next message.
evt.getFuture().setSuccess();
evt = null;
} else { } else {
// Not written fully - perhaps the kernel buffer is full. throw new IllegalArgumentException("Unsupported channel type: " + channel.getClass().getName());
c.currentWriteEvent = evt;
c.currentWriteIndex = bufIdx;
addOpWrite = true;
break;
} }
} catch (AsynchronousCloseException e) { }
// Doesn't need a user attention - ignore.
} catch (Throwable t) { if (bufIdx == buf.writerIndex() || sent) {
evt.getFuture().setFailure(t); // Successful write - proceed to the next message.
evt.getFuture().setSuccess();
evt = null; evt = null;
fireExceptionCaught(c, t); } else {
if (t instanceof IOException) { // Not written fully - perhaps the kernel buffer is full.
open = false; c.currentWriteEvent = evt;
c.closeNow(succeededFuture(c)); c.currentWriteIndex = bufIdx;
} addOpWrite = true;
break;
}
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (Throwable t) {
evt.getFuture().setFailure(t);
evt = null;
fireExceptionCaught(c, t);
if (t instanceof IOException) {
open = false;
c.closeNow(succeededFuture(c));
} }
} }
} }
}
if (writtenBytes > 0) {
fireWriteComplete(c, writtenBytes); fireWriteComplete(c, writtenBytes);
}
if (open && addOpWrite) { if (open) {
if (addOpWrite && channel instanceof SuspendableWriteChannel) {
((SuspendableWriteChannel) channel).resumeWrites(); ((SuspendableWriteChannel) channel).resumeWrites();
} }
} }