Add PooledByteBufAllocator + microbenchmark module

This pull request introduces the new default ByteBufAllocator implementation, which is based on jemalloc, with some differences:

* Minimum possible buffer capacity is 16 (jemalloc: 2)
* Uses a binary heap with random branching (jemalloc: red-black tree)
* No thread-local cache yet (jemalloc has thread-local cache)
* Default page size is 8 KiB (jemalloc: 4 KiB)
* Default chunk size is 16 MiB (jemalloc: 2 MiB)
* Cannot allocate a buffer bigger than the chunk size (jemalloc: possible) because we don't have control over memory layout in Java. A user can work around this issue by creating a composite buffer, but that is not always a feasible option. Although 16 MiB is a fairly large default, a user's handler might still have to deal with buffers capped at the chunk size when it needs to handle a larger message (see the sketch below this list).
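
For illustration, here is a minimal sketch (not part of this commit) of the composite-buffer workaround mentioned above. It assumes the default 16 MiB chunk size and uses the compositeBuffer() API added in this pull request; the class name and constant are made up for the example:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public final class LargeMessageExample {

    private static final int CHUNK_SIZE = 16 * 1024 * 1024; // assumed default chunk size

    public static void main(String[] args) {
        ByteBufAllocator alloc = new PooledByteBufAllocator();

        // A single allocation larger than the chunk size is rejected by the pooled allocator:
        // ByteBuf tooBig = alloc.buffer(CHUNK_SIZE * 2);

        // Workaround: assemble the large message from chunk-sized pooled pieces.
        CompositeByteBuf message = alloc.compositeBuffer(4);
        for (int i = 0; i < 2; i ++) {
            ByteBuf piece = alloc.buffer(CHUNK_SIZE);
            piece.writerIndex(piece.capacity()); // pretend the piece has been filled
            message.addComponent(piece);         // a component contributes its readable bytes
        }
        // 'message' now spans 32 MiB without a single 32 MiB allocation.
    }
}
```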

Also, to ensure the new allocator performs well enough, I wrote a microbenchmark for it and made it a dedicated Maven module. It uses Google's Caliper framework to run and publish the test results (example)
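
For reference, a rough sketch of what such a Caliper benchmark can look like; the class name, parameter values, and scenario below are illustrative and do not reproduce the actual contents of the new module:

```java
import com.google.caliper.Param;
import com.google.caliper.SimpleBenchmark;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;

public class AllocatorBenchmark extends SimpleBenchmark {

    @Param({ "256", "1024", "8192", "65536" })
    int size;

    private final ByteBufAllocator pooled = new PooledByteBufAllocator();

    // Caliper invokes time*(int reps) methods repeatedly and reports the per-rep cost.
    public void timePooledHeapAllocAndFree(int reps) {
        for (int i = 0; i < reps; i ++) {
            ByteBuf buf = pooled.heapBuffer(size);
            buf.unsafe().free(); // return the buffer to the pool
        }
    }
}
```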

Miscellaneous changes:

* Made some ByteBuf implementations public so that those who implement a new allocator can make use of them.
* Added ByteBufAllocator.compositeBuffer() and its variants.
* ByteBufAllocator.ioBuffer() creates a buffer with 0 initial capacity (a short usage sketch of the updated API follows this list).
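
A short, hypothetical usage sketch of the updated allocator API (class and variable names are made up for the example):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public final class AllocatorApiExample {
    public static void main(String[] args) {
        // 'true' makes buffer() and compositeBuffer() return direct buffers by default.
        ByteBufAllocator alloc = new PooledByteBufAllocator(true);

        ByteBuf io = alloc.ioBuffer();                         // 0 initial capacity, grows on demand
        CompositeByteBuf comp = alloc.compositeBuffer();       // direct components by default
        CompositeByteBuf heap = alloc.compositeHeapBuffer(8);  // consolidates beyond 8 components

        System.out.println("Largest buffer this allocator can hand out: " + alloc.bufferMaxCapacity());

        // Pooled buffers are handed back to the arena via buf.unsafe().free() when done.
    }
}
```
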
Trustin Lee 2012-12-06 06:11:48 +09:00
parent 9d42acbc2a
commit b47fc77522
29 changed files with 2382 additions and 85 deletions

AbstractByteBufAllocator.java

@ -18,18 +18,28 @@ package io.netty.buffer;
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
private final int bufferMaxCapacity;
private final boolean directByDefault;
private final ByteBuf emptyBuf;
protected AbstractByteBufAllocator() {
this(false);
protected AbstractByteBufAllocator(int bufferMaxCapacity) {
this(bufferMaxCapacity, false);
}
protected AbstractByteBufAllocator(boolean directByDefault) {
protected AbstractByteBufAllocator(int bufferMaxCapacity, boolean directByDefault) {
if (bufferMaxCapacity <= 0) {
throw new IllegalArgumentException("bufferMaxCapacity: " + bufferMaxCapacity + " (expected: 1+)");
}
this.directByDefault = directByDefault;
this.bufferMaxCapacity = bufferMaxCapacity;
emptyBuf = new UnpooledHeapByteBuf(this, 0, 0);
}
@Override
public int bufferMaxCapacity() {
return bufferMaxCapacity;
}
@Override
public ByteBuf buffer() {
if (directByDefault) {
@ -56,12 +66,12 @@ public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
@Override
public ByteBuf heapBuffer() {
return heapBuffer(256, Integer.MAX_VALUE);
return heapBuffer(256, bufferMaxCapacity());
}
@Override
public ByteBuf heapBuffer(int initialCapacity) {
return buffer(initialCapacity, Integer.MAX_VALUE);
return heapBuffer(initialCapacity, bufferMaxCapacity());
}
@Override
@ -69,17 +79,18 @@ public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newHeapBuffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf directBuffer() {
return directBuffer(256, Integer.MAX_VALUE);
return directBuffer(256, bufferMaxCapacity());
}
@Override
public ByteBuf directBuffer(int initialCapacity) {
return directBuffer(initialCapacity, Integer.MAX_VALUE);
return directBuffer(initialCapacity, bufferMaxCapacity());
}
@Override
@ -87,9 +98,58 @@ public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);
}
@Override
public CompositeByteBuf compositeBuffer() {
if (directByDefault) {
return compositeDirectBuffer();
}
return compositeHeapBuffer();
}
@Override
public CompositeByteBuf compositeBuffer(int maxNumComponents) {
if (directByDefault) {
return compositeDirectBuffer(maxNumComponents);
}
return compositeHeapBuffer(maxNumComponents);
}
@Override
public CompositeByteBuf compositeHeapBuffer() {
return compositeHeapBuffer(16);
}
@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
return new DefaultCompositeByteBuf(this, false, maxNumComponents);
}
@Override
public CompositeByteBuf compositeDirectBuffer() {
return compositeDirectBuffer(16);
}
@Override
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
return new DefaultCompositeByteBuf(this, true, maxNumComponents);
}
private void validate(int initialCapacity, int maxCapacity) {
if (maxCapacity > bufferMaxCapacity()) {
throw new IllegalArgumentException(
"maxCapacity: " + maxCapacity + " (expected: not greater than " + bufferMaxCapacity());
}
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity: %d (expected: not greater than maxCapacity(%d)",
initialCapacity, maxCapacity));
}
}
protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);
protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);
}

ByteBuf.java

@ -1745,7 +1745,7 @@ public interface ByteBuf extends ChannelBuf, Comparable<ByteBuf> {
ByteBuffer[] nioBuffers();
/**
* Exposes this buffer's bytes as an NIO {@link ByteBuffer}'s for the specified offset and length
* Exposes this buffer's bytes as an NIO {@link ByteBuffer}'s for the specified index and length
* The returned buffer shares the content with this buffer, while changing the position and limit
* of the returned NIO buffer does not affect the indexes and marks of this buffer. This method does
* not modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the
@ -1756,7 +1756,7 @@ public interface ByteBuf extends ChannelBuf, Comparable<ByteBuf> {
* @throws UnsupportedOperationException
* if this buffer cannot create a {@link ByteBuffer} that shares the content with itself
*/
ByteBuffer[] nioBuffers(int offset, int length);
ByteBuffer[] nioBuffers(int index, int length);
/**
* Returns {@code true} if and only if this buffer has a backing byte array.

ByteBufAllocator.java

@ -15,8 +15,6 @@
*/
package io.netty.buffer;
import java.util.concurrent.TimeUnit;
public interface ByteBufAllocator {
ByteBuf buffer();
@ -30,8 +28,12 @@ public interface ByteBufAllocator {
ByteBuf directBuffer(int initialCapacity, int maxCapacity);
ByteBuf ioBuffer();
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
CompositeByteBuf compositeBuffer();
CompositeByteBuf compositeBuffer(int maxNumComponents);
CompositeByteBuf compositeHeapBuffer();
CompositeByteBuf compositeHeapBuffer(int maxNumComponents);
CompositeByteBuf compositeDirectBuffer();
CompositeByteBuf compositeDirectBuffer(int maxNumComponents);
int bufferMaxCapacity();
}

DefaultCompositeByteBuf.java

@ -40,11 +40,12 @@ import java.util.Queue;
* is recommended to use {@link Unpooled#wrappedBuffer(ByteBuf...)}
* instead of calling the constructor explicitly.
*/
final class DefaultCompositeByteBuf extends AbstractByteBuf implements CompositeByteBuf, Unsafe {
public class DefaultCompositeByteBuf extends AbstractByteBuf implements CompositeByteBuf, Unsafe {
private static final ByteBuffer[] EMPTY_NIOBUFFERS = new ByteBuffer[0];
private final ByteBufAllocator alloc;
private final boolean direct;
private final List<Component> components = new ArrayList<Component>();
private final int maxNumComponents;
@ -53,16 +54,17 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite
private boolean freed;
private Queue<ByteBuf> suspendedDeallocations;
public DefaultCompositeByteBuf(ByteBufAllocator alloc, int maxNumComponents) {
public DefaultCompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents) {
super(Integer.MAX_VALUE);
if (alloc == null) {
throw new NullPointerException("alloc");
}
this.alloc = alloc;
this.direct = direct;
this.maxNumComponents = maxNumComponents;
}
public DefaultCompositeByteBuf(ByteBufAllocator alloc, int maxNumComponents, ByteBuf... buffers) {
public DefaultCompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents, ByteBuf... buffers) {
super(Integer.MAX_VALUE);
if (alloc == null) {
throw new NullPointerException("alloc");
@ -73,6 +75,7 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite
}
this.alloc = alloc;
this.direct = direct;
this.maxNumComponents = maxNumComponents;
addComponents0(0, buffers);
@ -80,7 +83,8 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite
setIndex(0, capacity());
}
public DefaultCompositeByteBuf(ByteBufAllocator alloc, int maxNumComponents, Iterable<ByteBuf> buffers) {
public DefaultCompositeByteBuf(
ByteBufAllocator alloc, boolean direct, int maxNumComponents, Iterable<ByteBuf> buffers) {
super(Integer.MAX_VALUE);
if (alloc == null) {
throw new NullPointerException("alloc");
@ -91,6 +95,7 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite
}
this.alloc = alloc;
this.direct = direct;
this.maxNumComponents = maxNumComponents;
addComponents0(0, buffers);
consolidateIfNeeded();
@ -260,7 +265,7 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite
if (numComponents > maxNumComponents) {
final int capacity = components.get(numComponents - 1).endOffset;
ByteBuf consolidated = alloc().buffer(capacity);
ByteBuf consolidated = allocBuffer(capacity);
// We're not using foreach to avoid creating an iterator.
// noinspection ForLoopReplaceableByForEach
@ -440,13 +445,16 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite
if (newCapacity > oldCapacity) {
final int paddingLength = newCapacity - oldCapacity;
ByteBuf padding;
if (components.isEmpty()) {
padding = alloc().buffer(paddingLength, paddingLength);
int nComponents = components.size();
if (nComponents < maxNumComponents) {
padding = allocBuffer(paddingLength);
padding.setIndex(0, paddingLength);
addComponent0(0, padding, true);
} else {
padding = alloc().buffer(paddingLength);
padding = allocBuffer(paddingLength);
padding.setIndex(0, paddingLength);
// FIXME: No need to create a padding buffer and consolidate.
// Just create a big single buffer and put the current content there.
addComponent0(components.size(), padding, true);
consolidateIfNeeded();
}
@ -1133,7 +1141,7 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite
final Component last = components.get(numComponents - 1);
final int capacity = last.endOffset;
final ByteBuf consolidated = alloc().buffer(capacity);
final ByteBuf consolidated = allocBuffer(capacity);
for (int i = 0; i < numComponents; i ++) {
Component c = components.get(i);
@ -1158,7 +1166,7 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite
final int endCIndex = cIndex + numComponents;
final Component last = components.get(endCIndex - 1);
final int capacity = last.endOffset - components.get(cIndex).offset;
final ByteBuf consolidated = alloc().buffer(capacity);
final ByteBuf consolidated = allocBuffer(capacity);
for (int i = cIndex; i < endCIndex; i ++) {
Component c = components.get(i);
@ -1253,6 +1261,13 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite
return this;
}
private ByteBuf allocBuffer(int capacity) {
if (direct) {
return alloc().directBuffer(capacity);
}
return alloc().heapBuffer(capacity);
}
@Override
public String toString() {
String result = super.toString();

DuplicatedByteBuf.java

@ -31,7 +31,7 @@ import java.nio.channels.ScatteringByteChannel;
* parent. It is recommended to use {@link ByteBuf#duplicate()} instead
* of calling the constructor explicitly.
*/
final class DuplicatedByteBuf extends AbstractByteBuf implements Unsafe {
public class DuplicatedByteBuf extends AbstractByteBuf implements Unsafe {
private final ByteBuf buffer;
@ -235,8 +235,8 @@ final class DuplicatedByteBuf extends AbstractByteBuf implements Unsafe {
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
return buffer.nioBuffers(offset, length);
public ByteBuffer[] nioBuffers(int index, int length) {
return buffer.nioBuffers(index, length);
}
@Override

PoolArena.java

@ -0,0 +1,370 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import io.netty.util.internal.StringUtil;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
abstract class PoolArena<T> {
final PooledByteBufAllocator parent;
private final int pageSize;
private final int maxOrder;
private final int pageShifts;
private final int chunkSize;
private final int subpageOverflowMask;
private final Deque<PoolSubpage<T>>[] tinySubpagePools;
private final Deque<PoolSubpage<T>>[] smallSubpagePools;
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;
// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
this.parent = parent;
this.pageSize = pageSize;
this.maxOrder = maxOrder;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
subpageOverflowMask = ~(pageSize - 1);
tinySubpagePools = newSubpagePoolArray(512 >>> 4);
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = new ArrayDeque<PoolSubpage<T>>();
}
smallSubpagePools = newSubpagePoolArray(pageShifts - 9);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = new ArrayDeque<PoolSubpage<T>>();
}
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE);
q075 = new PoolChunkList<T>(this, q100, 75, 100);
q050 = new PoolChunkList<T>(this, q075, 50, 100);
q025 = new PoolChunkList<T>(this, q050, 25, 75);
q000 = new PoolChunkList<T>(this, q025, 1, 50);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25);
q100.prevList = q075;
q075.prevList = q050;
q050.prevList = q025;
q025.prevList = q000;
q000.prevList = null;
qInit.prevList = qInit;
}
@SuppressWarnings("unchecked")
private Deque<PoolSubpage<T>>[] newSubpagePoolArray(int size) {
return new Deque[size];
}
PooledByteBuf<T> allocate(PoolThreadCache cache, int minCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, minCapacity);
return buf;
}
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, int minCapacity) {
final int capacity = normalizeCapacity(minCapacity);
if ((capacity & subpageOverflowMask) == 0) { // capacity < pageSize
int tableIdx;
Deque<PoolSubpage<T>>[] table;
if ((capacity & 0xFFFFFE00) == 0) { // < 512
tableIdx = capacity >>> 4;
table = tinySubpagePools;
} else {
tableIdx = 0;
int i = capacity >>> 10;
while (i != 0) {
i >>>= 1;
tableIdx ++;
}
table = smallSubpagePools;
}
synchronized (this) {
Deque<PoolSubpage<T>> subpages = table[tableIdx];
for (;;) {
PoolSubpage<T> s = subpages.peekFirst();
if (s == null) {
break;
}
if (!s.doNotDestroy || s.elemSize != capacity) {
// The subpage has been destroyed or being used for different element size.
subpages.removeFirst();
continue;
}
long handle = s.allocate();
if (handle < 0) {
subpages.removeFirst();
} else {
s.chunk.initBufWithSubpage(buf, handle);
return;
}
}
}
}
allocateNormal(buf, capacity);
}
private synchronized void allocateNormal(PooledByteBuf<T> buf, int capacity) {
if (q050.allocate(buf, capacity) || q025.allocate(buf, capacity) ||
q000.allocate(buf, capacity) || qInit.allocate(buf, capacity) ||
q075.allocate(buf, capacity)) {
return;
}
// Add a new chunk.
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
long handle = c.allocate(capacity);
assert handle > 0;
c.initBuf(buf, handle);
qInit.add(c);
}
synchronized void free(PoolChunk<T> chunk, long handle) {
chunk.parent.free(chunk, handle);
}
void addSubpage(PoolSubpage<T> subpage) {
int tableIdx;
int elemSize = subpage.elemSize;
Deque<PoolSubpage<T>>[] table;
if ((elemSize & 0xFFFFFE00) == 0) { // < 512
tableIdx = elemSize >>> 4;
table = tinySubpagePools;
} else {
tableIdx = 0;
elemSize >>>= 10;
while (elemSize != 0) {
elemSize >>>= 1;
tableIdx ++;
}
table = smallSubpagePools;
}
table[tableIdx].addFirst(subpage);
}
private int normalizeCapacity(int capacity) {
if (capacity < 0 || capacity > chunkSize) {
throw new IllegalArgumentException("capacity: " + capacity + " (expected: 0-" + chunkSize + ')');
}
if ((capacity & 0xFFFFFE00) != 0) { // >= 512
// Doubled
int normalizedCapacity = 512;
while (normalizedCapacity < capacity) {
normalizedCapacity <<= 1;
}
return normalizedCapacity;
}
// Quantum-spaced
if ((capacity & 15) == 0) {
return capacity;
}
return (capacity & ~15) + 16;
}
void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int oldCapacity = buf.length;
if (oldCapacity == newCapacity) {
return;
}
PoolChunk<T> oldChunk = buf.chunk;
long oldHandle = buf.handle;
T oldMemory = buf.memory;
int oldOffset = buf.offset;
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
allocate(parent.threadCache.get(), buf, newCapacity);
if (newCapacity > oldCapacity) {
memoryCopy(
oldMemory, oldOffset + readerIndex,
buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
} else if (newCapacity < oldCapacity) {
if (readerIndex < newCapacity) {
if (writerIndex > newCapacity) {
writerIndex = newCapacity;
}
memoryCopy(
oldMemory, oldOffset + readerIndex,
buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
} else {
readerIndex = writerIndex = newCapacity;
}
}
buf.setIndex(readerIndex, writerIndex);
if (freeOldMemory) {
free(oldChunk, oldHandle);
}
}
protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize);
protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
protected abstract void memoryCopy(T src, int srcOffset, T dst, int dstOffset, int length);
protected abstract void destroyChunk(PoolChunk<T> chunk);
public synchronized String toString() {
StringBuilder buf = new StringBuilder();
buf.append("Chunk(s) at 0~25%:");
buf.append(StringUtil.NEWLINE);
buf.append(qInit);
buf.append(StringUtil.NEWLINE);
buf.append("Chunk(s) at 0~50%:");
buf.append(StringUtil.NEWLINE);
buf.append(q000);
buf.append(StringUtil.NEWLINE);
buf.append("Chunk(s) at 25~75%:");
buf.append(StringUtil.NEWLINE);
buf.append(q025);
buf.append(StringUtil.NEWLINE);
buf.append("Chunk(s) at 50~100%:");
buf.append(StringUtil.NEWLINE);
buf.append(q050);
buf.append(StringUtil.NEWLINE);
buf.append("Chunk(s) at 75~100%:");
buf.append(StringUtil.NEWLINE);
buf.append(q075);
buf.append(StringUtil.NEWLINE);
buf.append("Chunk(s) at 100%:");
buf.append(StringUtil.NEWLINE);
buf.append(q100);
buf.append(StringUtil.NEWLINE);
buf.append("tiny subpages:");
for (int i = 1; i < tinySubpagePools.length; i ++) {
Deque<PoolSubpage<T>> subpages = tinySubpagePools[i];
if (subpages.isEmpty()) {
continue;
}
buf.append(StringUtil.NEWLINE);
buf.append(i);
buf.append(": ");
buf.append(subpages);
}
buf.append(StringUtil.NEWLINE);
buf.append("small subpages:");
for (int i = 1; i < smallSubpagePools.length; i ++) {
Deque<PoolSubpage<T>> subpages = smallSubpagePools[i];
if (subpages.isEmpty()) {
continue;
}
buf.append(StringUtil.NEWLINE);
buf.append(i);
buf.append(": ");
buf.append(subpages);
}
buf.append(StringUtil.NEWLINE);
return buf.toString();
}
static final class HeapArena extends PoolArena<byte[]> {
HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize);
}
@Override
protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
return new PoolChunk<byte[]>(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize);
}
@Override
protected void destroyChunk(PoolChunk<byte[]> chunk) {
// Rely on GC.
}
@Override
protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
return new PooledHeapByteBuf(maxCapacity);
}
@Override
protected void memoryCopy(byte[] src, int srcOffset, byte[] dst, int dstOffset, int length) {
if (length == 0) {
return;
}
System.arraycopy(src, srcOffset, dst, dstOffset, length);
}
}
static final class DirectArena extends PoolArena<ByteBuffer> {
DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize);
}
@Override
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
return new PoolChunk<ByteBuffer>(
this, ByteBuffer.allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize);
}
@Override
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
UnpooledDirectByteBuf.freeDirect(chunk.memory);
}
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
return new PooledDirectByteBuf(maxCapacity);
}
@Override
protected void memoryCopy(ByteBuffer src, int srcOffset, ByteBuffer dst, int dstOffset, int length) {
if (length == 0) {
return;
}
// We must duplicate the NIO buffers because they may be accessed by other Netty buffers.
src = src.duplicate();
dst = dst.duplicate();
src.position(srcOffset).limit(srcOffset + length);
dst.position(dstOffset);
dst.put(src);
}
}
}

PoolChunk.java

@ -0,0 +1,332 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
final class PoolChunk<T> {
private static final int ST_UNUSED = 0;
private static final int ST_BRANCH = 1;
private static final int ST_ALLOCATED = 2;
private static final int ST_ALLOCATED_SUBPAGE = ST_ALLOCATED | 1;
private static final long multiplier = 0x5DEECE66DL;
private static final long addend = 0xBL;
private static final long mask = (1L << 48) - 1;
final PoolArena<T> arena;
final T memory;
private final int[] memoryMap;
private final PoolSubpage<T>[] subpages;
/** Used to determine if the requested capacity is equal to or greater than pageSize. */
private final int subpageOverflowMask;
private final int pageSize;
private final int pageShifts;
private final int chunkSize;
private final int maxSubpageAllocs;
private long random = (System.nanoTime() ^ multiplier) & mask;
private int freeBytes;
PoolChunkList<T> parent;
PoolChunk<T> prev;
PoolChunk<T> next;
// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
this.arena = arena;
this.memory = memory;
this.pageSize = pageSize;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
subpageOverflowMask = ~(pageSize - 1);
freeBytes = chunkSize;
int chunkSizeInPages = chunkSize >>> pageShifts;
maxSubpageAllocs = 1 << maxOrder;
// Generate the memory map.
memoryMap = new int[maxSubpageAllocs << 1];
int memoryMapIndex = 1;
for (int i = 0; i <= maxOrder; i ++) {
int runSizeInPages = chunkSizeInPages >>> i;
for (int j = 0; j < chunkSizeInPages; j += runSizeInPages) {
//noinspection PointlessBitwiseExpression
memoryMap[memoryMapIndex ++] = j << 17 | runSizeInPages << 2 | ST_UNUSED;
}
}
subpages = newSubpageArray(maxSubpageAllocs);
}
@SuppressWarnings("unchecked")
private PoolSubpage<T>[] newSubpageArray(int size) {
return new PoolSubpage[size];
}
int usage() {
if (freeBytes == 0) {
return 100;
}
int freePercentage = (int) (freeBytes * 100L / chunkSize);
if (freePercentage == 0) {
return 99;
}
return 100 - freePercentage;
}
long allocate(int capacity) {
int firstVal = memoryMap[1];
if ((capacity & subpageOverflowMask) != 0) { // >= pageSize
return allocateRun(capacity, 1, firstVal);
} else {
return allocateSubpage(capacity, 1, firstVal);
}
}
private long allocateRun(int capacity, int curIdx, int val) {
for (;;) {
if ((val & ST_ALLOCATED) != 0) { // state == ST_ALLOCATED || state == ST_ALLOCATED_SUBPAGE
return -1;
}
if ((val & ST_BRANCH) != 0) { // state == ST_BRANCH
int nextIdx = curIdx << 1 ^ nextRandom();
long res = allocateRun(capacity, nextIdx, memoryMap[nextIdx]);
if (res > 0) {
return res;
}
curIdx = nextIdx ^ 1;
val = memoryMap[curIdx];
continue;
}
// state == ST_UNUSED
return allocateRunSimple(capacity, curIdx, val);
}
}
private long allocateRunSimple(int capacity, int curIdx, int val) {
int runLength = runLength(val);
if (capacity > runLength) {
return -1;
}
for (;;) {
if (capacity == runLength) {
// Found the run that fits.
// Note that capacity has been normalized already, so we don't need to deal with
// the values that are not power of 2.
memoryMap[curIdx] = val & ~3 | ST_ALLOCATED;
freeBytes -= runLength;
return curIdx;
}
int nextIdx = curIdx << 1 ^ nextRandom();
int unusedIdx = nextIdx ^ 1;
memoryMap[curIdx] = val & ~3 | ST_BRANCH;
//noinspection PointlessBitwiseExpression
memoryMap[unusedIdx] = memoryMap[unusedIdx] & ~3 | ST_UNUSED;
runLength >>>= 1;
curIdx = nextIdx;
val = memoryMap[curIdx];
}
}
private long allocateSubpage(int capacity, int curIdx, int val) {
int state = val & 3;
if (state == ST_BRANCH) {
int nextIdx = curIdx << 1 ^ nextRandom();
long res = branchSubpage(capacity, nextIdx);
if (res > 0) {
return res;
}
return branchSubpage(capacity, nextIdx ^ 1);
}
if (state == ST_UNUSED) {
return allocateSubpageSimple(capacity, curIdx, val);
}
if (state == ST_ALLOCATED_SUBPAGE) {
PoolSubpage<T> subpage = subpages[subpageIdx(curIdx)];
int elemSize = subpage.elemSize;
if (capacity != elemSize) {
return -1;
}
return subpage.allocate();
}
return -1;
}
private long allocateSubpageSimple(int capacity, int curIdx, int val) {
int runLength = runLength(val);
for (;;) {
if (runLength == pageSize) {
memoryMap[curIdx] = val & ~3 | ST_ALLOCATED_SUBPAGE;
freeBytes -= runLength;
int subpageIdx = subpageIdx(curIdx);
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
subpage = new PoolSubpage<T>(this, curIdx, runOffset(val), pageSize, capacity);
subpages[subpageIdx] = subpage;
} else {
subpage.init(capacity);
}
arena.addSubpage(subpage);
return subpage.allocate();
}
int nextIdx = curIdx << 1 ^ nextRandom();
int unusedIdx = nextIdx ^ 1;
memoryMap[curIdx] = val & ~3 | ST_BRANCH;
//noinspection PointlessBitwiseExpression
memoryMap[unusedIdx] = memoryMap[unusedIdx] & ~3 | ST_UNUSED;
runLength >>>= 1;
curIdx = nextIdx;
val = memoryMap[curIdx];
}
}
private long branchSubpage(int capacity, int nextIdx) {
int nextVal = memoryMap[nextIdx];
if ((nextVal & 3) != ST_ALLOCATED) {
return allocateSubpage(capacity, nextIdx, nextVal);
}
return -1;
}
void free(long handle) {
int memoryMapIdx = (int) handle;
int bitmapIdx = (int) (handle >>> 32);
int val = memoryMap[memoryMapIdx];
int state = val & 3;
if (state == ST_ALLOCATED_SUBPAGE) {
assert bitmapIdx != 0;
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage != null && subpage.doNotDestroy;
if (subpage.free(bitmapIdx & 0x3FFFFFFF)) {
return;
}
} else {
assert state == ST_ALLOCATED : "state: " + state;
assert bitmapIdx == 0;
}
freeBytes += runLength(val);
for (;;) {
//noinspection PointlessBitwiseExpression
memoryMap[memoryMapIdx] = val & ~3 | ST_UNUSED;
if (memoryMapIdx == 1) {
assert freeBytes == chunkSize;
return;
}
if ((memoryMap[siblingIdx(memoryMapIdx)] & 3) != ST_UNUSED) {
break;
}
memoryMapIdx = parentIdx(memoryMapIdx);
val = memoryMap[memoryMapIdx];
}
}
void initBuf(PooledByteBuf<T> buf, long handle) {
int memoryMapIdx = (int) handle;
int bitmapIdx = (int) (handle >>> 32);
if (bitmapIdx == 0) {
int val = memoryMap[memoryMapIdx];
assert (val & 3) == ST_ALLOCATED : String.valueOf(val & 3);
buf.init(this, handle, memory, runOffset(val), runLength(val));
} else {
initBufWithSubpage(buf, handle, bitmapIdx);
}
}
void initBufWithSubpage(PooledByteBuf<T> buf, long handle) {
initBufWithSubpage(buf, handle, (int) (handle >>> 32));
}
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx) {
assert bitmapIdx != 0;
int memoryMapIdx = (int) handle;
int val = memoryMap[memoryMapIdx];
assert (val & 3) == ST_ALLOCATED_SUBPAGE;
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage.doNotDestroy;
buf.init(
this, handle, memory,
runOffset(val) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, subpage.elemSize);
}
private static int parentIdx(int memoryMapIdx) {
return memoryMapIdx >>> 1;
}
private static int siblingIdx(int memoryMapIdx) {
return memoryMapIdx ^ 1;
}
private int runLength(int val) {
return (val >>> 2 & 0x7FFF) << pageShifts;
}
private int runOffset(int val) {
return val >>> 17 << pageShifts;
}
private int subpageIdx(int memoryMapIdx) {
return memoryMapIdx - maxSubpageAllocs;
}
private int nextRandom() {
random = random * multiplier + addend & mask;
return (int) (random >>> 47) & 1;
}
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append("Chunk(");
buf.append(Integer.toHexString(System.identityHashCode(this)));
buf.append(": ");
buf.append(usage());
buf.append("%, ");
buf.append(chunkSize - freeBytes);
buf.append('/');
buf.append(chunkSize);
buf.append(')');
return buf.toString();
}
}

PoolChunkList.java

@ -0,0 +1,129 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import io.netty.util.internal.StringUtil;
final class PoolChunkList<T> {
private final PoolArena<T> arena;
private final PoolChunkList<T> nextList;
PoolChunkList<T> prevList;
private final int minUsage;
private final int maxUsage;
private PoolChunk<T> head;
// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
PoolChunkList(PoolArena<T> arena, PoolChunkList<T> nextList, int minUsage, int maxUsage) {
this.arena = arena;
this.nextList = nextList;
this.minUsage = minUsage;
this.maxUsage = maxUsage;
}
boolean allocate(PooledByteBuf<T> buf, int capacity) {
if (head == null) {
return false;
}
for (PoolChunk<T> cur = head;;) {
long handle = cur.allocate(capacity);
if (handle < 0) {
cur = cur.next;
if (cur == null) {
return false;
}
} else {
cur.initBuf(buf, handle);
if (cur.usage() >= maxUsage) {
remove(cur);
nextList.add(cur);
}
return true;
}
}
}
void free(PoolChunk<T> chunk, long handle) {
chunk.free(handle);
if (chunk.usage() < minUsage) {
remove(chunk);
if (prevList == null) {
assert chunk.usage() == 0;
arena.destroyChunk(chunk);
} else {
prevList.add(chunk);
}
}
}
void add(PoolChunk<T> chunk) {
if (chunk.usage() >= maxUsage) {
nextList.add(chunk);
return;
}
chunk.parent = this;
if (head == null) {
head = chunk;
chunk.prev = null;
chunk.next = null;
} else {
chunk.prev = null;
chunk.next = head;
head.prev = chunk;
head = chunk;
}
}
private void remove(PoolChunk<T> cur) {
if (cur == head) {
head = cur.next;
if (head != null) {
head.prev = null;
}
} else {
PoolChunk<T> next = cur.next;
cur.prev.next = next;
if (next != null) {
next.prev = cur.prev;
}
}
}
@Override
public String toString() {
if (head == null) {
return "none";
}
StringBuilder buf = new StringBuilder();
for (PoolChunk<T> cur = head;;) {
buf.append(cur);
cur = cur.next;
if (cur == null) {
break;
}
buf.append(StringUtil.NEWLINE);
}
return buf.toString();
}
}

PoolSubpage.java

@ -0,0 +1,155 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
final class PoolSubpage<T> {
final PoolChunk<T> chunk;
final int memoryMapIdx;
final int runOffset;
final int pageSize;
final long[] bitmap;
boolean doNotDestroy;
int elemSize;
int maxNumElems;
int nextAvail;
int bitmapLength;
int numAvail;
// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
PoolSubpage(PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
this.chunk = chunk;
this.memoryMapIdx = memoryMapIdx;
this.runOffset = runOffset;
this.pageSize = pageSize;
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
init(elemSize);
}
void init(int elemSize) {
doNotDestroy = true;
this.elemSize = elemSize;
if (elemSize == 0) {
return;
}
maxNumElems = numAvail = pageSize / elemSize;
nextAvail = 0;
bitmapLength = maxNumElems >>> 6;
if ((maxNumElems & 63) != 0) {
bitmapLength ++;
}
for (int i = 0; i < bitmapLength; i ++) {
bitmap[i] = 0;
}
}
/**
* Returns the bitmap index of the subpage allocation.
*/
long allocate() {
if (elemSize == 0) {
return toHandle(0);
}
if (numAvail == 0 || !doNotDestroy) {
return -1;
}
final int bitmapIdx = nextAvail;
int q = bitmapIdx >>> 6;
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) == 0;
bitmap[q] |= 1L << r;
if (-- numAvail == 0) {
nextAvail = -1;
} else {
nextAvail = findNextAvailable();
}
return toHandle(bitmapIdx);
}
/**
* @return {@code true} if this subpage is in use.
* {@code false} if this subpage is not used by its chunk and thus it's OK to be released.
*/
boolean free(int bitmapIdx) {
if (elemSize == 0) {
return true;
}
int q = bitmapIdx >>> 6;
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) != 0;
bitmap[q] ^= 1L << r;
if (numAvail ++ == 0) {
nextAvail = bitmapIdx;
chunk.arena.addSubpage(this);
return true;
}
if (numAvail < maxNumElems) {
return true;
} else {
doNotDestroy = false;
return false;
}
}
private int findNextAvailable() {
int newNextAvail = -1;
loop:
for (int i = 0; i < bitmapLength; i ++) {
long bits = bitmap[i];
if (~bits != 0) {
for (int j = 0; j < 64; j ++) {
if ((bits & 1) == 0) {
newNextAvail = i << 6 | j;
break loop;
}
bits >>>= 1;
}
}
}
if (newNextAvail < maxNumElems) {
return newNextAvail;
} else {
return -1;
}
}
private long toHandle(int bitmapIdx) {
return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
public String toString() {
if (!doNotDestroy) {
return "(" + memoryMapIdx + ": not in use)";
}
return String.valueOf('(') + memoryMapIdx + ": " + (maxNumElems - numAvail) + '/' + maxNumElems +
", offset: " + runOffset + ", length: " + pageSize + ", elemSize: " + elemSize + ')';
}
}

PoolThreadCache.java

@ -0,0 +1,33 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import java.nio.ByteBuffer;
final class PoolThreadCache {
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena) {
this.heapArena = heapArena;
this.directArena = directArena;
}
}

PooledByteBuf.java

@ -0,0 +1,190 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import io.netty.buffer.ByteBuf.Unsafe;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayDeque;
import java.util.Queue;
abstract class PooledByteBuf<T> extends AbstractByteBuf implements Unsafe {
protected PoolChunk<T> chunk;
protected long handle;
protected T memory;
protected int offset;
protected int length;
private ByteBuffer tmpNioBuf;
private Queue<Allocation<T>> suspendedDeallocations;
protected PooledByteBuf(int maxCapacity) {
super(maxCapacity);
}
void init(PoolChunk<T> chunk, long handle, T memory, int offset, int length) {
assert handle >= 0;
this.chunk = chunk;
this.handle = handle;
this.memory = memory;
this.offset = offset;
this.length = length;
setIndex(0, 0);
tmpNioBuf = null;
}
@Override
public int capacity() {
return length;
}
@Override
public ByteBuf capacity(int newCapacity) {
assert !isFreed();
if (suspendedDeallocations == null) {
chunk.arena.reallocate(this, newCapacity, true);
} else {
Allocation<T> old = new Allocation<T>(chunk, handle);
chunk.arena.reallocate(this, newCapacity, false);
suspendedDeallocations.add(old);
}
return this;
}
@Override
public ByteBufAllocator alloc() {
return chunk.arena.parent;
}
@Override
public ByteOrder order() {
return ByteOrder.BIG_ENDIAN;
}
@Override
public ByteBuf unwrap() {
return null;
}
@Override
public Unsafe unsafe() {
return this;
}
@Override
public ByteBuffer internalNioBuffer() {
ByteBuffer tmpNioBuf = this.tmpNioBuf;
if (tmpNioBuf == null) {
this.tmpNioBuf = tmpNioBuf = newInternalNioBuffer(memory);
}
return tmpNioBuf;
}
@Override
public ByteBuffer[] internalNioBuffers() {
return new ByteBuffer[] { internalNioBuffer() };
}
protected abstract ByteBuffer newInternalNioBuffer(T memory);
@Override
public void discardSomeReadBytes() {
final int readerIndex = readerIndex();
if (readerIndex == writerIndex()) {
discardReadBytes();
return;
}
if (readerIndex > 0 && readerIndex >= capacity() >>> 1) {
discardReadBytes();
}
}
@Override
public void suspendIntermediaryDeallocations() {
if (suspendedDeallocations == null) {
suspendedDeallocations = new ArrayDeque<Allocation<T>>(2);
}
}
@Override
public void resumeIntermediaryDeallocations() {
if (suspendedDeallocations == null) {
return;
}
Queue<Allocation<T>> suspendedDeallocations = this.suspendedDeallocations;
this.suspendedDeallocations = null;
if (suspendedDeallocations.isEmpty()) {
return;
}
for (Allocation<T> a: suspendedDeallocations) {
chunk.arena.free(a.chunk, a.handle);
}
}
@Override
public boolean isFreed() {
return handle < 0;
}
@Override
public void free() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
resumeIntermediaryDeallocations();
chunk.arena.free(chunk, handle);
}
}
protected int idx(int index) {
return offset + index;
}
protected void checkIndex(int index) {
assert !isFreed();
if (index < 0 || index >= length) {
throw new IndexOutOfBoundsException(String.format(
"index: %d (expected: range(0, %d))", index, length));
}
}
protected void checkIndex(int index, int fieldLength) {
assert !isFreed();
if (index < 0 || index > length - fieldLength) {
throw new IndexOutOfBoundsException(String.format(
"index: %d, length: %d (expected: range(0, %d))", index, fieldLength, length));
}
}
private static final class Allocation<T> {
final PoolChunk<T> chunk;
final long handle;
Allocation(PoolChunk<T> chunk, long handle) {
this.chunk = chunk;
this.handle = handle;
}
}
}

PooledByteBufAllocator.java

@ -0,0 +1,157 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import io.netty.util.internal.StringUtil;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
public class PooledByteBufAllocator extends AbstractByteBufAllocator {
private static final int DEFAULT_NUM_HEAP_ARENA = Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_NUM_DIRECT_ARENA = Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_PAGE_SIZE = 8192;
private static final int DEFAULT_MAX_ORDER = 11; // 8192 << 11 = 16 MiB per chunk
private static final int MIN_PAGE_SIZE = 4096;
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
public static final PooledByteBufAllocator DEFAULT = new PooledByteBufAllocator();
private final PoolArena<byte[]>[] heapArenas;
private final PoolArena<ByteBuffer>[] directArenas;
final ThreadLocal<PoolThreadCache> threadCache = new ThreadLocal<PoolThreadCache>() {
private final AtomicInteger index = new AtomicInteger();
@Override
protected PoolThreadCache initialValue() {
int idx = Math.abs(index.getAndIncrement() % heapArenas.length);
return new PoolThreadCache(heapArenas[idx], directArenas[idx]);
}
};
public PooledByteBufAllocator() {
this(false);
}
public PooledByteBufAllocator(boolean directByDefault) {
this(directByDefault, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
}
public PooledByteBufAllocator(int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
this(false, nHeapArena, nDirectArena, pageSize, maxOrder);
}
public PooledByteBufAllocator(
boolean directByDefault, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
super(validateAndCalculateChunkSize(pageSize, maxOrder), directByDefault);
if (nHeapArena <= 0) {
throw new IllegalArgumentException("nHeapArena: " + nHeapArena + " (expected: 1+)");
}
if (nDirectArena <= 0) {
throw new IllegalArgumentException("nDirectArea: " + nDirectArena + " (expected: 1+)");
}
int pageShifts = validateAndCalculatePageShifts(pageSize);
int chunkSize = bufferMaxCapacity();
heapArenas = newArenaArray(nHeapArena);
for (int i = 0; i < heapArenas.length; i ++) {
heapArenas[i] = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
}
directArenas = newArenaArray(nDirectArena);
for (int i = 0; i < directArenas.length; i ++) {
directArenas[i] = new PoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize);
}
}
@SuppressWarnings("unchecked")
private static <T> PoolArena<T>[] newArenaArray(int size) {
return new PoolArena[size];
}
private static int validateAndCalculatePageShifts(int pageSize) {
if (pageSize < MIN_PAGE_SIZE) {
throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: 4096+)");
}
// Ensure pageSize is power of 2.
boolean found1 = false;
int pageShifts = 0;
for (int i = pageSize; i != 0 ; i >>= 1) {
if ((i & 1) != 0) {
if (!found1) {
found1 = true;
} else {
throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2");
}
} else {
if (!found1) {
pageShifts ++;
}
}
}
return pageShifts;
}
private static int validateAndCalculateChunkSize(int pageSize, int maxOrder) {
if (maxOrder > 14) {
throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)");
}
// Ensure the resulting chunkSize does not overflow.
int chunkSize = pageSize;
for (int i = maxOrder; i > 0; i --) {
if (chunkSize > MAX_CHUNK_SIZE / 2) {
throw new IllegalArgumentException(String.format(
"pageSize (%d) << maxOrder (%d) must not exceed %d", pageSize, maxOrder, MAX_CHUNK_SIZE));
}
chunkSize <<= 1;
}
return chunkSize;
}
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
return cache.heapArena.allocate(cache, initialCapacity, maxCapacity);
}
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
return cache.directArena.allocate(cache, initialCapacity, maxCapacity);
}
@Override
public ByteBuf ioBuffer() {
return directBuffer(0);
}
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append(heapArenas.length);
buf.append(" arena(s):");
buf.append(StringUtil.NEWLINE);
for (PoolArena<byte[]> a: heapArenas) {
buf.append(a);
}
return buf.toString();
}
}

PooledDirectByteBuf.java

@ -0,0 +1,290 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
PooledDirectByteBuf(int maxCapacity) {
super(maxCapacity);
}
@Override
protected ByteBuffer newInternalNioBuffer(ByteBuffer memory) {
return memory.duplicate();
}
@Override
public boolean isDirect() {
return true;
}
@Override
public byte getByte(int index) {
checkIndex(index);
return memory.get(idx(index));
}
@Override
public short getShort(int index) {
checkIndex(index, 2);
return memory.getShort(idx(index));
}
@Override
public int getUnsignedMedium(int index) {
checkIndex(index, 3);
index = idx(index);
return (memory.get(index) & 0xff) << 16 | (memory.get(index + 1) & 0xff) << 8 | memory.get(index + 2) & 0xff;
}
@Override
public int getInt(int index) {
checkIndex(index, 4);
return memory.getInt(idx(index));
}
@Override
public long getLong(int index) {
checkIndex(index, 8);
return memory.getLong(idx(index));
}
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
checkIndex(index, length);
if (dst instanceof PooledDirectByteBuf) {
PooledDirectByteBuf bbdst = (PooledDirectByteBuf) dst;
ByteBuffer data = bbdst.internalNioBuffer();
dstIndex = bbdst.idx(dstIndex);
data.clear().position(dstIndex).limit(dstIndex + length);
getBytes(index, data);
} else if (dst.hasArray()) {
getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
} else {
dst.setBytes(dstIndex, this, index, length);
}
return this;
}
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
checkIndex(index, length);
ByteBuffer tmpBuf = internalNioBuffer();
index = idx(index);
tmpBuf.clear().position(index).limit(index + length);
tmpBuf.get(dst, dstIndex, length);
return this;
}
@Override
public ByteBuf getBytes(int index, ByteBuffer dst) {
checkIndex(index);
int bytesToCopy = Math.min(capacity() - index, dst.remaining());
ByteBuffer tmpBuf = internalNioBuffer();
index = idx(index);
tmpBuf.clear().position(index).limit(index + bytesToCopy);
dst.put(tmpBuf);
return this;
}
@Override
public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
checkIndex(index, length);
if (length == 0) {
return this;
}
byte[] tmp = new byte[length];
ByteBuffer tmpBuf = internalNioBuffer();
tmpBuf.clear().position(idx(index));
tmpBuf.get(tmp);
out.write(tmp);
return this;
}
@Override
public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
checkIndex(index, length);
if (length == 0) {
return 0;
}
ByteBuffer tmpBuf = internalNioBuffer();
index = idx(index);
tmpBuf.clear().position(index).limit(index + length);
return out.write(tmpBuf);
}
@Override
public ByteBuf setByte(int index, int value) {
checkIndex(index);
memory.put(idx(index), (byte) value);
return this;
}
@Override
public ByteBuf setShort(int index, int value) {
checkIndex(index, 2);
memory.putShort(idx(index), (short) value);
return this;
}
@Override
public ByteBuf setMedium(int index, int value) {
checkIndex(index, 3);
index = idx(index);
memory.put(index, (byte) (value >>> 16));
memory.put(index + 1, (byte) (value >>> 8));
memory.put(index + 2, (byte) value);
return this;
}
@Override
public ByteBuf setInt(int index, int value) {
checkIndex(index, 4);
memory.putInt(idx(index), value);
return this;
}
@Override
public ByteBuf setLong(int index, long value) {
checkIndex(index, 8);
memory.putLong(idx(index), value);
return this;
}
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
checkIndex(index, length);
if (src instanceof PooledDirectByteBuf) {
PooledDirectByteBuf bbsrc = (PooledDirectByteBuf) src;
ByteBuffer data = bbsrc.internalNioBuffer();
srcIndex = bbsrc.idx(srcIndex);
data.clear().position(srcIndex).limit(srcIndex + length);
setBytes(index, data);
} else if (src.hasArray()) {
setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);
} else {
src.getBytes(srcIndex, this, index, length);
}
return this;
}
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
checkIndex(index, length);
ByteBuffer tmpBuf = internalNioBuffer();
index = idx(index);
tmpBuf.clear().position(index).limit(index + length);
tmpBuf.put(src, srcIndex, length);
return this;
}
@Override
public ByteBuf setBytes(int index, ByteBuffer src) {
checkIndex(index);
ByteBuffer tmpBuf = internalNioBuffer();
if (src == tmpBuf) {
src = src.duplicate();
}
index = idx(index);
tmpBuf.clear().position(index).limit(index + src.remaining());
tmpBuf.put(src);
return this;
}
@Override
public int setBytes(int index, InputStream in, int length) throws IOException {
checkIndex(index, length);
byte[] tmp = new byte[length];
int readBytes = in.read(tmp);
if (readBytes <= 0) {
return readBytes;
}
ByteBuffer tmpNioBuf = internalNioBuffer();
tmpNioBuf.clear().position(idx(index));
tmpNioBuf.put(tmp, 0, readBytes);
return readBytes;
}
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
checkIndex(index, length);
ByteBuffer tmpNioBuf = internalNioBuffer();
index = idx(index);
tmpNioBuf.clear().position(index).limit(index + length);
try {
return in.read(tmpNioBuf);
} catch (ClosedChannelException e) {
return -1;
}
}
@Override
public ByteBuf copy(int index, int length) {
checkIndex(index, length);
ByteBuf copy = alloc().directBuffer(capacity(), maxCapacity());
copy.writeBytes(this, index, length);
return copy;
}
@Override
public boolean hasNioBuffer() {
return true;
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
checkIndex(index, length);
index = idx(index);
return ((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length)).slice();
}
@Override
public boolean hasNioBuffers() {
return true;
}
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
return new ByteBuffer[] { nioBuffer(index, length) };
}
@Override
public boolean hasArray() {
return false;
}
@Override
public byte[] array() {
throw new UnsupportedOperationException("direct buffer");
}
@Override
public int arrayOffset() {
throw new UnsupportedOperationException("direct buffer");
}
}

PooledHeapByteBuf.java

@ -0,0 +1,265 @@
/*
* Copyright 2012 The Netty Project
*
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
final class PooledHeapByteBuf extends PooledByteBuf<byte[]> {
PooledHeapByteBuf(int maxCapacity) {
super(maxCapacity);
}
@Override
public boolean isDirect() {
return false;
}
@Override
public byte getByte(int index) {
checkIndex(index);
return memory[idx(index)];
}
@Override
public short getShort(int index) {
checkIndex(index, 2);
index = idx(index);
return (short) (memory[index] << 8 | memory[index + 1] & 0xFF);
}
@Override
public int getUnsignedMedium(int index) {
checkIndex(index, 3);
index = idx(index);
return (memory[index] & 0xff) << 16 |
(memory[index + 1] & 0xff) << 8 |
memory[index + 2] & 0xff;
}
@Override
public int getInt(int index) {
checkIndex(index, 4);
index = idx(index);
return (memory[index] & 0xff) << 24 |
(memory[index + 1] & 0xff) << 16 |
(memory[index + 2] & 0xff) << 8 |
memory[index + 3] & 0xff;
}
@Override
public long getLong(int index) {
checkIndex(index, 8);
index = idx(index);
return ((long) memory[index] & 0xff) << 56 |
((long) memory[index + 1] & 0xff) << 48 |
((long) memory[index + 2] & 0xff) << 40 |
((long) memory[index + 3] & 0xff) << 32 |
((long) memory[index + 4] & 0xff) << 24 |
((long) memory[index + 5] & 0xff) << 16 |
((long) memory[index + 6] & 0xff) << 8 |
(long) memory[index + 7] & 0xff;
}
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
checkIndex(index, length);
if (dst.hasArray()) {
getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
} else {
dst.setBytes(dstIndex, memory, idx(index), length);
}
return this;
}
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
checkIndex(index, length);
System.arraycopy(memory, idx(index), dst, dstIndex, length);
return this;
}
@Override
public ByteBuf getBytes(int index, ByteBuffer dst) {
checkIndex(index);
dst.put(memory, idx(index), Math.min(capacity() - index, dst.remaining()));
return this;
}
@Override
public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
checkIndex(index, length);
out.write(memory, idx(index), length);
return this;
}
@Override
public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
checkIndex(index, length);
index = idx(index);
return out.write((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length));
}
@Override
public ByteBuf setByte(int index, int value) {
checkIndex(index);
memory[idx(index)] = (byte) value;
return this;
}
@Override
public ByteBuf setShort(int index, int value) {
checkIndex(index, 2);
index = idx(index);
memory[index] = (byte) (value >>> 8);
memory[index + 1] = (byte) value;
return this;
}
@Override
public ByteBuf setMedium(int index, int value) {
checkIndex(index, 3);
index = idx(index);
memory[index] = (byte) (value >>> 16);
memory[index + 1] = (byte) (value >>> 8);
memory[index + 2] = (byte) value;
return this;
}
@Override
public ByteBuf setInt(int index, int value) {
checkIndex(index, 4);
index = idx(index);
memory[index] = (byte) (value >>> 24);
memory[index + 1] = (byte) (value >>> 16);
memory[index + 2] = (byte) (value >>> 8);
memory[index + 3] = (byte) value;
return this;
}
@Override
public ByteBuf setLong(int index, long value) {
checkIndex(index, 8);
index = idx(index);
memory[index] = (byte) (value >>> 56);
memory[index + 1] = (byte) (value >>> 48);
memory[index + 2] = (byte) (value >>> 40);
memory[index + 3] = (byte) (value >>> 32);
memory[index + 4] = (byte) (value >>> 24);
memory[index + 5] = (byte) (value >>> 16);
memory[index + 6] = (byte) (value >>> 8);
memory[index + 7] = (byte) value;
return this;
}
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
checkIndex(index, length);
if (src.hasArray()) {
setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);
} else {
src.getBytes(srcIndex, memory, idx(index), length);
}
return this;
}
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
checkIndex(index, length);
System.arraycopy(src, srcIndex, memory, idx(index), length);
return this;
}
@Override
public ByteBuf setBytes(int index, ByteBuffer src) {
int length = src.remaining();
checkIndex(index, length);
src.get(memory, idx(index), length);
return this;
}
@Override
public int setBytes(int index, InputStream in, int length) throws IOException {
checkIndex(index, length);
return in.read(memory, idx(index), length);
}
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
checkIndex(index, length);
index = idx(index);
try {
return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length));
} catch (ClosedChannelException e) {
return -1;
}
}
@Override
public ByteBuf copy(int index, int length) {
checkIndex(index, length);
ByteBuf copy = alloc().heapBuffer(length, maxCapacity());
copy.writeBytes(memory, idx(index), length);
return copy;
}
@Override
public boolean hasNioBuffer() {
return true;
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
checkIndex(index, length);
index = idx(index);
return ((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length)).slice();
}
@Override
public boolean hasNioBuffers() {
return true;
}
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
return new ByteBuffer[] { nioBuffer(index, length) };
}
@Override
public boolean hasArray() {
return true;
}
@Override
public byte[] array() {
return memory;
}
@Override
public int arrayOffset() {
return offset;
}
@Override
protected ByteBuffer newInternalNioBuffer(byte[] memory) {
return ByteBuffer.wrap(memory);
}
}
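
For orientation, a minimal sketch of how the accessors above are reached through the public allocator API. The class name is illustrative only; PooledByteBufAllocator.DEFAULT and unsafe().free() are taken from the benchmark further down in this change.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class PooledHeapByteBufExample {
    public static void main(String[] args) {
        // Allocate a heap buffer from the pooled allocator (minimum capacity is 16).
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(16);
        buf.writeInt(0x0A0B0C0D);                        // setInt above stores big-endian
        System.out.println(buf.getByte(0) == 0x0A);      // true: most significant byte first
        System.out.println(buf.getInt(0) == 0x0A0B0C0D); // reassembled by getInt
        buf.unsafe().free();                             // hand the buffer back, as the benchmark's tearDown does
    }
}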

View File

@ -31,7 +31,7 @@ import java.nio.channels.ScatteringByteChannel;
* recommended to use {@link Unpooled#unmodifiableBuffer(ByteBuf)}
* instead of calling the constructor explicitly.
*/
final class ReadOnlyByteBuf extends AbstractByteBuf implements Unsafe {
public class ReadOnlyByteBuf extends AbstractByteBuf implements Unsafe {
private final ByteBuf buffer;
@ -223,8 +223,8 @@ final class ReadOnlyByteBuf extends AbstractByteBuf implements Unsafe {
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
return buffer.nioBuffers(offset, length);
public ByteBuffer[] nioBuffers(int index, int length) {
return buffer.nioBuffers(index, length);
}
@Override
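
The javadoc above steers users to Unpooled.unmodifiableBuffer(ByteBuf) rather than the now-public constructor. A minimal sketch of that recommended path (class name is illustrative):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ReadOnlyExample {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(8).writeInt(42);
        ByteBuf ro = Unpooled.unmodifiableBuffer(buf);  // read-only view over the same memory
        System.out.println(ro.getInt(0));               // 42
        // ro.setInt(0, 1) would fail: the read-only view rejects mutation.
    }
}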

View File

@ -32,7 +32,7 @@ import java.nio.channels.ScatteringByteChannel;
* {@link ByteBuf#slice(int, int)} instead of calling the constructor
* explicitly.
*/
final class SlicedByteBuf extends AbstractByteBuf implements Unsafe {
public class SlicedByteBuf extends AbstractByteBuf implements Unsafe {
private final ByteBuf buffer;
private final int adjustment;
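
Likewise, SlicedByteBuf's javadoc points at ByteBuf.slice(int, int) as the intended entry point. A sketch (class name is illustrative):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class SliceExample {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.wrappedBuffer(new byte[] { 0, 1, 2, 3, 4, 5 });
        ByteBuf slice = buf.slice(2, 3);       // view over indices 2..4, shares the backing array
        System.out.println(slice.getByte(0));  // 2
    }
}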

View File

@ -735,8 +735,8 @@ public final class SwappedByteBuf implements ByteBuf {
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
ByteBuffer[] nioBuffers = buf.nioBuffers(offset, length);
public ByteBuffer[] nioBuffers(int index, int length) {
ByteBuffer[] nioBuffers = buf.nioBuffers(index, length);
for (int i = 0; i < nioBuffers.length; i++) {
nioBuffers[i] = nioBuffers[i].order(order);
}

View File

@ -276,7 +276,7 @@ public final class Unpooled {
}
if (!components.isEmpty()) {
return new DefaultCompositeByteBuf(ALLOC, maxNumComponents, components);
return new DefaultCompositeByteBuf(ALLOC, false, maxNumComponents, components);
}
}
@ -300,7 +300,7 @@ public final class Unpooled {
default:
for (ByteBuf b: buffers) {
if (b.readable()) {
return new DefaultCompositeByteBuf(ALLOC, maxNumComponents, buffers);
return new DefaultCompositeByteBuf(ALLOC, false, maxNumComponents, buffers);
}
}
}
@ -334,7 +334,7 @@ public final class Unpooled {
}
if (!components.isEmpty()) {
return new DefaultCompositeByteBuf(ALLOC, maxNumComponents, components);
return new DefaultCompositeByteBuf(ALLOC, false, maxNumComponents, components);
}
}
@ -352,7 +352,7 @@ public final class Unpooled {
* Returns a new big-endian composite buffer with no components.
*/
public static CompositeByteBuf compositeBuffer(int maxNumComponents) {
return new DefaultCompositeByteBuf(ALLOC, maxNumComponents);
return new DefaultCompositeByteBuf(ALLOC, false, maxNumComponents);
}
/**

View File

@ -17,8 +17,6 @@ package io.netty.buffer;
import io.netty.util.internal.DetectionUtil;
import java.util.concurrent.TimeUnit;
/**
* Simplistic {@link ByteBufAllocator} implementation that does not pool anything.
*/
@ -28,7 +26,7 @@ public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator {
public static final UnpooledByteBufAllocator DIRECT_BY_DEFAULT = new UnpooledByteBufAllocator(true);
private UnpooledByteBufAllocator(boolean directByDefault) {
super(directByDefault);
super(Integer.MAX_VALUE, directByDefault);
}
@Override
@ -44,30 +42,9 @@ public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator {
@Override
public ByteBuf ioBuffer() {
if (DetectionUtil.canFreeDirectBuffer()) {
return directBuffer();
return directBuffer(0);
}
return heapBuffer();
}
@Override
public void shutdown() {
throw new IllegalStateException(getClass().getName() + " cannot be shut down.");
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
Thread.sleep(unit.toMillis(timeout));
return false;
return heapBuffer(0);
}
}
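
From the caller's side, the revised ioBuffer() contract looks roughly like the sketch below, assuming writes expand the buffer on demand up to its max capacity, as ensureWritableBytes() elsewhere in this change suggests. The class name is illustrative.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;

public class IoBufferExample {
    public static void main(String[] args) {
        ByteBuf io = UnpooledByteBufAllocator.HEAP_BY_DEFAULT.ioBuffer();
        System.out.println(io.capacity());       // 0: nothing is reserved up front
        io.writeBytes(new byte[64]);             // assumed to grow on demand as data arrives
        System.out.println(io.capacity() >= 64);
    }
}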

View File

@ -54,7 +54,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf implements Unsafe {
CLEANER_FIELD = cleanerField;
}
private static void freeDirect(ByteBuffer buffer) {
static void freeDirect(ByteBuffer buffer) {
if (CLEANER_FIELD == null) {
// Doomed to wait for GC.
return;
@ -265,8 +265,8 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf implements Unsafe {
ByteBuffer data = bbdst.internalNioBuffer();
data.clear().position(dstIndex).limit(dstIndex + length);
getBytes(index, data);
} else if (buffer.hasArray()) {
dst.setBytes(dstIndex, buffer.array(), index + buffer.arrayOffset(), length);
} else if (dst.hasArray()) {
getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
} else {
dst.setBytes(dstIndex, this, index, length);
}
@ -417,9 +417,12 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf implements Unsafe {
} else {
byte[] tmp = new byte[length];
int readBytes = in.read(tmp);
if (readBytes <= 0) {
return readBytes;
}
ByteBuffer tmpNioBuf = internalNioBuffer();
tmpNioBuf.clear().position(index);
tmpNioBuf.put(tmp);
tmpNioBuf.put(tmp, 0, readBytes);
return readBytes;
}
}
@ -457,7 +460,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf implements Unsafe {
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
public ByteBuffer[] nioBuffers(int index, int length) {
throw new UnsupportedOperationException();
}

View File

@ -156,8 +156,8 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf implements Unsafe {
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
assert !freed;
if (dst instanceof UnpooledHeapByteBuf) {
getBytes(index, ((UnpooledHeapByteBuf) dst).array, dstIndex, length);
if (dst.hasArray()) {
getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
} else {
dst.setBytes(dstIndex, array, index, length);
}
@ -201,8 +201,8 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf implements Unsafe {
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
assert !freed;
if (src instanceof UnpooledHeapByteBuf) {
setBytes(index, ((UnpooledHeapByteBuf) src).array, srcIndex, length);
if (src.hasArray()) {
setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);
} else {
src.getBytes(srcIndex, array, index, length);
}
@ -256,7 +256,7 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf implements Unsafe {
}
@Override
public ByteBuffer[] nioBuffers(int offset, int length) {
public ByteBuffer[] nioBuffers(int index, int length) {
throw new UnsupportedOperationException();
}

98
microbench/pom.xml Normal file
View File

@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2012 The Netty Project
~
~ The Netty Project licenses this file to you under the Apache License,
~ version 2.0 (the "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at:
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.1.Alpha8-SNAPSHOT</version>
</parent>
<artifactId>netty-microbench</artifactId>
<packaging>jar</packaging>
<name>Netty/Microbench</name>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-handler</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.caliper</groupId>
<artifactId>caliper</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>upload-caliper-reports</id>
<phase>deploy</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<failOnError>false</failOnError>
<target>
<taskdef resource="net/sf/antcontrib/antlib.xml" />
<if>
<available file="${user.home}/.caliperrc"/>
<then>
<java
classname="com.google.caliper.Runner"
classpathref="maven.test.classpath">
<arg value="--uploadResults"/>
<arg value="${project.build.directory}/caliper-reports"/>
</java>
</then>
<else>
<echo>
No .caliperrc file found; not uploading the benchmark report.
Please follow the instructions at:
* http://code.google.com/p/caliper/wiki/OnlineResults
to upload and browse the benchmark results.
</echo>
</else>
</if>
</target>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,98 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.microbench.buffer;
import com.google.caliper.Param;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.microbench.util.DefaultBenchmark;
import java.util.ArrayDeque;
import java.util.Deque;
public class ByteBufAllocatorBenchmark extends DefaultBenchmark {
private static final ByteBufAllocator POOLED_ALLOCATOR_HEAP = PooledByteBufAllocator.DEFAULT;
private static final ByteBufAllocator POOLED_ALLOCATOR_DIRECT = new PooledByteBufAllocator(true);
@Param({"0", "256", "1024", "4096", "16384", "65536"})
private int size;
@Param
private Allocator allocator;
private final Deque<ByteBuf> queue = new ArrayDeque<ByteBuf>();
private ByteBufAllocator alloc;
@Override
protected void setUp() throws Exception {
alloc = allocator.alloc();
// Pre-fill the queue with 2560 buffers so the timed loop runs against an
// already-warm working set rather than a cold allocator.
for (int i = 0; i < 2560; i ++) {

queue.add(alloc.buffer(size));
}
}
@Override
protected void tearDown() throws Exception {
for (ByteBuf b: queue) {
b.unsafe().free();
}
queue.clear();
}
// Caliper drives this method: each rep allocates one buffer of the configured size
// and frees the oldest queued one, keeping the working set size constant.
public void timeAllocAndFree(int reps) {
final ByteBufAllocator alloc = this.alloc;
final Deque<ByteBuf> queue = this.queue;
final int size = this.size;
for (int i = 0; i < reps; i ++) {
queue.add(alloc.buffer(size));
queue.removeFirst().unsafe().free();
}
}
public enum Allocator {
UNPOOLED_HEAP {
@Override
ByteBufAllocator alloc() {
return UnpooledByteBufAllocator.HEAP_BY_DEFAULT;
}
},
UNPOOLED_DIRECT {
@Override
ByteBufAllocator alloc() {
return UnpooledByteBufAllocator.DIRECT_BY_DEFAULT;
}
},
POOLED_HEAP {
@Override
ByteBufAllocator alloc() {
return POOLED_ALLOCATOR_HEAP;
}
},
POOLED_DIRECT {
@Override
ByteBufAllocator alloc() {
return POOLED_ALLOCATOR_DIRECT;
}
};
abstract ByteBufAllocator alloc();
}
}
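
For reference, running this class outside of Maven boils down to the same Caliper invocation that DefaultBenchmark (below) issues from its JUnit entry point. A sketch with the default settings; the wrapper class name is illustrative:

import com.google.caliper.Runner;
import io.netty.microbench.buffer.ByteBufAllocatorBenchmark;

public class RunBenchmark {
    public static void main(String[] args) throws Exception {
        // Mirrors DefaultBenchmark.runBenchmarks(): 1 trial, 3 s warmup, 1 s measured run.
        new Runner().run(
                "--trials", "1",
                "--warmupMillis", "3000",
                "--runMillis", "1000",
                ByteBufAllocatorBenchmark.class.getName());
    }
}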

View File

@ -0,0 +1,98 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.microbench.util;
import com.google.caliper.Runner;
import com.google.caliper.SimpleBenchmark;
import org.junit.Test;
import java.io.File;
import static org.junit.Assert.*;
public abstract class DefaultBenchmark extends SimpleBenchmark {
private final int trials;
private final int warmupMillis;
private final int runMillis;
protected DefaultBenchmark() {
this(1);
}
protected DefaultBenchmark(int trials) {
this(trials, 3000, 1000);
}
protected DefaultBenchmark(int trials, int warmupMillis, int runMillis) {
this.trials = trials;
this.warmupMillis = warmupMillis;
this.runMillis = runMillis;
}
@Test
public void runBenchmarks() throws Exception {
File me = new File(DefaultBenchmark.class.getResource(
'/' + DefaultBenchmark.class.getName().replace('.', '/') + ".class").getPath());
if (!me.exists()) {
fail("failed to determine the project path");
}
// Six getParentFile() calls climb out of the four package directories and the
// classes directory, landing on the module's target/ directory (verified below).
File buildDir =
me.getParentFile().getParentFile().getParentFile().getParentFile().getParentFile().getParentFile();
if (!buildDir.getPath().endsWith(File.separator + "target") || !buildDir.isDirectory()) {
fail("failed to locate the build directory");
}
File reportDir = new File(buildDir.getAbsolutePath() + File.separator + "caliper-reports");
if (!reportDir.exists()) {
if (!reportDir.mkdirs()) {
fail("failed to create the Caliper report directory: " + reportDir.getAbsolutePath());
}
}
if (!reportDir.isDirectory()) {
fail("not a directory: " + reportDir.getAbsolutePath());
}
deleteOldReports(reportDir);
new Runner().run(
"--trials", String.valueOf(trials),
"--warmupMillis", String.valueOf(warmupMillis),
"--runMillis", String.valueOf(runMillis),
"--captureVmLog",
"--saveResults", reportDir.getAbsolutePath(),
getClass().getName());
}
private void deleteOldReports(File reportDir) {
final String prefix = getClass().getName() + '.';
final String suffix = ".json";
for (File f: reportDir.listFiles()) {
String name = f.getName();
if (name.startsWith(prefix) && name.endsWith(suffix)) {
if (f.delete()) {
System.out.println(" Deleted old report: " + name.substring(prefix.length(), name.length() - suffix.length()));
}
}
}
}
}

36
pom.xml
View File

@ -187,6 +187,14 @@
<version>${jboss.marshalling.version}</version>
<scope>test</scope>
</dependency>
<!-- Test dependencies for microbench -->
<dependency>
<groupId>com.google.caliper</groupId>
<artifactId>caliper</artifactId>
<version>0.5-rc1</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
@ -222,7 +230,6 @@
<version>1.6.6</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
@ -338,7 +345,10 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12</version>
<configuration>
<forkMode>once</forkMode>
<includes>
<include>**/*Test*.java</include>
<include>**/*Benchmark*.java</include>
</includes>
<excludes>
<exclude>**/Abstract*</exclude>
<exclude>**/TestUtil*</exclude>
@ -347,7 +357,11 @@
<argLine>
-server
-dsa -da -ea:io.netty...
-XX:+AggressiveOpts -XX:+UseFastAccessorMethods
-XX:+AggressiveOpts
-XX:+TieredCompilation
-XX:+UseBiasedLocking
-XX:+UseFastAccessorMethods
-XX:+UseStringCache
-XX:+OptimizeStringConcat
-XX:+HeapDumpOnOutOfMemoryError
</argLine>
@ -425,9 +439,21 @@
<version>2.2</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<dependencies>
<dependency>
<groupId>ant-contrib</groupId>
<artifactId>ant-contrib</artifactId>
<version>1.0b3</version>
<exclusions>
<exclusion>
<groupId>ant</groupId>
<artifactId>ant</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>

View File

@ -15,7 +15,6 @@
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
@ -26,12 +25,13 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import org.junit.Test;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import static org.junit.Assert.*;
public class SocketObjectEchoTest extends AbstractSocketTest {

View File

@ -16,7 +16,7 @@
package io.netty.channel;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.socket.SocketChannelConfig;
import java.util.IdentityHashMap;
@ -30,7 +30,7 @@ import static io.netty.channel.ChannelOption.*;
*/
public class DefaultChannelConfig implements ChannelConfig {
private static final ByteBufAllocator DEFAULT_ALLOCATOR = UnpooledByteBufAllocator.HEAP_BY_DEFAULT;
private static final ByteBufAllocator DEFAULT_ALLOCATOR = PooledByteBufAllocator.DEFAULT;
private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
private volatile ByteBufAllocator allocator = DEFAULT_ALLOCATOR;
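
Since the default allocator now comes from the pooled implementation, a channel that wants the previous behaviour has to opt out explicitly. A sketch, assuming ChannelConfig exposes a setAllocator(...) setter behind the volatile field above; the class and method names are illustrative:

import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;

public class AllocatorOptOut {
    // Revert a single channel to the unpooled heap allocator.
    static void useUnpooled(Channel ch) {
        ch.config().setAllocator(UnpooledByteBufAllocator.HEAP_BY_DEFAULT);
    }
}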

View File

@ -1252,7 +1252,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (out.writerIndex() > out.maxCapacity() - data.readableBytes()) {
// The target buffer is not going to be able to accept all data in the bridge.
out.ensureWritableBytes(out.maxCapacity() - out.writerIndex());
out.capacity(out.maxCapacity());
out.writeBytes(data, out.writableBytes());
} else {
exchangeBuf.remove();

View File

@ -527,7 +527,6 @@ public class LocalTransportThreadModelTest {
ByteBuf in = ctx.inboundByteBuffer();
MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
while (in.readableBytes() >= 4) {
int msg = in.readInt();
int expected = inCnt ++;