Implemented AbstractXnioChannels.handleWritable() properly (needs cleanup and optimization though)
This commit is contained in:
parent
49c0f33461
commit
63cb4a023f
@ -109,6 +109,7 @@ public abstract class AbstractXnioChannelHandler implements IoHandler<java.nio.c
|
||||
}
|
||||
|
||||
public void handleWritable(java.nio.channels.Channel channel) {
|
||||
// TODO Code cleanup & optimization
|
||||
BaseXnioChannel c = XnioChannelRegistry.getChannel(channel);
|
||||
if (channel instanceof GatheringByteChannel) {
|
||||
boolean open = true;
|
||||
@ -185,9 +186,157 @@ public abstract class AbstractXnioChannelHandler implements IoHandler<java.nio.c
|
||||
}
|
||||
}
|
||||
} else if (channel instanceof MultipointWritableMessageChannel) {
|
||||
// TODO implement me
|
||||
boolean open = true;
|
||||
boolean addOpWrite = false;
|
||||
|
||||
MessageEvent evt;
|
||||
ChannelBuffer buf;
|
||||
int bufIdx;
|
||||
int writtenBytes = 0;
|
||||
SocketAddress remoteAddress;
|
||||
|
||||
Queue<MessageEvent> writeBuffer = c.writeBuffer;
|
||||
synchronized (c.writeLock) {
|
||||
evt = c.currentWriteEvent;
|
||||
for (;;) {
|
||||
if (evt == null) {
|
||||
evt = writeBuffer.poll();
|
||||
if (evt == null) {
|
||||
c.currentWriteEvent = null;
|
||||
break;
|
||||
}
|
||||
|
||||
buf = (ChannelBuffer) evt.getMessage();
|
||||
bufIdx = buf.readerIndex();
|
||||
} else {
|
||||
buf = (ChannelBuffer) evt.getMessage();
|
||||
bufIdx = c.currentWriteIndex;
|
||||
}
|
||||
|
||||
remoteAddress = evt.getRemoteAddress();
|
||||
if (remoteAddress == null) {
|
||||
remoteAddress = c.getRemoteAddress();
|
||||
}
|
||||
|
||||
try {
|
||||
final int writeSpinCount = c.getConfig().getWriteSpinCount();
|
||||
boolean sent = false;
|
||||
for (int i = writeSpinCount; i > 0; i --) {
|
||||
ByteBuffer nioBuf = buf.toByteBuffer(bufIdx, buf.writerIndex() - bufIdx);
|
||||
int nioBufSize = nioBuf.remaining();
|
||||
sent = ((MultipointWritableMessageChannel) channel).send(remoteAddress, nioBuf);
|
||||
if (sent) {
|
||||
bufIdx += nioBufSize;
|
||||
writtenBytes += nioBufSize;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (sent) {
|
||||
// Successful write - proceed to the next message.
|
||||
evt.getFuture().setSuccess();
|
||||
evt = null;
|
||||
} else {
|
||||
// Not written fully - perhaps the kernel buffer is full.
|
||||
c.currentWriteEvent = evt;
|
||||
c.currentWriteIndex = bufIdx;
|
||||
addOpWrite = true;
|
||||
break;
|
||||
}
|
||||
} catch (AsynchronousCloseException e) {
|
||||
// Doesn't need a user attention - ignore.
|
||||
} catch (Throwable t) {
|
||||
evt.getFuture().setFailure(t);
|
||||
evt = null;
|
||||
fireExceptionCaught(c, t);
|
||||
if (t instanceof IOException) {
|
||||
open = false;
|
||||
c.closeNow(succeededFuture(c));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fireWriteComplete(c, writtenBytes);
|
||||
|
||||
if (open) {
|
||||
if (addOpWrite && channel instanceof SuspendableWriteChannel) {
|
||||
((SuspendableWriteChannel) channel).resumeWrites();
|
||||
}
|
||||
}
|
||||
} else if (channel instanceof WritableMessageChannel) {
|
||||
// TODO implement me
|
||||
boolean open = true;
|
||||
boolean addOpWrite = false;
|
||||
|
||||
MessageEvent evt;
|
||||
ChannelBuffer buf;
|
||||
int bufIdx;
|
||||
int writtenBytes = 0;
|
||||
|
||||
Queue<MessageEvent> writeBuffer = c.writeBuffer;
|
||||
synchronized (c.writeLock) {
|
||||
evt = c.currentWriteEvent;
|
||||
for (;;) {
|
||||
if (evt == null) {
|
||||
evt = writeBuffer.poll();
|
||||
if (evt == null) {
|
||||
c.currentWriteEvent = null;
|
||||
break;
|
||||
}
|
||||
|
||||
buf = (ChannelBuffer) evt.getMessage();
|
||||
bufIdx = buf.readerIndex();
|
||||
} else {
|
||||
buf = (ChannelBuffer) evt.getMessage();
|
||||
bufIdx = c.currentWriteIndex;
|
||||
}
|
||||
|
||||
try {
|
||||
final int writeSpinCount = c.getConfig().getWriteSpinCount();
|
||||
boolean sent = false;
|
||||
for (int i = writeSpinCount; i > 0; i --) {
|
||||
ByteBuffer nioBuf = buf.toByteBuffer(bufIdx, buf.writerIndex() - bufIdx);
|
||||
int nioBufSize = nioBuf.remaining();
|
||||
sent = ((WritableMessageChannel) channel).send(nioBuf);
|
||||
if (sent) {
|
||||
bufIdx += nioBufSize;
|
||||
writtenBytes += nioBufSize;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (sent) {
|
||||
// Successful write - proceed to the next message.
|
||||
evt.getFuture().setSuccess();
|
||||
evt = null;
|
||||
} else {
|
||||
// Not written fully - perhaps the kernel buffer is full.
|
||||
c.currentWriteEvent = evt;
|
||||
c.currentWriteIndex = bufIdx;
|
||||
addOpWrite = true;
|
||||
break;
|
||||
}
|
||||
} catch (AsynchronousCloseException e) {
|
||||
// Doesn't need a user attention - ignore.
|
||||
} catch (Throwable t) {
|
||||
evt.getFuture().setFailure(t);
|
||||
evt = null;
|
||||
fireExceptionCaught(c, t);
|
||||
if (t instanceof IOException) {
|
||||
open = false;
|
||||
c.closeNow(succeededFuture(c));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fireWriteComplete(c, writtenBytes);
|
||||
|
||||
if (open) {
|
||||
if (addOpWrite && channel instanceof SuspendableWriteChannel) {
|
||||
((SuspendableWriteChannel) channel).resumeWrites();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user