From 63cb4a023f0613d30425af7b428eb3a07facdc59 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Thu, 26 Feb 2009 01:38:29 +0000 Subject: [PATCH] Implemented AbstractXnioChannels.handleWritable() properly (needs cleanup and optimization though) --- .../xnio/AbstractXnioChannelHandler.java | 153 +++++++++++++++++- 1 file changed, 151 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/xnio/AbstractXnioChannelHandler.java b/src/main/java/org/jboss/netty/channel/xnio/AbstractXnioChannelHandler.java index 325f93ac33..6f69767791 100644 --- a/src/main/java/org/jboss/netty/channel/xnio/AbstractXnioChannelHandler.java +++ b/src/main/java/org/jboss/netty/channel/xnio/AbstractXnioChannelHandler.java @@ -109,6 +109,7 @@ public abstract class AbstractXnioChannelHandler implements IoHandler writeBuffer = c.writeBuffer; + synchronized (c.writeLock) { + evt = c.currentWriteEvent; + for (;;) { + if (evt == null) { + evt = writeBuffer.poll(); + if (evt == null) { + c.currentWriteEvent = null; + break; + } + + buf = (ChannelBuffer) evt.getMessage(); + bufIdx = buf.readerIndex(); + } else { + buf = (ChannelBuffer) evt.getMessage(); + bufIdx = c.currentWriteIndex; + } + + remoteAddress = evt.getRemoteAddress(); + if (remoteAddress == null) { + remoteAddress = c.getRemoteAddress(); + } + + try { + final int writeSpinCount = c.getConfig().getWriteSpinCount(); + boolean sent = false; + for (int i = writeSpinCount; i > 0; i --) { + ByteBuffer nioBuf = buf.toByteBuffer(bufIdx, buf.writerIndex() - bufIdx); + int nioBufSize = nioBuf.remaining(); + sent = ((MultipointWritableMessageChannel) channel).send(remoteAddress, nioBuf); + if (sent) { + bufIdx += nioBufSize; + writtenBytes += nioBufSize; + break; + } + } + + if (sent) { + // Successful write - proceed to the next message. + evt.getFuture().setSuccess(); + evt = null; + } else { + // Not written fully - perhaps the kernel buffer is full. + c.currentWriteEvent = evt; + c.currentWriteIndex = bufIdx; + addOpWrite = true; + break; + } + } catch (AsynchronousCloseException e) { + // Doesn't need a user attention - ignore. + } catch (Throwable t) { + evt.getFuture().setFailure(t); + evt = null; + fireExceptionCaught(c, t); + if (t instanceof IOException) { + open = false; + c.closeNow(succeededFuture(c)); + } + } + } + } + + fireWriteComplete(c, writtenBytes); + + if (open) { + if (addOpWrite && channel instanceof SuspendableWriteChannel) { + ((SuspendableWriteChannel) channel).resumeWrites(); + } + } } else if (channel instanceof WritableMessageChannel) { - // TODO implement me + boolean open = true; + boolean addOpWrite = false; + + MessageEvent evt; + ChannelBuffer buf; + int bufIdx; + int writtenBytes = 0; + + Queue writeBuffer = c.writeBuffer; + synchronized (c.writeLock) { + evt = c.currentWriteEvent; + for (;;) { + if (evt == null) { + evt = writeBuffer.poll(); + if (evt == null) { + c.currentWriteEvent = null; + break; + } + + buf = (ChannelBuffer) evt.getMessage(); + bufIdx = buf.readerIndex(); + } else { + buf = (ChannelBuffer) evt.getMessage(); + bufIdx = c.currentWriteIndex; + } + + try { + final int writeSpinCount = c.getConfig().getWriteSpinCount(); + boolean sent = false; + for (int i = writeSpinCount; i > 0; i --) { + ByteBuffer nioBuf = buf.toByteBuffer(bufIdx, buf.writerIndex() - bufIdx); + int nioBufSize = nioBuf.remaining(); + sent = ((WritableMessageChannel) channel).send(nioBuf); + if (sent) { + bufIdx += nioBufSize; + writtenBytes += nioBufSize; + break; + } + } + + if (sent) { + // Successful write - proceed to the next message. + evt.getFuture().setSuccess(); + evt = null; + } else { + // Not written fully - perhaps the kernel buffer is full. + c.currentWriteEvent = evt; + c.currentWriteIndex = bufIdx; + addOpWrite = true; + break; + } + } catch (AsynchronousCloseException e) { + // Doesn't need a user attention - ignore. + } catch (Throwable t) { + evt.getFuture().setFailure(t); + evt = null; + fireExceptionCaught(c, t); + if (t instanceof IOException) { + open = false; + c.closeNow(succeededFuture(c)); + } + } + } + } + + fireWriteComplete(c, writtenBytes); + + if (open) { + if (addOpWrite && channel instanceof SuspendableWriteChannel) { + ((SuspendableWriteChannel) channel).resumeWrites(); + } + } } }