diff --git a/buffer/src/main/java/io/netty/buffer/AbstractByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/AbstractByteBufAllocator.java index 72269ca751..33c80feaf7 100644 --- a/buffer/src/main/java/io/netty/buffer/AbstractByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/AbstractByteBufAllocator.java @@ -18,18 +18,28 @@ package io.netty.buffer; public abstract class AbstractByteBufAllocator implements ByteBufAllocator { + private final int bufferMaxCapacity; private final boolean directByDefault; private final ByteBuf emptyBuf; - protected AbstractByteBufAllocator() { - this(false); + protected AbstractByteBufAllocator(int bufferMaxCapacity) { + this(bufferMaxCapacity, false); } - protected AbstractByteBufAllocator(boolean directByDefault) { + protected AbstractByteBufAllocator(int bufferMaxCapacity, boolean directByDefault) { + if (bufferMaxCapacity <= 0) { + throw new IllegalArgumentException("bufferMaxCapacity: " + bufferMaxCapacity + " (expected: 1+)"); + } this.directByDefault = directByDefault; + this.bufferMaxCapacity = bufferMaxCapacity; emptyBuf = new UnpooledHeapByteBuf(this, 0, 0); } + @Override + public int bufferMaxCapacity() { + return bufferMaxCapacity; + } + @Override public ByteBuf buffer() { if (directByDefault) { @@ -56,12 +66,12 @@ public abstract class AbstractByteBufAllocator implements ByteBufAllocator { @Override public ByteBuf heapBuffer() { - return heapBuffer(256, Integer.MAX_VALUE); + return heapBuffer(256, bufferMaxCapacity()); } @Override public ByteBuf heapBuffer(int initialCapacity) { - return buffer(initialCapacity, Integer.MAX_VALUE); + return heapBuffer(initialCapacity, bufferMaxCapacity()); } @Override @@ -69,17 +79,18 @@ public abstract class AbstractByteBufAllocator implements ByteBufAllocator { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } + validate(initialCapacity, maxCapacity); return newHeapBuffer(initialCapacity, maxCapacity); } @Override public ByteBuf directBuffer() { - return directBuffer(256, Integer.MAX_VALUE); + return directBuffer(256, bufferMaxCapacity()); } @Override public ByteBuf directBuffer(int initialCapacity) { - return directBuffer(initialCapacity, Integer.MAX_VALUE); + return directBuffer(initialCapacity, bufferMaxCapacity()); } @Override @@ -87,9 +98,58 @@ public abstract class AbstractByteBufAllocator implements ByteBufAllocator { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } + validate(initialCapacity, maxCapacity); return newDirectBuffer(initialCapacity, maxCapacity); } + @Override + public CompositeByteBuf compositeBuffer() { + if (directByDefault) { + return compositeDirectBuffer(); + } + return compositeHeapBuffer(); + } + + @Override + public CompositeByteBuf compositeBuffer(int maxNumComponents) { + if (directByDefault) { + return compositeDirectBuffer(maxNumComponents); + } + return compositeHeapBuffer(maxNumComponents); + } + + @Override + public CompositeByteBuf compositeHeapBuffer() { + return compositeHeapBuffer(16); + } + + @Override + public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { + return new DefaultCompositeByteBuf(this, false, maxNumComponents); + } + + @Override + public CompositeByteBuf compositeDirectBuffer() { + return compositeDirectBuffer(16); + } + + @Override + public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { + return new DefaultCompositeByteBuf(this, true, maxNumComponents); + } + + private void validate(int initialCapacity, int maxCapacity) { + if (maxCapacity > bufferMaxCapacity()) { + throw new IllegalArgumentException( + "maxCapacity: " + maxCapacity + " (expected: not greater than " + bufferMaxCapacity()); + } + if (initialCapacity > maxCapacity) { + throw new IllegalArgumentException(String.format( + "initialCapacity: %d (expected: not greater than maxCapacity(%d)", + initialCapacity, maxCapacity)); + } + } + protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity); protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity); } diff --git a/buffer/src/main/java/io/netty/buffer/ByteBuf.java b/buffer/src/main/java/io/netty/buffer/ByteBuf.java index ca8316e189..8eb5b23535 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBuf.java @@ -1745,7 +1745,7 @@ public interface ByteBuf extends ChannelBuf, Comparable { ByteBuffer[] nioBuffers(); /** - * Exposes this buffer's bytes as an NIO {@link ByteBuffer}'s for the specified offset and length + * Exposes this buffer's bytes as an NIO {@link ByteBuffer}'s for the specified index and length * The returned buffer shares the content with this buffer, while changing the position and limit * of the returned NIO buffer does not affect the indexes and marks of this buffer. This method does * not modify {@code readerIndex} or {@code writerIndex} of this buffer. Please note that the @@ -1756,7 +1756,7 @@ public interface ByteBuf extends ChannelBuf, Comparable { * @throws UnsupportedOperationException * if this buffer cannot create a {@link ByteBuffer} that shares the content with itself */ - ByteBuffer[] nioBuffers(int offset, int length); + ByteBuffer[] nioBuffers(int index, int length); /** * Returns {@code true} if and only if this buffer has a backing byte array. diff --git a/buffer/src/main/java/io/netty/buffer/ByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/ByteBufAllocator.java index 22c6c5f3b1..d972948c6c 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBufAllocator.java @@ -15,8 +15,6 @@ */ package io.netty.buffer; -import java.util.concurrent.TimeUnit; - public interface ByteBufAllocator { ByteBuf buffer(); @@ -30,8 +28,12 @@ public interface ByteBufAllocator { ByteBuf directBuffer(int initialCapacity, int maxCapacity); ByteBuf ioBuffer(); - void shutdown(); - boolean isShutdown(); - boolean isTerminated(); - boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; + CompositeByteBuf compositeBuffer(); + CompositeByteBuf compositeBuffer(int maxNumComponents); + CompositeByteBuf compositeHeapBuffer(); + CompositeByteBuf compositeHeapBuffer(int maxNumComponents); + CompositeByteBuf compositeDirectBuffer(); + CompositeByteBuf compositeDirectBuffer(int maxNumComponents); + + int bufferMaxCapacity(); } diff --git a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java index b0036ad43d..06293b2339 100644 --- a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java @@ -40,11 +40,12 @@ import java.util.Queue; * is recommended to use {@link Unpooled#wrappedBuffer(ByteBuf...)} * instead of calling the constructor explicitly. */ -final class DefaultCompositeByteBuf extends AbstractByteBuf implements CompositeByteBuf, Unsafe { +public class DefaultCompositeByteBuf extends AbstractByteBuf implements CompositeByteBuf, Unsafe { private static final ByteBuffer[] EMPTY_NIOBUFFERS = new ByteBuffer[0]; private final ByteBufAllocator alloc; + private final boolean direct; private final List components = new ArrayList(); private final int maxNumComponents; @@ -53,16 +54,17 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite private boolean freed; private Queue suspendedDeallocations; - public DefaultCompositeByteBuf(ByteBufAllocator alloc, int maxNumComponents) { + public DefaultCompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents) { super(Integer.MAX_VALUE); if (alloc == null) { throw new NullPointerException("alloc"); } this.alloc = alloc; + this.direct = direct; this.maxNumComponents = maxNumComponents; } - public DefaultCompositeByteBuf(ByteBufAllocator alloc, int maxNumComponents, ByteBuf... buffers) { + public DefaultCompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents, ByteBuf... buffers) { super(Integer.MAX_VALUE); if (alloc == null) { throw new NullPointerException("alloc"); @@ -73,6 +75,7 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite } this.alloc = alloc; + this.direct = direct; this.maxNumComponents = maxNumComponents; addComponents0(0, buffers); @@ -80,7 +83,8 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite setIndex(0, capacity()); } - public DefaultCompositeByteBuf(ByteBufAllocator alloc, int maxNumComponents, Iterable buffers) { + public DefaultCompositeByteBuf( + ByteBufAllocator alloc, boolean direct, int maxNumComponents, Iterable buffers) { super(Integer.MAX_VALUE); if (alloc == null) { throw new NullPointerException("alloc"); @@ -91,6 +95,7 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite } this.alloc = alloc; + this.direct = direct; this.maxNumComponents = maxNumComponents; addComponents0(0, buffers); consolidateIfNeeded(); @@ -260,7 +265,7 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite if (numComponents > maxNumComponents) { final int capacity = components.get(numComponents - 1).endOffset; - ByteBuf consolidated = alloc().buffer(capacity); + ByteBuf consolidated = allocBuffer(capacity); // We're not using foreach to avoid creating an iterator. // noinspection ForLoopReplaceableByForEach @@ -440,13 +445,16 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite if (newCapacity > oldCapacity) { final int paddingLength = newCapacity - oldCapacity; ByteBuf padding; - if (components.isEmpty()) { - padding = alloc().buffer(paddingLength, paddingLength); + int nComponents = components.size(); + if (nComponents < maxNumComponents) { + padding = allocBuffer(paddingLength); padding.setIndex(0, paddingLength); addComponent0(0, padding, true); } else { - padding = alloc().buffer(paddingLength); + padding = allocBuffer(paddingLength); padding.setIndex(0, paddingLength); + // FIXME: No need to create a padding buffer and consolidate. + // Just create a big single buffer and put the current content there. addComponent0(components.size(), padding, true); consolidateIfNeeded(); } @@ -1133,7 +1141,7 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite final Component last = components.get(numComponents - 1); final int capacity = last.endOffset; - final ByteBuf consolidated = alloc().buffer(capacity); + final ByteBuf consolidated = allocBuffer(capacity); for (int i = 0; i < numComponents; i ++) { Component c = components.get(i); @@ -1158,7 +1166,7 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite final int endCIndex = cIndex + numComponents; final Component last = components.get(endCIndex - 1); final int capacity = last.endOffset - components.get(cIndex).offset; - final ByteBuf consolidated = alloc().buffer(capacity); + final ByteBuf consolidated = allocBuffer(capacity); for (int i = cIndex; i < endCIndex; i ++) { Component c = components.get(i); @@ -1253,6 +1261,13 @@ final class DefaultCompositeByteBuf extends AbstractByteBuf implements Composite return this; } + private ByteBuf allocBuffer(int capacity) { + if (direct) { + return alloc().directBuffer(capacity); + } + return alloc().heapBuffer(capacity); + } + @Override public String toString() { String result = super.toString(); diff --git a/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java b/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java index 3892e748c0..4f011f5f71 100644 --- a/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java @@ -31,7 +31,7 @@ import java.nio.channels.ScatteringByteChannel; * parent. It is recommended to use {@link ByteBuf#duplicate()} instead * of calling the constructor explicitly. */ -final class DuplicatedByteBuf extends AbstractByteBuf implements Unsafe { +public class DuplicatedByteBuf extends AbstractByteBuf implements Unsafe { private final ByteBuf buffer; @@ -235,8 +235,8 @@ final class DuplicatedByteBuf extends AbstractByteBuf implements Unsafe { } @Override - public ByteBuffer[] nioBuffers(int offset, int length) { - return buffer.nioBuffers(offset, length); + public ByteBuffer[] nioBuffers(int index, int length) { + return buffer.nioBuffers(index, length); } @Override diff --git a/buffer/src/main/java/io/netty/buffer/PoolArena.java b/buffer/src/main/java/io/netty/buffer/PoolArena.java new file mode 100644 index 0000000000..2e2c26bb3e --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/PoolArena.java @@ -0,0 +1,370 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.buffer; + +import io.netty.util.internal.StringUtil; + +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Deque; + +abstract class PoolArena { + + final PooledByteBufAllocator parent; + + private final int pageSize; + private final int maxOrder; + private final int pageShifts; + private final int chunkSize; + private final int subpageOverflowMask; + + private final Deque>[] tinySubpagePools; + private final Deque>[] smallSubpagePools; + + private final PoolChunkList q050; + private final PoolChunkList q025; + private final PoolChunkList q000; + private final PoolChunkList qInit; + private final PoolChunkList q075; + private final PoolChunkList q100; + + // TODO: Test if adding padding helps under contention + //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; + + protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) { + this.parent = parent; + this.pageSize = pageSize; + this.maxOrder = maxOrder; + this.pageShifts = pageShifts; + this.chunkSize = chunkSize; + subpageOverflowMask = ~(pageSize - 1); + + tinySubpagePools = newSubpagePoolArray(512 >>> 4); + for (int i = 0; i < tinySubpagePools.length; i ++) { + tinySubpagePools[i] = new ArrayDeque>(); + } + + smallSubpagePools = newSubpagePoolArray(pageShifts - 9); + for (int i = 0; i < smallSubpagePools.length; i ++) { + smallSubpagePools[i] = new ArrayDeque>(); + } + + q100 = new PoolChunkList(this, null, 100, Integer.MAX_VALUE); + q075 = new PoolChunkList(this, q100, 75, 100); + q050 = new PoolChunkList(this, q075, 50, 100); + q025 = new PoolChunkList(this, q050, 25, 75); + q000 = new PoolChunkList(this, q025, 1, 50); + qInit = new PoolChunkList(this, q000, Integer.MIN_VALUE, 25); + + q100.prevList = q075; + q075.prevList = q050; + q050.prevList = q025; + q025.prevList = q000; + q000.prevList = null; + qInit.prevList = qInit; + } + + @SuppressWarnings("unchecked") + private Deque>[] newSubpagePoolArray(int size) { + return new Deque[size]; + } + + PooledByteBuf allocate(PoolThreadCache cache, int minCapacity, int maxCapacity) { + PooledByteBuf buf = newByteBuf(maxCapacity); + allocate(cache, buf, minCapacity); + return buf; + } + + private void allocate(PoolThreadCache cache, PooledByteBuf buf, int minCapacity) { + final int capacity = normalizeCapacity(minCapacity); + if ((capacity & subpageOverflowMask) == 0) { // capacity < pageSize + int tableIdx; + Deque>[] table; + if ((capacity & 0xFFFFFE00) == 0) { // < 512 + tableIdx = capacity >>> 4; + table = tinySubpagePools; + } else { + tableIdx = 0; + int i = capacity >>> 10; + while (i != 0) { + i >>>= 1; + tableIdx ++; + } + table = smallSubpagePools; + } + + synchronized (this) { + Deque> subpages = table[tableIdx]; + for (;;) { + PoolSubpage s = subpages.peekFirst(); + if (s == null) { + break; + } + + if (!s.doNotDestroy || s.elemSize != capacity) { + // The subpage has been destroyed or being used for different element size. + subpages.removeFirst(); + continue; + } + + long handle = s.allocate(); + if (handle < 0) { + subpages.removeFirst(); + } else { + s.chunk.initBufWithSubpage(buf, handle); + return; + } + } + } + } + + allocateNormal(buf, capacity); + } + + private synchronized void allocateNormal(PooledByteBuf buf, int capacity) { + if (q050.allocate(buf, capacity) || q025.allocate(buf, capacity) || + q000.allocate(buf, capacity) || qInit.allocate(buf, capacity) || + q075.allocate(buf, capacity)) { + return; + } + + // Add a new chunk. + PoolChunk c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); + long handle = c.allocate(capacity); + assert handle > 0; + c.initBuf(buf, handle); + qInit.add(c); + } + + synchronized void free(PoolChunk chunk, long handle) { + chunk.parent.free(chunk, handle); + } + + void addSubpage(PoolSubpage subpage) { + int tableIdx; + int elemSize = subpage.elemSize; + Deque>[] table; + if ((elemSize & 0xFFFFFE00) == 0) { // < 512 + tableIdx = elemSize >>> 4; + table = tinySubpagePools; + } else { + tableIdx = 0; + elemSize >>>= 10; + while (elemSize != 0) { + elemSize >>>= 1; + tableIdx ++; + } + table = smallSubpagePools; + } + + table[tableIdx].addFirst(subpage); + } + + private int normalizeCapacity(int capacity) { + if (capacity < 0 || capacity > chunkSize) { + throw new IllegalArgumentException("capacity: " + capacity + " (expected: 0-" + chunkSize + ')'); + } + + if ((capacity & 0xFFFFFE00) != 0) { // >= 512 + // Doubled + int normalizedCapacity = 512; + while (normalizedCapacity < capacity) { + normalizedCapacity <<= 1; + } + return normalizedCapacity; + } + + // Quantum-spaced + if ((capacity & 15) == 0) { + return capacity; + } + + return (capacity & ~15) + 16; + } + + void reallocate(PooledByteBuf buf, int newCapacity, boolean freeOldMemory) { + if (newCapacity < 0 || newCapacity > buf.maxCapacity()) { + throw new IllegalArgumentException("newCapacity: " + newCapacity); + } + + int oldCapacity = buf.length; + if (oldCapacity == newCapacity) { + return; + } + + PoolChunk oldChunk = buf.chunk; + long oldHandle = buf.handle; + T oldMemory = buf.memory; + int oldOffset = buf.offset; + + int readerIndex = buf.readerIndex(); + int writerIndex = buf.writerIndex(); + + allocate(parent.threadCache.get(), buf, newCapacity); + if (newCapacity > oldCapacity) { + memoryCopy( + oldMemory, oldOffset + readerIndex, + buf.memory, buf.offset + readerIndex, writerIndex - readerIndex); + } else if (newCapacity < oldCapacity) { + if (readerIndex < newCapacity) { + if (writerIndex > newCapacity) { + writerIndex = newCapacity; + } + memoryCopy( + oldMemory, oldOffset + readerIndex, + buf.memory, buf.offset + readerIndex, writerIndex - readerIndex); + } else { + readerIndex = writerIndex = newCapacity; + } + } + + buf.setIndex(readerIndex, writerIndex); + + if (freeOldMemory) { + free(oldChunk, oldHandle); + } + } + + protected abstract PoolChunk newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize); + protected abstract PooledByteBuf newByteBuf(int maxCapacity); + protected abstract void memoryCopy(T src, int srcOffset, T dst, int dstOffset, int length); + protected abstract void destroyChunk(PoolChunk chunk); + + public synchronized String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("Chunk(s) at 0~25%:"); + buf.append(StringUtil.NEWLINE); + buf.append(qInit); + buf.append(StringUtil.NEWLINE); + buf.append("Chunk(s) at 0~50%:"); + buf.append(StringUtil.NEWLINE); + buf.append(q000); + buf.append(StringUtil.NEWLINE); + buf.append("Chunk(s) at 25~75%:"); + buf.append(StringUtil.NEWLINE); + buf.append(q025); + buf.append(StringUtil.NEWLINE); + buf.append("Chunk(s) at 50~100%:"); + buf.append(StringUtil.NEWLINE); + buf.append(q050); + buf.append(StringUtil.NEWLINE); + buf.append("Chunk(s) at 75~100%:"); + buf.append(StringUtil.NEWLINE); + buf.append(q075); + buf.append(StringUtil.NEWLINE); + buf.append("Chunk(s) at 100%:"); + buf.append(StringUtil.NEWLINE); + buf.append(q100); + buf.append(StringUtil.NEWLINE); + buf.append("tiny subpages:"); + for (int i = 1; i < tinySubpagePools.length; i ++) { + Deque> subpages = tinySubpagePools[i]; + if (subpages.isEmpty()) { + continue; + } + + buf.append(StringUtil.NEWLINE); + buf.append(i); + buf.append(": "); + buf.append(subpages); + } + buf.append(StringUtil.NEWLINE); + buf.append("small subpages:"); + for (int i = 1; i < smallSubpagePools.length; i ++) { + Deque> subpages = smallSubpagePools[i]; + if (subpages.isEmpty()) { + continue; + } + + buf.append(StringUtil.NEWLINE); + buf.append(i); + buf.append(": "); + buf.append(subpages); + } + buf.append(StringUtil.NEWLINE); + + return buf.toString(); + } + + static final class HeapArena extends PoolArena { + + HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) { + super(parent, pageSize, maxOrder, pageShifts, chunkSize); + } + + @Override + protected PoolChunk newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { + return new PoolChunk(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize); + } + + @Override + protected void destroyChunk(PoolChunk chunk) { + // Rely on GC. + } + + @Override + protected PooledByteBuf newByteBuf(int maxCapacity) { + return new PooledHeapByteBuf(maxCapacity); + } + + @Override + protected void memoryCopy(byte[] src, int srcOffset, byte[] dst, int dstOffset, int length) { + if (length == 0) { + return; + } + + System.arraycopy(src, srcOffset, dst, dstOffset, length); + } + } + + static final class DirectArena extends PoolArena { + + DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) { + super(parent, pageSize, maxOrder, pageShifts, chunkSize); + } + + @Override + protected PoolChunk newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { + return new PoolChunk( + this, ByteBuffer.allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize); + } + + @Override + protected void destroyChunk(PoolChunk chunk) { + UnpooledDirectByteBuf.freeDirect(chunk.memory); + } + + @Override + protected PooledByteBuf newByteBuf(int maxCapacity) { + return new PooledDirectByteBuf(maxCapacity); + } + + @Override + protected void memoryCopy(ByteBuffer src, int srcOffset, ByteBuffer dst, int dstOffset, int length) { + if (length == 0) { + return; + } + + // We must duplicate the NIO buffers because they may be accessed by other Netty buffers. + src = src.duplicate(); + dst = dst.duplicate(); + src.position(srcOffset).limit(srcOffset + length); + dst.position(dstOffset); + dst.put(src); + } + } +} diff --git a/buffer/src/main/java/io/netty/buffer/PoolChunk.java b/buffer/src/main/java/io/netty/buffer/PoolChunk.java new file mode 100644 index 0000000000..3b66f0f585 --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/PoolChunk.java @@ -0,0 +1,332 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.buffer; + +final class PoolChunk { + private static final int ST_UNUSED = 0; + private static final int ST_BRANCH = 1; + private static final int ST_ALLOCATED = 2; + private static final int ST_ALLOCATED_SUBPAGE = ST_ALLOCATED | 1; + + private static final long multiplier = 0x5DEECE66DL; + private static final long addend = 0xBL; + private static final long mask = (1L << 48) - 1; + + final PoolArena arena; + final T memory; + + private final int[] memoryMap; + private final PoolSubpage[] subpages; + /** Used to determine if the requested capacity is equal to or greater than pageSize. */ + private final int subpageOverflowMask; + private final int pageSize; + private final int pageShifts; + + private final int chunkSize; + private final int maxSubpageAllocs; + + private long random = (System.nanoTime() ^ multiplier) & mask; + + private int freeBytes; + + PoolChunkList parent; + PoolChunk prev; + PoolChunk next; + + // TODO: Test if adding padding helps under contention + //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; + + PoolChunk(PoolArena arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) { + this.arena = arena; + this.memory = memory; + this.pageSize = pageSize; + this.pageShifts = pageShifts; + this.chunkSize = chunkSize; + subpageOverflowMask = ~(pageSize - 1); + freeBytes = chunkSize; + + int chunkSizeInPages = chunkSize >>> pageShifts; + maxSubpageAllocs = 1 << maxOrder; + + // Generate the memory map. + memoryMap = new int[maxSubpageAllocs << 1]; + int memoryMapIndex = 1; + for (int i = 0; i <= maxOrder; i ++) { + int runSizeInPages = chunkSizeInPages >>> i; + for (int j = 0; j < chunkSizeInPages; j += runSizeInPages) { + //noinspection PointlessBitwiseExpression + memoryMap[memoryMapIndex ++] = j << 17 | runSizeInPages << 2 | ST_UNUSED; + } + } + + subpages = newSubpageArray(maxSubpageAllocs); + } + + @SuppressWarnings("unchecked") + private PoolSubpage[] newSubpageArray(int size) { + return new PoolSubpage[size]; + } + + int usage() { + if (freeBytes == 0) { + return 100; + } + + int freePercentage = (int) (freeBytes * 100L / chunkSize); + if (freePercentage == 0) { + return 99; + } + return 100 - freePercentage; + } + + long allocate(int capacity) { + int firstVal = memoryMap[1]; + if ((capacity & subpageOverflowMask) != 0) { // >= pageSize + return allocateRun(capacity, 1, firstVal); + } else { + return allocateSubpage(capacity, 1, firstVal); + } + } + + private long allocateRun(int capacity, int curIdx, int val) { + for (;;) { + if ((val & ST_ALLOCATED) != 0) { // state == ST_ALLOCATED || state == ST_ALLOCATED_SUBPAGE + return -1; + } + + if ((val & ST_BRANCH) != 0) { // state == ST_BRANCH + int nextIdx = curIdx << 1 ^ nextRandom(); + long res = allocateRun(capacity, nextIdx, memoryMap[nextIdx]); + if (res > 0) { + return res; + } + + curIdx = nextIdx ^ 1; + val = memoryMap[curIdx]; + continue; + } + + // state == ST_UNUSED + return allocateRunSimple(capacity, curIdx, val); + } + } + + private long allocateRunSimple(int capacity, int curIdx, int val) { + int runLength = runLength(val); + if (capacity > runLength) { + return -1; + } + + for (;;) { + if (capacity == runLength) { + // Found the run that fits. + // Note that capacity has been normalized already, so we don't need to deal with + // the values that are not power of 2. + memoryMap[curIdx] = val & ~3 | ST_ALLOCATED; + freeBytes -= runLength; + return curIdx; + } + + int nextIdx = curIdx << 1 ^ nextRandom(); + int unusedIdx = nextIdx ^ 1; + + memoryMap[curIdx] = val & ~3 | ST_BRANCH; + //noinspection PointlessBitwiseExpression + memoryMap[unusedIdx] = memoryMap[unusedIdx] & ~3 | ST_UNUSED; + + runLength >>>= 1; + curIdx = nextIdx; + val = memoryMap[curIdx]; + } + } + + private long allocateSubpage(int capacity, int curIdx, int val) { + int state = val & 3; + if (state == ST_BRANCH) { + int nextIdx = curIdx << 1 ^ nextRandom(); + long res = branchSubpage(capacity, nextIdx); + if (res > 0) { + return res; + } + + return branchSubpage(capacity, nextIdx ^ 1); + } + + if (state == ST_UNUSED) { + return allocateSubpageSimple(capacity, curIdx, val); + } + + if (state == ST_ALLOCATED_SUBPAGE) { + PoolSubpage subpage = subpages[subpageIdx(curIdx)]; + int elemSize = subpage.elemSize; + if (capacity != elemSize) { + return -1; + } + + return subpage.allocate(); + } + + return -1; + } + + private long allocateSubpageSimple(int capacity, int curIdx, int val) { + int runLength = runLength(val); + for (;;) { + if (runLength == pageSize) { + memoryMap[curIdx] = val & ~3 | ST_ALLOCATED_SUBPAGE; + freeBytes -= runLength; + + int subpageIdx = subpageIdx(curIdx); + PoolSubpage subpage = subpages[subpageIdx]; + if (subpage == null) { + subpage = new PoolSubpage(this, curIdx, runOffset(val), pageSize, capacity); + subpages[subpageIdx] = subpage; + } else { + subpage.init(capacity); + } + arena.addSubpage(subpage); + return subpage.allocate(); + } + + int nextIdx = curIdx << 1 ^ nextRandom(); + int unusedIdx = nextIdx ^ 1; + + memoryMap[curIdx] = val & ~3 | ST_BRANCH; + //noinspection PointlessBitwiseExpression + memoryMap[unusedIdx] = memoryMap[unusedIdx] & ~3 | ST_UNUSED; + + runLength >>>= 1; + curIdx = nextIdx; + val = memoryMap[curIdx]; + } + } + + private long branchSubpage(int capacity, int nextIdx) { + int nextVal = memoryMap[nextIdx]; + if ((nextVal & 3) != ST_ALLOCATED) { + return allocateSubpage(capacity, nextIdx, nextVal); + } + return -1; + } + + void free(long handle) { + int memoryMapIdx = (int) handle; + int bitmapIdx = (int) (handle >>> 32); + + int val = memoryMap[memoryMapIdx]; + int state = val & 3; + if (state == ST_ALLOCATED_SUBPAGE) { + assert bitmapIdx != 0; + PoolSubpage subpage = subpages[subpageIdx(memoryMapIdx)]; + assert subpage != null && subpage.doNotDestroy; + if (subpage.free(bitmapIdx & 0x3FFFFFFF)) { + return; + } + } else { + assert state == ST_ALLOCATED : "state: " + state; + assert bitmapIdx == 0; + } + + freeBytes += runLength(val); + + for (;;) { + //noinspection PointlessBitwiseExpression + memoryMap[memoryMapIdx] = val & ~3 | ST_UNUSED; + if (memoryMapIdx == 1) { + assert freeBytes == chunkSize; + return; + } + + if ((memoryMap[siblingIdx(memoryMapIdx)] & 3) != ST_UNUSED) { + break; + } + + memoryMapIdx = parentIdx(memoryMapIdx); + val = memoryMap[memoryMapIdx]; + } + } + + void initBuf(PooledByteBuf buf, long handle) { + int memoryMapIdx = (int) handle; + int bitmapIdx = (int) (handle >>> 32); + if (bitmapIdx == 0) { + int val = memoryMap[memoryMapIdx]; + assert (val & 3) == ST_ALLOCATED : String.valueOf(val & 3); + buf.init(this, handle, memory, runOffset(val), runLength(val)); + } else { + initBufWithSubpage(buf, handle, bitmapIdx); + } + } + + void initBufWithSubpage(PooledByteBuf buf, long handle) { + initBufWithSubpage(buf, handle, (int) (handle >>> 32)); + } + + private void initBufWithSubpage(PooledByteBuf buf, long handle, int bitmapIdx) { + assert bitmapIdx != 0; + + int memoryMapIdx = (int) handle; + int val = memoryMap[memoryMapIdx]; + assert (val & 3) == ST_ALLOCATED_SUBPAGE; + + PoolSubpage subpage = subpages[subpageIdx(memoryMapIdx)]; + assert subpage.doNotDestroy; + + buf.init( + this, handle, memory, + runOffset(val) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, subpage.elemSize); + } + + private static int parentIdx(int memoryMapIdx) { + return memoryMapIdx >>> 1; + } + + private static int siblingIdx(int memoryMapIdx) { + return memoryMapIdx ^ 1; + } + + private int runLength(int val) { + return (val >>> 2 & 0x7FFF) << pageShifts; + } + + private int runOffset(int val) { + return val >>> 17 << pageShifts; + } + + private int subpageIdx(int memoryMapIdx) { + return memoryMapIdx - maxSubpageAllocs; + } + + private int nextRandom() { + random = random * multiplier + addend & mask; + return (int) (random >>> 47) & 1; + } + + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("Chunk("); + buf.append(Integer.toHexString(System.identityHashCode(this))); + buf.append(": "); + buf.append(usage()); + buf.append("%, "); + buf.append(chunkSize - freeBytes); + buf.append('/'); + buf.append(chunkSize); + buf.append(')'); + return buf.toString(); + } +} diff --git a/buffer/src/main/java/io/netty/buffer/PoolChunkList.java b/buffer/src/main/java/io/netty/buffer/PoolChunkList.java new file mode 100644 index 0000000000..5b0d12aec8 --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/PoolChunkList.java @@ -0,0 +1,129 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.buffer; + +import io.netty.util.internal.StringUtil; + +final class PoolChunkList { + private final PoolArena arena; + private final PoolChunkList nextList; + PoolChunkList prevList; + + private final int minUsage; + private final int maxUsage; + + private PoolChunk head; + + // TODO: Test if adding padding helps under contention + //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; + + PoolChunkList(PoolArena arena, PoolChunkList nextList, int minUsage, int maxUsage) { + this.arena = arena; + this.nextList = nextList; + this.minUsage = minUsage; + this.maxUsage = maxUsage; + } + + boolean allocate(PooledByteBuf buf, int capacity) { + if (head == null) { + return false; + } + + for (PoolChunk cur = head;;) { + long handle = cur.allocate(capacity); + if (handle < 0) { + cur = cur.next; + if (cur == null) { + return false; + } + } else { + cur.initBuf(buf, handle); + if (cur.usage() >= maxUsage) { + remove(cur); + nextList.add(cur); + } + return true; + } + } + } + + void free(PoolChunk chunk, long handle) { + chunk.free(handle); + if (chunk.usage() < minUsage) { + remove(chunk); + if (prevList == null) { + assert chunk.usage() == 0; + arena.destroyChunk(chunk); + } else { + prevList.add(chunk); + } + } + } + + void add(PoolChunk chunk) { + if (chunk.usage() >= maxUsage) { + nextList.add(chunk); + return; + } + + chunk.parent = this; + if (head == null) { + head = chunk; + chunk.prev = null; + chunk.next = null; + } else { + chunk.prev = null; + chunk.next = head; + head.prev = chunk; + head = chunk; + } + } + + private void remove(PoolChunk cur) { + if (cur == head) { + head = cur.next; + if (head != null) { + head.prev = null; + } + } else { + PoolChunk next = cur.next; + cur.prev.next = next; + if (next != null) { + next.prev = cur.prev; + } + } + } + + @Override + public String toString() { + if (head == null) { + return "none"; + } + + StringBuilder buf = new StringBuilder(); + for (PoolChunk cur = head;;) { + buf.append(cur); + cur = cur.next; + if (cur == null) { + break; + } + buf.append(StringUtil.NEWLINE); + } + + return buf.toString(); + } +} diff --git a/buffer/src/main/java/io/netty/buffer/PoolSubpage.java b/buffer/src/main/java/io/netty/buffer/PoolSubpage.java new file mode 100644 index 0000000000..3eac99c180 --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/PoolSubpage.java @@ -0,0 +1,155 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.buffer; + +final class PoolSubpage { + + final PoolChunk chunk; + final int memoryMapIdx; + final int runOffset; + final int pageSize; + final long[] bitmap; + + boolean doNotDestroy; + int elemSize; + int maxNumElems; + int nextAvail; + int bitmapLength; + int numAvail; + + // TODO: Test if adding padding helps under contention + //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; + + PoolSubpage(PoolChunk chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) { + this.chunk = chunk; + this.memoryMapIdx = memoryMapIdx; + this.runOffset = runOffset; + this.pageSize = pageSize; + bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64 + init(elemSize); + } + + void init(int elemSize) { + doNotDestroy = true; + this.elemSize = elemSize; + if (elemSize == 0) { + return; + } + + maxNumElems = numAvail = pageSize / elemSize; + nextAvail = 0; + bitmapLength = maxNumElems >>> 6; + if ((maxNumElems & 63) != 0) { + bitmapLength ++; + } + + for (int i = 0; i < bitmapLength; i ++) { + bitmap[i] = 0; + } + } + + /** + * Returns the bitmap index of the subpage allocation. + */ + long allocate() { + if (elemSize == 0) { + return toHandle(0); + } + + if (numAvail == 0 || !doNotDestroy) { + return -1; + } + + final int bitmapIdx = nextAvail; + int q = bitmapIdx >>> 6; + int r = bitmapIdx & 63; + assert (bitmap[q] >>> r & 1) == 0; + bitmap[q] |= 1L << r; + + if (-- numAvail == 0) { + nextAvail = -1; + } else { + nextAvail = findNextAvailable(); + } + + return toHandle(bitmapIdx); + } + + /** + * @return {@code true} if this subpage is in use. + * {@code false} if this subpage is not used by its chunk and thus it's OK to be released. + */ + boolean free(int bitmapIdx) { + if (elemSize == 0) { + return true; + } + + int q = bitmapIdx >>> 6; + int r = bitmapIdx & 63; + assert (bitmap[q] >>> r & 1) != 0; + bitmap[q] ^= 1L << r; + + if (numAvail ++ == 0) { + nextAvail = bitmapIdx; + chunk.arena.addSubpage(this); + return true; + } + + if (numAvail < maxNumElems) { + return true; + } else { + doNotDestroy = false; + return false; + } + } + + private int findNextAvailable() { + int newNextAvail = -1; + loop: + for (int i = 0; i < bitmapLength; i ++) { + long bits = bitmap[i]; + if (~bits != 0) { + for (int j = 0; j < 64; j ++) { + if ((bits & 1) == 0) { + newNextAvail = i << 6 | j; + break loop; + } + bits >>>= 1; + } + } + } + + if (newNextAvail < maxNumElems) { + return newNextAvail; + } else { + return -1; + } + } + + private long toHandle(int bitmapIdx) { + return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx; + } + + public String toString() { + if (!doNotDestroy) { + return "(" + memoryMapIdx + ": not in use)"; + } + + return String.valueOf('(') + memoryMapIdx + ": " + (maxNumElems - numAvail) + '/' + maxNumElems + + ", offset: " + runOffset + ", length: " + pageSize + ", elemSize: " + elemSize + ')'; + } +} diff --git a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java new file mode 100644 index 0000000000..09ac499b04 --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java @@ -0,0 +1,33 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.buffer; + +import java.nio.ByteBuffer; + +final class PoolThreadCache { + + final PoolArena heapArena; + final PoolArena directArena; + + // TODO: Test if adding padding helps under contention + //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; + + PoolThreadCache(PoolArena heapArena, PoolArena directArena) { + this.heapArena = heapArena; + this.directArena = directArena; + } +} diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java b/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java new file mode 100644 index 0000000000..fe7e15f34f --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java @@ -0,0 +1,190 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.buffer; + +import io.netty.buffer.ByteBuf.Unsafe; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayDeque; +import java.util.Queue; + +abstract class PooledByteBuf extends AbstractByteBuf implements Unsafe { + + protected PoolChunk chunk; + protected long handle; + protected T memory; + protected int offset; + protected int length; + + private ByteBuffer tmpNioBuf; + private Queue> suspendedDeallocations; + + protected PooledByteBuf(int maxCapacity) { + super(maxCapacity); + } + + void init(PoolChunk chunk, long handle, T memory, int offset, int length) { + assert handle >= 0; + this.chunk = chunk; + this.handle = handle; + this.memory = memory; + this.offset = offset; + this.length = length; + setIndex(0, 0); + tmpNioBuf = null; + } + + @Override + public int capacity() { + return length; + } + + @Override + public ByteBuf capacity(int newCapacity) { + assert !isFreed(); + + if (suspendedDeallocations == null) { + chunk.arena.reallocate(this, newCapacity, true); + } else { + Allocation old = new Allocation(chunk, handle); + chunk.arena.reallocate(this, newCapacity, false); + suspendedDeallocations.add(old); + } + return this; + } + + @Override + public ByteBufAllocator alloc() { + return chunk.arena.parent; + } + + @Override + public ByteOrder order() { + return ByteOrder.BIG_ENDIAN; + } + + @Override + public ByteBuf unwrap() { + return null; + } + + @Override + public Unsafe unsafe() { + return this; + } + + @Override + public ByteBuffer internalNioBuffer() { + ByteBuffer tmpNioBuf = this.tmpNioBuf; + if (tmpNioBuf == null) { + this.tmpNioBuf = tmpNioBuf = newInternalNioBuffer(memory); + } + return tmpNioBuf; + } + + @Override + public ByteBuffer[] internalNioBuffers() { + return new ByteBuffer[] { internalNioBuffer() }; + } + + protected abstract ByteBuffer newInternalNioBuffer(T memory); + + @Override + public void discardSomeReadBytes() { + final int readerIndex = readerIndex(); + if (readerIndex == writerIndex()) { + discardReadBytes(); + return; + } + + if (readerIndex > 0 && readerIndex >= capacity() >>> 1) { + discardReadBytes(); + } + } + + @Override + public void suspendIntermediaryDeallocations() { + if (suspendedDeallocations == null) { + suspendedDeallocations = new ArrayDeque>(2); + } + } + + @Override + public void resumeIntermediaryDeallocations() { + if (suspendedDeallocations == null) { + return; + } + + Queue> suspendedDeallocations = this.suspendedDeallocations; + this.suspendedDeallocations = null; + + if (suspendedDeallocations.isEmpty()) { + return; + } + + for (Allocation a: suspendedDeallocations) { + chunk.arena.free(a.chunk, a.handle); + } + } + + @Override + public boolean isFreed() { + return handle < 0; + } + + @Override + public void free() { + if (handle >= 0) { + final long handle = this.handle; + this.handle = -1; + memory = null; + resumeIntermediaryDeallocations(); + chunk.arena.free(chunk, handle); + } + } + + protected int idx(int index) { + return offset + index; + } + + protected void checkIndex(int index) { + assert !isFreed(); + if (index < 0 || index >= length) { + throw new IndexOutOfBoundsException(String.format( + "index: %d (expected: range(0, %d))", index, length)); + } + } + + protected void checkIndex(int index, int fieldLength) { + assert !isFreed(); + if (index < 0 || index > length - fieldLength) { + throw new IndexOutOfBoundsException(String.format( + "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, length)); + } + } + + private static final class Allocation { + final PoolChunk chunk; + final long handle; + + Allocation(PoolChunk chunk, long handle) { + this.chunk = chunk; + this.handle = handle; + } + } +} diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java new file mode 100644 index 0000000000..9d68f109a6 --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -0,0 +1,157 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.buffer; + +import io.netty.util.internal.StringUtil; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; + +public class PooledByteBufAllocator extends AbstractByteBufAllocator { + + private static final int DEFAULT_NUM_HEAP_ARENA = Runtime.getRuntime().availableProcessors(); + private static final int DEFAULT_NUM_DIRECT_ARENA = Runtime.getRuntime().availableProcessors(); + private static final int DEFAULT_PAGE_SIZE = 8192; + private static final int DEFAULT_MAX_ORDER = 11; // 8192 << 11 = 16 MiB per chunk + + private static final int MIN_PAGE_SIZE = 4096; + private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2); + + public static final PooledByteBufAllocator DEFAULT = new PooledByteBufAllocator(); + + private final PoolArena[] heapArenas; + private final PoolArena[] directArenas; + + final ThreadLocal threadCache = new ThreadLocal() { + private final AtomicInteger index = new AtomicInteger(); + @Override + protected PoolThreadCache initialValue() { + int idx = Math.abs(index.getAndIncrement() % heapArenas.length); + return new PoolThreadCache(heapArenas[idx], directArenas[idx]); + } + }; + + public PooledByteBufAllocator() { + this(false); + } + + public PooledByteBufAllocator(boolean directByDefault) { + this(directByDefault, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER); + } + + public PooledByteBufAllocator(int nHeapArena, int nDirectArena, int pageSize, int maxOrder) { + this(false, nHeapArena, nDirectArena, pageSize, maxOrder); + } + + public PooledByteBufAllocator( + boolean directByDefault, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) { + super(validateAndCalculateChunkSize(pageSize, maxOrder), directByDefault); + if (nHeapArena <= 0) { + throw new IllegalArgumentException("nHeapArena: " + nHeapArena + " (expected: 1+)"); + } + if (nDirectArena <= 0) { + throw new IllegalArgumentException("nDirectArea: " + nDirectArena + " (expected: 1+)"); + } + + int pageShifts = validateAndCalculatePageShifts(pageSize); + int chunkSize = bufferMaxCapacity(); + + heapArenas = newArenaArray(nHeapArena); + for (int i = 0; i < heapArenas.length; i ++) { + heapArenas[i] = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize); + } + + directArenas = newArenaArray(nDirectArena); + for (int i = 0; i < directArenas.length; i ++) { + directArenas[i] = new PoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize); + } + } + + @SuppressWarnings("unchecked") + private static PoolArena[] newArenaArray(int size) { + return new PoolArena[size]; + } + + private static int validateAndCalculatePageShifts(int pageSize) { + if (pageSize < MIN_PAGE_SIZE) { + throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: 4096+)"); + } + + // Ensure pageSize is power of 2. + boolean found1 = false; + int pageShifts = 0; + for (int i = pageSize; i != 0 ; i >>= 1) { + if ((i & 1) != 0) { + if (!found1) { + found1 = true; + } else { + throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2"); + } + } else { + if (!found1) { + pageShifts ++; + } + } + } + return pageShifts; + } + + private static int validateAndCalculateChunkSize(int pageSize, int maxOrder) { + if (maxOrder > 14) { + throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)"); + } + + // Ensure the resulting chunkSize does not overflow. + int chunkSize = pageSize; + for (int i = maxOrder; i > 0; i --) { + if (chunkSize > MAX_CHUNK_SIZE / 2) { + throw new IllegalArgumentException(String.format( + "pageSize (%d) << maxOrder (%d) must not exceed %d", pageSize, maxOrder, MAX_CHUNK_SIZE)); + } + chunkSize <<= 1; + } + return chunkSize; + } + + @Override + protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { + PoolThreadCache cache = threadCache.get(); + return cache.heapArena.allocate(cache, initialCapacity, maxCapacity); + } + + @Override + protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { + PoolThreadCache cache = threadCache.get(); + return cache.directArena.allocate(cache, initialCapacity, maxCapacity); + } + + @Override + public ByteBuf ioBuffer() { + return directBuffer(0); + } + + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append(heapArenas.length); + buf.append(" arena(s):"); + buf.append(StringUtil.NEWLINE); + for (PoolArena a: heapArenas) { + buf.append(a); + } + return buf.toString(); + } +} diff --git a/buffer/src/main/java/io/netty/buffer/PooledDirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/PooledDirectByteBuf.java new file mode 100644 index 0000000000..b2bb872127 --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/PooledDirectByteBuf.java @@ -0,0 +1,290 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.buffer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; + +final class PooledDirectByteBuf extends PooledByteBuf { + + PooledDirectByteBuf(int maxCapacity) { + super(maxCapacity); + } + + @Override + protected ByteBuffer newInternalNioBuffer(ByteBuffer memory) { + return memory.duplicate(); + } + + @Override + public boolean isDirect() { + return true; + } + + @Override + public byte getByte(int index) { + checkIndex(index); + return memory.get(idx(index)); + } + + @Override + public short getShort(int index) { + checkIndex(index, 2); + return memory.getShort(idx(index)); + } + + @Override + public int getUnsignedMedium(int index) { + checkIndex(index, 3); + index = idx(index); + return (memory.get(index) & 0xff) << 16 | (memory.get(index + 1) & 0xff) << 8 | memory.get(index + 2) & 0xff; + } + + @Override + public int getInt(int index) { + checkIndex(index, 4); + return memory.getInt(idx(index)); + } + + @Override + public long getLong(int index) { + checkIndex(index, 8); + return memory.getLong(idx(index)); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + checkIndex(index, length); + if (dst instanceof PooledDirectByteBuf) { + PooledDirectByteBuf bbdst = (PooledDirectByteBuf) dst; + ByteBuffer data = bbdst.internalNioBuffer(); + dstIndex = bbdst.idx(dstIndex); + data.clear().position(dstIndex).limit(dstIndex + length); + getBytes(index, data); + } else if (dst.hasArray()) { + getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length); + } else { + dst.setBytes(dstIndex, this, index, length); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + checkIndex(index, length); + ByteBuffer tmpBuf = internalNioBuffer(); + index = idx(index); + tmpBuf.clear().position(index).limit(index + length); + tmpBuf.get(dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + checkIndex(index); + int bytesToCopy = Math.min(capacity() - index, dst.remaining()); + ByteBuffer tmpBuf = internalNioBuffer(); + index = idx(index); + tmpBuf.clear().position(index).limit(index + bytesToCopy); + dst.put(tmpBuf); + return this; + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + checkIndex(index, length); + if (length == 0) { + return this; + } + + byte[] tmp = new byte[length]; + ByteBuffer tmpBuf = internalNioBuffer(); + tmpBuf.clear().position(idx(index)); + tmpBuf.get(tmp); + out.write(tmp); + return this; + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + checkIndex(index, length); + if (length == 0) { + return 0; + } + + ByteBuffer tmpBuf = internalNioBuffer(); + index = idx(index); + tmpBuf.clear().position(index).limit(index + length); + return out.write(tmpBuf); + } + + @Override + public ByteBuf setByte(int index, int value) { + checkIndex(index); + memory.put(idx(index), (byte) value); + return this; + } + + @Override + public ByteBuf setShort(int index, int value) { + checkIndex(index, 2); + memory.putShort(idx(index), (short) value); + return this; + } + + @Override + public ByteBuf setMedium(int index, int value) { + checkIndex(index, 3); + index = idx(index); + memory.put(index, (byte) (value >>> 16)); + memory.put(index + 1, (byte) (value >>> 8)); + memory.put(index + 2, (byte) value); + return this; + } + + @Override + public ByteBuf setInt(int index, int value) { + checkIndex(index, 4); + memory.putInt(idx(index), value); + return this; + } + + @Override + public ByteBuf setLong(int index, long value) { + checkIndex(index, 8); + memory.putLong(idx(index), value); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + checkIndex(index, length); + if (src instanceof PooledDirectByteBuf) { + PooledDirectByteBuf bbsrc = (PooledDirectByteBuf) src; + ByteBuffer data = bbsrc.internalNioBuffer(); + srcIndex = bbsrc.idx(srcIndex); + data.clear().position(srcIndex).limit(srcIndex + length); + setBytes(index, data); + } else if (src.hasArray()) { + setBytes(index, src.array(), src.arrayOffset() + srcIndex, length); + } else { + src.getBytes(srcIndex, this, index, length); + } + return this; + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + checkIndex(index, length); + ByteBuffer tmpBuf = internalNioBuffer(); + index = idx(index); + tmpBuf.clear().position(index).limit(index + length); + tmpBuf.put(src, srcIndex, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + checkIndex(index); + ByteBuffer tmpBuf = internalNioBuffer(); + if (src == tmpBuf) { + src = src.duplicate(); + } + + index = idx(index); + tmpBuf.clear().position(index).limit(index + src.remaining()); + tmpBuf.put(src); + return this; + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + checkIndex(index, length); + byte[] tmp = new byte[length]; + int readBytes = in.read(tmp); + if (readBytes <= 0) { + return readBytes; + } + ByteBuffer tmpNioBuf = internalNioBuffer(); + tmpNioBuf.clear().position(idx(index)); + tmpNioBuf.put(tmp, 0, readBytes); + return readBytes; + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + checkIndex(index, length); + ByteBuffer tmpNioBuf = internalNioBuffer(); + index = idx(index); + tmpNioBuf.clear().position(index).limit(index + length); + try { + return in.read(tmpNioBuf); + } catch (ClosedChannelException e) { + return -1; + } + } + + @Override + public ByteBuf copy(int index, int length) { + checkIndex(index, length); + ByteBuf copy = alloc().directBuffer(capacity(), maxCapacity()); + copy.writeBytes(this, index, length); + return copy(); + } + + @Override + public boolean hasNioBuffer() { + return true; + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + checkIndex(index, length); + index = idx(index); + return ((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length)).slice(); + } + + @Override + public boolean hasNioBuffers() { + return true; + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return new ByteBuffer[] { nioBuffer(index, length) }; + } + + @Override + public boolean hasArray() { + return false; + } + + @Override + public byte[] array() { + throw new UnsupportedOperationException("direct buffer"); + } + + @Override + public int arrayOffset() { + throw new UnsupportedOperationException("direct buffer"); + } +} diff --git a/buffer/src/main/java/io/netty/buffer/PooledHeapByteBuf.java b/buffer/src/main/java/io/netty/buffer/PooledHeapByteBuf.java new file mode 100644 index 0000000000..244ccd691d --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/PooledHeapByteBuf.java @@ -0,0 +1,265 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file tothe License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.buffer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; + +final class PooledHeapByteBuf extends PooledByteBuf { + + PooledHeapByteBuf(int maxCapacity) { + super(maxCapacity); + } + + @Override + public boolean isDirect() { + return false; + } + + @Override + public byte getByte(int index) { + checkIndex(index); + return memory[idx(index)]; + } + + @Override + public short getShort(int index) { + checkIndex(index, 2); + index = idx(index); + return (short) (memory[index] << 8 | memory[index + 1] & 0xFF); + } + + @Override + public int getUnsignedMedium(int index) { + checkIndex(index, 3); + index = idx(index); + return (memory[index] & 0xff) << 16 | + (memory[index + 1] & 0xff) << 8 | + memory[index + 2] & 0xff; + } + + @Override + public int getInt(int index) { + checkIndex(index, 4); + index = idx(index); + return (memory[index] & 0xff) << 24 | + (memory[index + 1] & 0xff) << 16 | + (memory[index + 2] & 0xff) << 8 | + memory[index + 3] & 0xff; + } + + @Override + public long getLong(int index) { + checkIndex(index, 8); + index = idx(index); + return ((long) memory[index] & 0xff) << 56 | + ((long) memory[index + 1] & 0xff) << 48 | + ((long) memory[index + 2] & 0xff) << 40 | + ((long) memory[index + 3] & 0xff) << 32 | + ((long) memory[index + 4] & 0xff) << 24 | + ((long) memory[index + 5] & 0xff) << 16 | + ((long) memory[index + 6] & 0xff) << 8 | + (long) memory[index + 7] & 0xff; + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + checkIndex(index, length); + if (dst.hasArray()) { + getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length); + } else { + dst.setBytes(dstIndex, memory, idx(index), length); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + checkIndex(index, length); + System.arraycopy(memory, idx(index), dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + checkIndex(index); + dst.put(memory, idx(index), Math.min(capacity() - index, dst.remaining())); + return this; + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + checkIndex(index, length); + out.write(memory, idx(index), length); + return this; + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + checkIndex(index, length); + index = idx(index); + return out.write((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length)); + } + + @Override + public ByteBuf setByte(int index, int value) { + checkIndex(index); + memory[idx(index)] = (byte) value; + return this; + } + + @Override + public ByteBuf setShort(int index, int value) { + checkIndex(index, 2); + index = idx(index); + memory[index] = (byte) (value >>> 8); + memory[index + 1] = (byte) value; + return this; + } + + @Override + public ByteBuf setMedium(int index, int value) { + checkIndex(index, 3); + index = idx(index); + memory[index] = (byte) (value >>> 16); + memory[index + 1] = (byte) (value >>> 8); + memory[index + 2] = (byte) value; + return this; + } + + @Override + public ByteBuf setInt(int index, int value) { + checkIndex(index, 4); + index = idx(index); + memory[index] = (byte) (value >>> 24); + memory[index + 1] = (byte) (value >>> 16); + memory[index + 2] = (byte) (value >>> 8); + memory[index + 3] = (byte) value; + return this; + } + + @Override + public ByteBuf setLong(int index, long value) { + checkIndex(index, 8); + index = idx(index); + memory[index] = (byte) (value >>> 56); + memory[index + 1] = (byte) (value >>> 48); + memory[index + 2] = (byte) (value >>> 40); + memory[index + 3] = (byte) (value >>> 32); + memory[index + 4] = (byte) (value >>> 24); + memory[index + 5] = (byte) (value >>> 16); + memory[index + 6] = (byte) (value >>> 8); + memory[index + 7] = (byte) value; + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + checkIndex(index, length); + if (src.hasArray()) { + setBytes(index, src.array(), src.arrayOffset() + srcIndex, length); + } else { + src.getBytes(srcIndex, memory, idx(index), length); + } + return this; + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + checkIndex(index, length); + System.arraycopy(src, srcIndex, memory, idx(index), length); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + int length = src.remaining(); + checkIndex(index, length); + src.get(memory, idx(index), length); + return this; + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + checkIndex(index, length); + return in.read(memory, idx(index), length); + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + checkIndex(index, length); + index = idx(index); + try { + return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length)); + } catch (ClosedChannelException e) { + return -1; + } + } + + @Override + public ByteBuf copy(int index, int length) { + checkIndex(index, length); + ByteBuf copy = alloc().heapBuffer(length, maxCapacity()); + copy.writeBytes(memory, idx(index), length); + return copy(); + } + + @Override + public boolean hasNioBuffer() { + return true; + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + checkIndex(index, length); + index = idx(index); + return ((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length)).slice(); + } + + @Override + public boolean hasNioBuffers() { + return true; + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return new ByteBuffer[] { nioBuffer(index, length) }; + } + + @Override + public boolean hasArray() { + return true; + } + + @Override + public byte[] array() { + return memory; + } + + @Override + public int arrayOffset() { + return offset; + } + + @Override + protected ByteBuffer newInternalNioBuffer(byte[] memory) { + return ByteBuffer.wrap(memory); + } +} diff --git a/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java b/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java index 64e9a8dfaf..04ef320603 100644 --- a/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java @@ -31,7 +31,7 @@ import java.nio.channels.ScatteringByteChannel; * recommended to use {@link Unpooled#unmodifiableBuffer(ByteBuf)} * instead of calling the constructor explicitly. */ -final class ReadOnlyByteBuf extends AbstractByteBuf implements Unsafe { +public class ReadOnlyByteBuf extends AbstractByteBuf implements Unsafe { private final ByteBuf buffer; @@ -223,8 +223,8 @@ final class ReadOnlyByteBuf extends AbstractByteBuf implements Unsafe { } @Override - public ByteBuffer[] nioBuffers(int offset, int length) { - return buffer.nioBuffers(offset, length); + public ByteBuffer[] nioBuffers(int index, int length) { + return buffer.nioBuffers(index, length); } @Override diff --git a/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java b/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java index 2f6c43f854..f680bf4f04 100644 --- a/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java @@ -32,7 +32,7 @@ import java.nio.channels.ScatteringByteChannel; * {@link ByteBuf#slice(int, int)} instead of calling the constructor * explicitly. */ -final class SlicedByteBuf extends AbstractByteBuf implements Unsafe { +public class SlicedByteBuf extends AbstractByteBuf implements Unsafe { private final ByteBuf buffer; private final int adjustment; diff --git a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java index 8e058e8355..c77d28c94d 100644 --- a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java @@ -735,8 +735,8 @@ public final class SwappedByteBuf implements ByteBuf { } @Override - public ByteBuffer[] nioBuffers(int offset, int length) { - ByteBuffer[] nioBuffers = buf.nioBuffers(offset, length); + public ByteBuffer[] nioBuffers(int index, int length) { + ByteBuffer[] nioBuffers = buf.nioBuffers(index, length); for (int i = 0; i < nioBuffers.length; i++) { nioBuffers[i] = nioBuffers[i].order(order); } diff --git a/buffer/src/main/java/io/netty/buffer/Unpooled.java b/buffer/src/main/java/io/netty/buffer/Unpooled.java index 34cb203c72..8b707ed77d 100644 --- a/buffer/src/main/java/io/netty/buffer/Unpooled.java +++ b/buffer/src/main/java/io/netty/buffer/Unpooled.java @@ -276,7 +276,7 @@ public final class Unpooled { } if (!components.isEmpty()) { - return new DefaultCompositeByteBuf(ALLOC, maxNumComponents, components); + return new DefaultCompositeByteBuf(ALLOC, false, maxNumComponents, components); } } @@ -300,7 +300,7 @@ public final class Unpooled { default: for (ByteBuf b: buffers) { if (b.readable()) { - return new DefaultCompositeByteBuf(ALLOC, maxNumComponents, buffers); + return new DefaultCompositeByteBuf(ALLOC, false, maxNumComponents, buffers); } } } @@ -334,7 +334,7 @@ public final class Unpooled { } if (!components.isEmpty()) { - return new DefaultCompositeByteBuf(ALLOC, maxNumComponents, components); + return new DefaultCompositeByteBuf(ALLOC, false, maxNumComponents, components); } } @@ -352,7 +352,7 @@ public final class Unpooled { * Returns a new big-endian composite buffer with no components. */ public static CompositeByteBuf compositeBuffer(int maxNumComponents) { - return new DefaultCompositeByteBuf(ALLOC, maxNumComponents); + return new DefaultCompositeByteBuf(ALLOC, false, maxNumComponents); } /** diff --git a/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java index 8965facb2c..894611658e 100644 --- a/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java @@ -17,8 +17,6 @@ package io.netty.buffer; import io.netty.util.internal.DetectionUtil; -import java.util.concurrent.TimeUnit; - /** * Simplistic {@link ByteBufAllocator} implementation that does not pool anything. */ @@ -28,7 +26,7 @@ public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator { public static final UnpooledByteBufAllocator DIRECT_BY_DEFAULT = new UnpooledByteBufAllocator(true); private UnpooledByteBufAllocator(boolean directByDefault) { - super(directByDefault); + super(Integer.MAX_VALUE, directByDefault); } @Override @@ -44,30 +42,9 @@ public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator { @Override public ByteBuf ioBuffer() { if (DetectionUtil.canFreeDirectBuffer()) { - return directBuffer(); + return directBuffer(0); } - return heapBuffer(); - } - - @Override - public void shutdown() { - throw new IllegalStateException(getClass().getName() + " cannot be shut down."); - } - - @Override - public boolean isShutdown() { - return false; - } - - @Override - public boolean isTerminated() { - return false; - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - Thread.sleep(unit.toMillis(timeout)); - return false; + return heapBuffer(0); } } diff --git a/buffer/src/main/java/io/netty/buffer/UnpooledDirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/UnpooledDirectByteBuf.java index bd356b4a30..abe5521002 100644 --- a/buffer/src/main/java/io/netty/buffer/UnpooledDirectByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/UnpooledDirectByteBuf.java @@ -54,7 +54,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf implements Unsafe { CLEANER_FIELD = cleanerField; } - private static void freeDirect(ByteBuffer buffer) { + static void freeDirect(ByteBuffer buffer) { if (CLEANER_FIELD == null) { // Doomed to wait for GC. return; @@ -265,8 +265,8 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf implements Unsafe { ByteBuffer data = bbdst.internalNioBuffer(); data.clear().position(dstIndex).limit(dstIndex + length); getBytes(index, data); - } else if (buffer.hasArray()) { - dst.setBytes(dstIndex, buffer.array(), index + buffer.arrayOffset(), length); + } else if (dst.hasArray()) { + getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length); } else { dst.setBytes(dstIndex, this, index, length); } @@ -417,9 +417,12 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf implements Unsafe { } else { byte[] tmp = new byte[length]; int readBytes = in.read(tmp); + if (readBytes <= 0) { + return readBytes; + } ByteBuffer tmpNioBuf = internalNioBuffer(); tmpNioBuf.clear().position(index); - tmpNioBuf.put(tmp); + tmpNioBuf.put(tmp, 0, readBytes); return readBytes; } } @@ -457,7 +460,7 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf implements Unsafe { } @Override - public ByteBuffer[] nioBuffers(int offset, int length) { + public ByteBuffer[] nioBuffers(int index, int length) { throw new UnsupportedOperationException(); } diff --git a/buffer/src/main/java/io/netty/buffer/UnpooledHeapByteBuf.java b/buffer/src/main/java/io/netty/buffer/UnpooledHeapByteBuf.java index 00ce55790d..6a146f524f 100644 --- a/buffer/src/main/java/io/netty/buffer/UnpooledHeapByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/UnpooledHeapByteBuf.java @@ -156,8 +156,8 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf implements Unsafe { @Override public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { assert !freed; - if (dst instanceof UnpooledHeapByteBuf) { - getBytes(index, ((UnpooledHeapByteBuf) dst).array, dstIndex, length); + if (dst.hasArray()) { + getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length); } else { dst.setBytes(dstIndex, array, index, length); } @@ -201,8 +201,8 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf implements Unsafe { @Override public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { assert !freed; - if (src instanceof UnpooledHeapByteBuf) { - setBytes(index, ((UnpooledHeapByteBuf) src).array, srcIndex, length); + if (src.hasArray()) { + setBytes(index, src.array(), src.arrayOffset() + srcIndex, length); } else { src.getBytes(srcIndex, array, index, length); } @@ -256,7 +256,7 @@ final class UnpooledHeapByteBuf extends AbstractByteBuf implements Unsafe { } @Override - public ByteBuffer[] nioBuffers(int offset, int length) { + public ByteBuffer[] nioBuffers(int index, int length) { throw new UnsupportedOperationException(); } diff --git a/microbench/pom.xml b/microbench/pom.xml new file mode 100644 index 0000000000..f5d2e805f9 --- /dev/null +++ b/microbench/pom.xml @@ -0,0 +1,98 @@ + + + + + 4.0.0 + + io.netty + netty-parent + 4.0.1.Alpha8-SNAPSHOT + + + netty-microbench + jar + + Netty/Microbench + + + + ${project.groupId} + netty-handler + ${project.version} + + + ${project.groupId} + netty-codec-http + ${project.version} + + + com.google.caliper + caliper + + + + + + + maven-deploy-plugin + + true + + + + maven-antrun-plugin + + + upload-caliper-reports + deploy + + run + + + false + + + + + + + + + + + + +No .caliperrc file found; not uploading the benchmark report. +Please follow the instructions at: + + * http://code.google.com/p/caliper/wiki/OnlineResults + +to upload and browse the benchmark results. + + + + + + + + + + + + diff --git a/microbench/src/test/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java b/microbench/src/test/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java new file mode 100644 index 0000000000..9502f236e8 --- /dev/null +++ b/microbench/src/test/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java @@ -0,0 +1,98 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.microbench.buffer; + +import com.google.caliper.Param; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.microbench.util.DefaultBenchmark; + +import java.util.ArrayDeque; +import java.util.Deque; + +public class ByteBufAllocatorBenchmark extends DefaultBenchmark { + + private static final ByteBufAllocator POOLED_ALLOCATOR_HEAP = PooledByteBufAllocator.DEFAULT; + private static final ByteBufAllocator POOLED_ALLOCATOR_DIRECT = new PooledByteBufAllocator(true); + + @Param({"0", "256", "1024", "4096", "16384", "65536"}) + private int size; + + @Param + private Allocator allocator; + + private final Deque queue = new ArrayDeque(); + private ByteBufAllocator alloc; + + @Override + protected void setUp() throws Exception { + alloc = allocator.alloc(); + for (int i = 0; i < 2560; i ++) { + queue.add(alloc.buffer(size)); + } + } + + @Override + protected void tearDown() throws Exception { + for (ByteBuf b: queue) { + b.unsafe().free(); + } + queue.clear(); + } + + public void timeAllocAndFree(int reps) { + final ByteBufAllocator alloc = this.alloc; + final Deque queue = this.queue; + final int size = this.size; + + for (int i = 0; i < reps; i ++) { + queue.add(alloc.buffer(size)); + queue.removeFirst().unsafe().free(); + } + } + + public enum Allocator { + UNPOOLED_HEAP { + @Override + ByteBufAllocator alloc() { + return UnpooledByteBufAllocator.HEAP_BY_DEFAULT; + } + }, + UNPOOLED_DIRECT { + @Override + ByteBufAllocator alloc() { + return UnpooledByteBufAllocator.DIRECT_BY_DEFAULT; + } + }, + POOLED_HEAP { + @Override + ByteBufAllocator alloc() { + return POOLED_ALLOCATOR_HEAP; + } + }, + POOLED_DIRECT { + @Override + ByteBufAllocator alloc() { + return POOLED_ALLOCATOR_DIRECT; + } + }; + + abstract ByteBufAllocator alloc(); + } +} diff --git a/microbench/src/test/java/io/netty/microbench/util/DefaultBenchmark.java b/microbench/src/test/java/io/netty/microbench/util/DefaultBenchmark.java new file mode 100644 index 0000000000..2089cce42b --- /dev/null +++ b/microbench/src/test/java/io/netty/microbench/util/DefaultBenchmark.java @@ -0,0 +1,98 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.microbench.util; + +import com.google.caliper.Runner; +import com.google.caliper.SimpleBenchmark; +import org.junit.Test; + +import java.io.File; + +import static org.junit.Assert.*; + +public abstract class DefaultBenchmark extends SimpleBenchmark { + + private final int trials; + private final int warmupMillis; + private final int runMillis; + + protected DefaultBenchmark() { + this(1); + } + + protected DefaultBenchmark(int trials) { + this(trials, 3000, 1000); + } + + protected DefaultBenchmark(int trials, int warmupMillis, int runMillis) { + this.trials = trials; + this.warmupMillis = warmupMillis; + this.runMillis = runMillis; + } + + @Test + public void runBenchmarks() throws Exception { + File me = new File(DefaultBenchmark.class.getResource( + '/' + DefaultBenchmark.class.getName().replace('.', '/') + ".class").getPath()); + + if (!me.exists()) { + fail("failed to determine the project path"); + } + + File buildDir = + me.getParentFile().getParentFile().getParentFile().getParentFile().getParentFile().getParentFile(); + + if (!buildDir.getPath().endsWith(File.separator + "target") || !buildDir.isDirectory()) { + fail("failed to locate the build directory"); + } + + File reportDir = new File(buildDir.getAbsolutePath() + File.separator + "caliper-reports"); + + if (!reportDir.exists()) { + if (!reportDir.mkdirs()) { + fail("failed to create the Caliper report directory: " + reportDir.getAbsolutePath()); + } + } + + if (!reportDir.isDirectory()) { + fail("not a directory: " + reportDir.getAbsolutePath()); + } + + deleteOldReports(reportDir); + + new Runner().run( + "--trials", String.valueOf(trials), + "--warmupMillis", String.valueOf(warmupMillis), + "--runMillis", String.valueOf(runMillis), + "--captureVmLog", + "--saveResults", reportDir.getAbsolutePath(), + getClass().getName()); + } + + private void deleteOldReports(File reportDir) { + final String prefix = getClass().getName() + '.'; + final String suffix = ".json"; + for (File f: reportDir.listFiles()) { + String name = f.getName(); + if (name.startsWith(prefix) && name.endsWith(suffix)) { + if (f.delete()) { + System.out.println(" Deleted old report: " + name.substring(prefix.length(), name.length() - suffix.length())); + } + } + } + } +} diff --git a/pom.xml b/pom.xml index 7767d1fdc5..77ec9e7c72 100644 --- a/pom.xml +++ b/pom.xml @@ -187,6 +187,14 @@ ${jboss.marshalling.version} test + + + + com.google.caliper + caliper + 0.5-rc1 + test + @@ -222,7 +230,6 @@ 1.6.6 test - @@ -338,7 +345,10 @@ maven-surefire-plugin 2.12 - once + + **/*Test*.java + **/*Benchmark*.java + **/Abstract* **/TestUtil* @@ -347,7 +357,11 @@ -server -dsa -da -ea:io.netty... - -XX:+AggressiveOpts -XX:+UseFastAccessorMethods + -XX:+AggressiveOpts + -XX:+TieredCompilation + -XX:+UseBiasedLocking + -XX:+UseFastAccessorMethods + -XX:+UseStringCache -XX:+OptimizeStringConcat -XX:+HeapDumpOnOutOfMemoryError @@ -425,9 +439,21 @@ 2.2 - org.codehaus.mojo - build-helper-maven-plugin + maven-antrun-plugin 1.7 + + + ant-contrib + ant-contrib + 1.0b3 + + + ant + ant + + + + org.codehaus.mojo diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java index 5df1cd5c8a..097ac4cab7 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java @@ -15,7 +15,6 @@ */ package io.netty.testsuite.transport.socket; -import static org.junit.Assert.*; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; @@ -26,12 +25,13 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; +import org.junit.Test; import java.io.IOException; import java.util.Random; import java.util.concurrent.atomic.AtomicReference; -import org.junit.Test; +import static org.junit.Assert.*; public class SocketObjectEchoTest extends AbstractSocketTest { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java index 328fb897da..f1592cc0a0 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java @@ -16,7 +16,7 @@ package io.netty.channel; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.socket.SocketChannelConfig; import java.util.IdentityHashMap; @@ -30,7 +30,7 @@ import static io.netty.channel.ChannelOption.*; */ public class DefaultChannelConfig implements ChannelConfig { - private static final ByteBufAllocator DEFAULT_ALLOCATOR = UnpooledByteBufAllocator.HEAP_BY_DEFAULT; + private static final ByteBufAllocator DEFAULT_ALLOCATOR = PooledByteBufAllocator.DEFAULT; private static final int DEFAULT_CONNECT_TIMEOUT = 30000; private volatile ByteBufAllocator allocator = DEFAULT_ALLOCATOR; diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index e429152802..f534801e60 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -1252,7 +1252,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements if (out.writerIndex() > out.maxCapacity() - data.readableBytes()) { // The target buffer is not going to be able to accept all data in the bridge. - out.ensureWritableBytes(out.maxCapacity() - out.writerIndex()); + out.capacity(out.maxCapacity()); out.writeBytes(data, out.writableBytes()); } else { exchangeBuf.remove(); diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index 8820081417..831cad2e19 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -527,7 +527,6 @@ public class LocalTransportThreadModelTest { ByteBuf in = ctx.inboundByteBuffer(); MessageBuf out = ctx.nextInboundMessageBuffer(); - while (in.readableBytes() >= 4) { int msg = in.readInt(); int expected = inCnt ++;