diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java index 74dd8b4af5..5c3b91d130 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java @@ -25,10 +25,7 @@ package org.jboss.netty.channel.socket.nio; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.SocketChannel; -import java.util.LinkedList; import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.jboss.netty.channel.AbstractChannel; @@ -54,10 +51,9 @@ abstract class NioSocketChannel extends AbstractChannel final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); final Runnable writeTask = new WriteTask(); - final BlockingQueue writeBuffer = - new LinkedBlockingQueue(); - final Queue internalWriteBuffer = - new LinkedList(); + final Object writeLock = new Object(); + final WriteMessageQueue writeBuffer = new WriteMessageQueue(); + private Queue internalWriteBuffer; MessageEvent currentWriteEvent; int currentWriteIndex; @@ -74,6 +70,17 @@ abstract class NioSocketChannel extends AbstractChannel abstract NioWorker getWorker(); abstract void setWorker(NioWorker worker); + Queue getInternalWriteBuffer() { + if (internalWriteBuffer == null) { + internalWriteBuffer = writeBuffer.drainAll(); + } + return internalWriteBuffer; + } + + void clearInternalWriteBuffer() { + internalWriteBuffer = null; + } + public NioSocketChannelConfig getConfig() { return config; } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 03455cacd1..024dd2cf5c 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -361,21 +361,18 @@ class NioWorker implements Runnable { boolean addOpWrite = false; boolean removeOpWrite = false; - Queue internalWriteBuffer = channel.internalWriteBuffer; MessageEvent evt; ChannelBuffer buf; int bufIdx; - synchronized (internalWriteBuffer) { - if (internalWriteBuffer.isEmpty()) { - channel.writeBuffer.drainTo(internalWriteBuffer); - } - + synchronized (channel.writeLock) { + Queue internalWriteBuffer = channel.getInternalWriteBuffer(); evt = channel.currentWriteEvent; for (;;) { if (evt == null) { evt = internalWriteBuffer.poll(); if (evt == null) { + channel.clearInternalWriteBuffer(); channel.currentWriteEvent = null; removeOpWrite = true; break; @@ -440,22 +437,19 @@ class NioWorker implements Runnable { boolean addOpWrite = false; boolean removeOpWrite = false; - Queue internalWriteBuffer = channel.internalWriteBuffer; MessageEvent evt; ChannelBuffer buf; int bufIdx; int writtenBytes = 0; - synchronized (internalWriteBuffer) { - if (internalWriteBuffer.isEmpty()) { - channel.writeBuffer.drainTo(internalWriteBuffer); - } - + synchronized (channel.writeLock) { + Queue internalWriteBuffer = channel.getInternalWriteBuffer(); evt = channel.currentWriteEvent; for (;;) { if (evt == null) { evt = internalWriteBuffer.poll(); if (evt == null) { + channel.clearInternalWriteBuffer(); channel.currentWriteEvent = null; removeOpWrite = true; break; @@ -689,12 +683,7 @@ class NioWorker implements Runnable { } // Clean up the stale messages in the write buffer. - Queue internalWriteBuffer = channel.internalWriteBuffer; - synchronized (internalWriteBuffer) { - if (internalWriteBuffer.isEmpty()) { - channel.writeBuffer.drainTo(internalWriteBuffer); - } - + synchronized (channel.writeLock) { MessageEvent evt = channel.currentWriteEvent; if (evt != null) { channel.currentWriteEvent = null; @@ -703,12 +692,12 @@ class NioWorker implements Runnable { fireExceptionCaught(channel, cause); } + Queue internalWriteBuffer = channel.getInternalWriteBuffer(); for (;;) { evt = internalWriteBuffer.poll(); if (evt == null) { break; } - evt.getFuture().setFailure(cause); fireExceptionCaught(channel, cause); } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/WriteMessageQueue.java b/src/main/java/org/jboss/netty/channel/socket/nio/WriteMessageQueue.java new file mode 100644 index 0000000000..633d44cb76 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/socket/nio/WriteMessageQueue.java @@ -0,0 +1,140 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2008, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.channel.socket.nio; + +import java.util.AbstractQueue; +import java.util.Iterator; +import java.util.Queue; + +import org.jboss.netty.channel.MessageEvent; + +/** + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * + * @version $Rev$, $Date$ + * + */ +class WriteMessageQueue { + + private static final Queue EMPTY_QUEUE = new EmptyQueue(); + + private MessageEvent[] elements; + private int size; + + WriteMessageQueue() { + super(); + } + + public synchronized void offer(MessageEvent e) { + if (elements == null) { + elements = new MessageEvent[4]; + } else if (size == elements.length) { + MessageEvent[] newElements = new MessageEvent[size << 1]; + System.arraycopy(elements, 0, newElements, 0, size); + elements = newElements; + } + + elements[size ++] = e; + } + + public synchronized Queue drainAll() { + if (size == 0) { + return EMPTY_QUEUE; + } + + Queue drainedQueue = new DrainedQueue(elements, size); + elements = null; + size = 0; + return drainedQueue; + } + + private static class DrainedQueue extends AbstractQueue { + + private final MessageEvent[] elements; + private final int nElements; + private int index; + + DrainedQueue(MessageEvent[] elements, int nElements) { + this.elements = elements; + this.nElements = nElements; + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public int size() { + return nElements - index; + } + + public boolean offer(MessageEvent e) { + return false; + } + + public MessageEvent peek() { + if (index < nElements) { + return elements[index]; + } + return null; + } + + public MessageEvent poll() { + if (index < nElements) { + return elements[index ++]; + } + return null; + } + } + + private static class EmptyQueue extends AbstractQueue { + + EmptyQueue() { + super(); + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public int size() { + return 0; + } + + public boolean offer(MessageEvent e) { + return false; + } + + public MessageEvent peek() { + return null; + } + + public MessageEvent poll() { + return null; + } + } +}