From 4fa8e3adfc1e2f7a8260be0d7966de44c5234a5e Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Mon, 16 Mar 2009 04:48:36 +0000 Subject: [PATCH] Reduced code duplication of AbstractXnioChannelHandler --- .../xnio/AbstractXnioChannelHandler.java | 241 +++++------------- 1 file changed, 59 insertions(+), 182 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/xnio/AbstractXnioChannelHandler.java b/src/main/java/org/jboss/netty/channel/xnio/AbstractXnioChannelHandler.java index 20d1d0ff1c..3944db8ecd 100644 --- a/src/main/java/org/jboss/netty/channel/xnio/AbstractXnioChannelHandler.java +++ b/src/main/java/org/jboss/netty/channel/xnio/AbstractXnioChannelHandler.java @@ -109,38 +109,37 @@ public abstract class AbstractXnioChannelHandler implements IoHandler writeBuffer = c.writeBuffer; - synchronized (c.writeLock) { - evt = c.currentWriteEvent; - for (;;) { + Queue writeBuffer = c.writeBuffer; + synchronized (c.writeLock) { + evt = c.currentWriteEvent; + for (;;) { + if (evt == null) { + evt = writeBuffer.poll(); if (evt == null) { - evt = writeBuffer.poll(); - if (evt == null) { - c.currentWriteEvent = null; - break; - } - - buf = (ChannelBuffer) evt.getMessage(); - bufIdx = buf.readerIndex(); - } else { - buf = (ChannelBuffer) evt.getMessage(); - bufIdx = c.currentWriteIndex; + c.currentWriteEvent = null; + break; } - try { - final int writeSpinCount = c.getConfig().getWriteSpinCount(); - for (int i = writeSpinCount; i > 0; i --) { + buf = (ChannelBuffer) evt.getMessage(); + bufIdx = buf.readerIndex(); + } else { + buf = (ChannelBuffer) evt.getMessage(); + bufIdx = c.currentWriteIndex; + } + + try { + final int writeSpinCount = c.getConfig().getWriteSpinCount(); + boolean sent = false; + for (int i = writeSpinCount; i > 0; i --) { + if (channel instanceof GatheringByteChannel) { int localWrittenBytes = buf.getBytes( bufIdx, (GatheringByteChannel) channel, @@ -151,148 +150,20 @@ public abstract class AbstractXnioChannelHandler implements IoHandler writeBuffer = c.writeBuffer; - synchronized (c.writeLock) { - evt = c.currentWriteEvent; - for (;;) { - if (evt == null) { - evt = writeBuffer.poll(); - if (evt == null) { - c.currentWriteEvent = null; - break; - } - - buf = (ChannelBuffer) evt.getMessage(); - bufIdx = buf.readerIndex(); - } else { - buf = (ChannelBuffer) evt.getMessage(); - bufIdx = c.currentWriteIndex; - } - - remoteAddress = evt.getRemoteAddress(); - if (remoteAddress == null) { - remoteAddress = c.getRemoteAddress(); - } - - try { - final int writeSpinCount = c.getConfig().getWriteSpinCount(); - boolean sent = false; - for (int i = writeSpinCount; i > 0; i --) { + } else if (channel instanceof MultipointWritableMessageChannel) { ByteBuffer nioBuf = buf.toByteBuffer(bufIdx, buf.writerIndex() - bufIdx); int nioBufSize = nioBuf.remaining(); + SocketAddress remoteAddress = evt.getRemoteAddress(); + if (remoteAddress == null) { + remoteAddress = c.getRemoteAddress(); + } sent = ((MultipointWritableMessageChannel) channel).send(remoteAddress, nioBuf); if (sent) { bufIdx += nioBufSize; writtenBytes += nioBufSize; break; } - } - - if (sent) { - // Successful write - proceed to the next message. - evt.getFuture().setSuccess(); - evt = null; - } else { - // Not written fully - perhaps the kernel buffer is full. - c.currentWriteEvent = evt; - c.currentWriteIndex = bufIdx; - addOpWrite = true; - break; - } - } catch (AsynchronousCloseException e) { - // Doesn't need a user attention - ignore. - } catch (Throwable t) { - evt.getFuture().setFailure(t); - evt = null; - fireExceptionCaught(c, t); - if (t instanceof IOException) { - open = false; - c.closeNow(succeededFuture(c)); - } - } - } - } - - fireWriteComplete(c, writtenBytes); - - if (open && addOpWrite) { - ((SuspendableWriteChannel) channel).resumeWrites(); - } - } else if (channel instanceof WritableMessageChannel) { - boolean open = true; - boolean addOpWrite = false; - - MessageEvent evt; - ChannelBuffer buf; - int bufIdx; - int writtenBytes = 0; - - Queue writeBuffer = c.writeBuffer; - synchronized (c.writeLock) { - evt = c.currentWriteEvent; - for (;;) { - if (evt == null) { - evt = writeBuffer.poll(); - if (evt == null) { - c.currentWriteEvent = null; - break; - } - - buf = (ChannelBuffer) evt.getMessage(); - bufIdx = buf.readerIndex(); - } else { - buf = (ChannelBuffer) evt.getMessage(); - bufIdx = c.currentWriteIndex; - } - - try { - final int writeSpinCount = c.getConfig().getWriteSpinCount(); - boolean sent = false; - for (int i = writeSpinCount; i > 0; i --) { + } else if (channel instanceof WritableMessageChannel) { ByteBuffer nioBuf = buf.toByteBuffer(bufIdx, buf.writerIndex() - bufIdx); int nioBufSize = nioBuf.remaining(); sent = ((WritableMessageChannel) channel).send(nioBuf); @@ -301,36 +172,42 @@ public abstract class AbstractXnioChannelHandler implements IoHandler 0) { fireWriteComplete(c, writtenBytes); + } - if (open && addOpWrite) { + if (open) { + if (addOpWrite && channel instanceof SuspendableWriteChannel) { ((SuspendableWriteChannel) channel).resumeWrites(); } }