Resurrected the good old direct buffer pool as a read buffer pool instead of using it both for reads and writes. (write buffer pool needs different treatment.)

This commit is contained in:
Trustin Lee 2010-02-18 15:51:06 +00:00
parent a9c90d0e4a
commit 545acfdf42
2 changed files with 144 additions and 10 deletions

View File

@ -79,6 +79,8 @@ class NioWorker implements Runnable {
private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
private final ReadBufferPool readBufferPool = new ReadBufferPool();
NioWorker(int bossId, int id, Executor executor) {
this.bossId = bossId;
this.id = id;
@ -316,18 +318,40 @@ class NioWorker implements Runnable {
int ret = 0;
int readBytes = 0;
boolean failure = true;
try {
while ((ret = buffer.writeBytes(ch, buffer.writableBytes())) > 0) {
readBytes += ret;
if (!buffer.writable()) {
break;
if (buffer.isDirect()) {
try {
while ((ret = buffer.writeBytes(ch, buffer.writableBytes())) > 0) {
readBytes += ret;
if (!buffer.writable()) {
break;
}
}
failure = false;
} catch (ClosedChannelException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
fireExceptionCaught(channel, t);
}
} else {
ByteBuffer bb = readBufferPool.acquire(buffer.writableBytes());
try {
while ((ret = ch.read(bb)) > 0) {
readBytes += ret;
if (!bb.hasRemaining()) {
break;
}
}
failure = false;
} catch (ClosedChannelException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
fireExceptionCaught(channel, t);
} finally {
bb.flip();
buffer.writeBytes(bb);
readBufferPool.release(bb);
}
failure = false;
} catch (ClosedChannelException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
fireExceptionCaught(channel, t);
}
if (readBytes > 0) {

View File

@ -0,0 +1,110 @@
/*
* Copyright 2010 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
final class ReadBufferPool {
private static final int POOL_SIZE = 4;
@SuppressWarnings("unchecked")
private final SoftReference<ByteBuffer>[] pool = new SoftReference[POOL_SIZE];
ReadBufferPool() {
super();
}
final ByteBuffer acquire(ChannelBuffer src) {
ByteBuffer dst = acquire(src.readableBytes());
src.getBytes(src.readerIndex(), dst);
dst.rewind();
return dst;
}
final ByteBuffer acquire(int size) {
for (int i = 0; i < POOL_SIZE; i ++) {
SoftReference<ByteBuffer> ref = pool[i];
if (ref == null) {
continue;
}
ByteBuffer buf = ref.get();
if (buf == null) {
pool[i] = null;
continue;
}
if (buf.capacity() < size) {
continue;
}
pool[i] = null;
buf.rewind();
buf.limit(size);
return buf;
}
ByteBuffer buf = ByteBuffer.allocateDirect(normalizeCapacity(size));
buf.limit(size);
return buf;
}
final void release(ByteBuffer buffer) {
for (int i = 0; i < POOL_SIZE; i ++) {
SoftReference<ByteBuffer> ref = pool[i];
if (ref == null || ref.get() == null) {
pool[i] = new SoftReference<ByteBuffer>(buffer);
return;
}
}
// pool is full - replace one
final int capacity = buffer.capacity();
for (int i = 0; i< POOL_SIZE; i ++) {
SoftReference<ByteBuffer> ref = pool[i];
ByteBuffer pooled = ref.get();
if (pooled == null) {
pool[i] = null;
continue;
}
if (pooled.capacity() < capacity) {
pool[i] = new SoftReference<ByteBuffer>(buffer);
return;
}
}
}
private static final int normalizeCapacity(int capacity) {
// Normalize to multiple of 1024
int q = capacity >>> 10;
int r = capacity & 1023;
if (r != 0) {
q ++;
}
return q << 10;
}
}