diff --git a/buffer/src/main/java/io/netty/buffer/FixedCompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/FixedCompositeByteBuf.java new file mode 100644 index 0000000000..816c0b41de --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/FixedCompositeByteBuf.java @@ -0,0 +1,572 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +import io.netty.util.internal.EmptyArrays; +import io.netty.util.internal.RecyclableArrayList; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.ReadOnlyBufferException; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.util.Collections; + +/** + * {@link ByteBuf} implementation which allows to wrap an array of {@link ByteBuf} in a read-only mode. + * This is useful to write an array of {@link ByteBuf}s. + */ +final class FixedCompositeByteBuf extends AbstractReferenceCountedByteBuf { + private static final ByteBuf[] EMPTY = { Unpooled.EMPTY_BUFFER }; + private final int nioBufferCount; + private final int capacity; + private final ByteBufAllocator allocator; + private final ByteOrder order; + private final Object[] buffers; + private final boolean direct; + + FixedCompositeByteBuf(ByteBufAllocator allocator, ByteBuf... buffers) { + super(Integer.MAX_VALUE); + if (buffers.length == 0) { + this.buffers = EMPTY; + order = ByteOrder.BIG_ENDIAN; + nioBufferCount = 1; + capacity = 0; + direct = buffers[0].isDirect(); + } else { + ByteBuf b = buffers[0]; + this.buffers = new Object[buffers.length]; + this.buffers[0] = b; + boolean direct = true; + int nioBufferCount = b.nioBufferCount(); + int capacity = b.readableBytes(); + order = b.order(); + for (int i = 1; i < buffers.length; i++) { + b = buffers[i]; + if (buffers[i].order() != order) { + throw new IllegalArgumentException("All ByteBufs need to have same ByteOrder"); + } + nioBufferCount += b.nioBufferCount(); + capacity += b.readableBytes(); + if (!b.isDirect()) { + direct = false; + } + this.buffers[i] = b; + } + this.nioBufferCount = nioBufferCount; + this.capacity = capacity; + this.direct = direct; + } + setIndex(0, capacity()); + this.allocator = allocator; + } + + @Override + public boolean isWritable() { + return false; + } + + @Override + public boolean isWritable(int size) { + return false; + } + + @Override + public ByteBuf discardReadBytes() { + throw new ReadOnlyBufferException(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + throw new ReadOnlyBufferException(); + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + throw new ReadOnlyBufferException(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + throw new ReadOnlyBufferException(); + } + + @Override + public ByteBuf setByte(int index, int value) { + throw new ReadOnlyBufferException(); + } + + @Override + protected void _setByte(int index, int value) { + throw new ReadOnlyBufferException(); + } + + @Override + public ByteBuf setShort(int index, int value) { + throw new ReadOnlyBufferException(); + } + + @Override + protected void _setShort(int index, int value) { + throw new ReadOnlyBufferException(); + } + + @Override + public ByteBuf setMedium(int index, int value) { + throw new ReadOnlyBufferException(); + } + + @Override + protected void _setMedium(int index, int value) { + throw new ReadOnlyBufferException(); + } + + @Override + public ByteBuf setInt(int index, int value) { + throw new ReadOnlyBufferException(); + } + + @Override + protected void _setInt(int index, int value) { + throw new ReadOnlyBufferException(); + } + + @Override + public ByteBuf setLong(int index, long value) { + throw new ReadOnlyBufferException(); + } + + @Override + protected void _setLong(int index, long value) { + throw new ReadOnlyBufferException(); + } + + @Override + public int setBytes(int index, InputStream in, int length) { + throw new ReadOnlyBufferException(); + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) { + throw new ReadOnlyBufferException(); + } + + @Override + public int capacity() { + return capacity; + } + + @Override + public int maxCapacity() { + return capacity; + } + + @Override + public ByteBuf capacity(int newCapacity) { + throw new ReadOnlyBufferException(); + } + + @Override + public ByteBufAllocator alloc() { + return allocator; + } + + @Override + public ByteOrder order() { + return order; + } + + @Override + public ByteBuf unwrap() { + return null; + } + + @Override + public boolean isDirect() { + return direct; + } + + private Component findComponent(int index) { + int readable = 0; + for (int i = 0 ; i < buffers.length; i++) { + Component comp = null; + ByteBuf b; + Object obj = buffers[i]; + boolean isBuffer; + if (obj instanceof ByteBuf) { + b = (ByteBuf) obj; + isBuffer = true; + } else { + comp = (Component) obj; + b = comp.buf; + isBuffer = false; + } + readable += b.readableBytes(); + if (index < readable) { + if (isBuffer) { + // Create a new component ad store ti in the array so it not create a new object + // on the next access. + comp = new Component(i, readable - b.readableBytes(), b); + buffers[i] = comp; + } + return comp; + } + } + throw new IllegalStateException(); + } + + /** + * Return the {@link ByteBuf} stored at the given index of the array. + */ + private ByteBuf buffer(int i) { + Object obj = buffers[i]; + if (obj instanceof ByteBuf) { + return (ByteBuf) obj; + } + return ((Component) obj).buf; + } + + @Override + public byte getByte(int index) { + return _getByte(index); + } + + @Override + protected byte _getByte(int index) { + Component c = findComponent(index); + return c.buf.getByte(index - c.offset); + } + + @Override + protected short _getShort(int index) { + Component c = findComponent(index); + if (index + 2 <= c.endOffset) { + return c.buf.getShort(index - c.offset); + } else if (order() == ByteOrder.BIG_ENDIAN) { + return (short) ((_getByte(index) & 0xff) << 8 | _getByte(index + 1) & 0xff); + } else { + return (short) (_getByte(index) & 0xff | (_getByte(index + 1) & 0xff) << 8); + } + } + + @Override + protected int _getUnsignedMedium(int index) { + Component c = findComponent(index); + if (index + 3 <= c.endOffset) { + return c.buf.getUnsignedMedium(index - c.offset); + } else if (order() == ByteOrder.BIG_ENDIAN) { + return (_getShort(index) & 0xffff) << 8 | _getByte(index + 2) & 0xff; + } else { + return _getShort(index) & 0xFFFF | (_getByte(index + 2) & 0xFF) << 16; + } + } + + @Override + protected int _getInt(int index) { + Component c = findComponent(index); + if (index + 4 <= c.endOffset) { + return c.buf.getInt(index - c.offset); + } else if (order() == ByteOrder.BIG_ENDIAN) { + return (_getShort(index) & 0xffff) << 16 | _getShort(index + 2) & 0xffff; + } else { + return _getShort(index) & 0xFFFF | (_getShort(index + 2) & 0xFFFF) << 16; + } + } + + @Override + protected long _getLong(int index) { + Component c = findComponent(index); + if (index + 8 <= c.endOffset) { + return c.buf.getLong(index - c.offset); + } else if (order() == ByteOrder.BIG_ENDIAN) { + return (_getInt(index) & 0xffffffffL) << 32 | _getInt(index + 4) & 0xffffffffL; + } else { + return _getInt(index) & 0xFFFFFFFFL | (_getInt(index + 4) & 0xFFFFFFFFL) << 32; + } + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + checkDstIndex(index, length, dstIndex, dst.length); + if (length == 0) { + return this; + } + + Component c = findComponent(index); + int i = c.index; + int adjustment = c.offset; + ByteBuf s = c.buf; + for (;;) { + int localLength = Math.min(length, s.readableBytes() - (index - adjustment)); + s.getBytes(index - adjustment, dst, dstIndex, localLength); + index += localLength; + dstIndex += localLength; + length -= localLength; + adjustment += s.readableBytes(); + if (length <= 0) { + break; + } + s = buffer(++i); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + int limit = dst.limit(); + int length = dst.remaining(); + + checkIndex(index, length); + if (length == 0) { + return this; + } + + try { + Component c = findComponent(index); + int i = c.index; + int adjustment = c.offset; + ByteBuf s = c.buf; + for (;;) { + int localLength = Math.min(length, s.capacity() - (index - adjustment)); + dst.limit(dst.position() + localLength); + s.getBytes(index - adjustment, dst); + index += localLength; + length -= localLength; + adjustment += s.readableBytes(); + if (length <= 0) { + break; + } + s = buffer(++i); + } + } finally { + dst.limit(limit); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + checkDstIndex(index, length, dstIndex, dst.capacity()); + if (length == 0) { + return this; + } + + Component c = findComponent(index); + int i = c.index; + int adjustment = c.offset; + ByteBuf s = c.buf; + for (;;) { + int localLength = Math.min(length, s.capacity() - (index - adjustment)); + s.getBytes(index - adjustment, dst, dstIndex, localLength); + index += localLength; + dstIndex += localLength; + length -= localLength; + adjustment += s.readableBytes(); + if (length <= 0) { + break; + } + s = buffer(++i); + } + return this; + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) + throws IOException { + int count = nioBufferCount(); + if (count == 1) { + return out.write(internalNioBuffer(index, length)); + } else { + long writtenBytes = out.write(nioBuffers(index, length)); + if (writtenBytes > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int) writtenBytes; + } + } + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + checkIndex(index, length); + if (length == 0) { + return this; + } + + Component c = findComponent(index); + int i = c.index; + int adjustment = c.offset; + ByteBuf s = c.buf; + for (;;) { + int localLength = Math.min(length, s.capacity() - (index - adjustment)); + s.getBytes(index - adjustment, out, localLength); + index += localLength; + length -= localLength; + adjustment += s.readableBytes(); + if (length <= 0) { + break; + } + s = buffer(++i); + } + return this; + } + + @Override + public ByteBuf copy(int index, int length) { + checkIndex(index, length); + boolean release = true; + ByteBuf buf = alloc().buffer(length); + try { + buf.writeBytes(this, index, length); + release = false; + return buf; + } finally { + if (release) { + buf.release(); + } + } + } + + @Override + public int nioBufferCount() { + return nioBufferCount; + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + checkIndex(index, length); + if (buffers.length == 1) { + ByteBuf buf = buffer(0); + if (buf.nioBufferCount() == 1) { + return buf.nioBuffer(index, length); + } + } + ByteBuffer merged = ByteBuffer.allocate(length).order(order()); + ByteBuffer[] buffers = nioBuffers(index, length); + + //noinspection ForLoopReplaceableByForEach + for (int i = 0; i < buffers.length; i++) { + merged.put(buffers[i]); + } + + merged.flip(); + return merged; + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + if (buffers.length == 1) { + return buffer(0).internalNioBuffer(index, length); + } + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + checkIndex(index, length); + if (length == 0) { + return EmptyArrays.EMPTY_BYTE_BUFFERS; + } + + RecyclableArrayList array = RecyclableArrayList.newInstance(buffers.length); + try { + Component c = findComponent(index); + int i = c.index; + int adjustment = c.offset; + ByteBuf s = c.buf; + for (;;) { + int localLength = Math.min(length, s.capacity() - (index - adjustment)); + switch (s.nioBufferCount()) { + case 0: + throw new UnsupportedOperationException(); + case 1: + array.add(s.nioBuffer(index - adjustment, localLength)); + break; + default: + Collections.addAll(array, s.nioBuffers(index - adjustment, localLength)); + } + + index += localLength; + length -= localLength; + adjustment += s.readableBytes(); + if (length <= 0) { + break; + } + s = buffer(++i); + } + + return array.toArray(new ByteBuffer[array.size()]); + } finally { + array.recycle(); + } + } + + @Override + public boolean hasArray() { + return false; + } + + @Override + public byte[] array() { + throw new UnsupportedOperationException(); + } + + @Override + public int arrayOffset() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasMemoryAddress() { + return false; + } + + @Override + public long memoryAddress() { + throw new UnsupportedOperationException(); + } + + @Override + protected void deallocate() { + for (int i = 0; i < buffers.length; i++) { + buffer(i).release(); + } + } + + @Override + public String toString() { + String result = super.toString(); + result = result.substring(0, result.length() - 1); + return result + ", components=" + buffers.length + ')'; + } + + private static final class Component { + private final int index; + private final int offset; + private final ByteBuf buf; + private final int endOffset; + + Component(int index, int offset, ByteBuf buf) { + this.index = index; + this.offset = offset; + endOffset = offset + buf.readableBytes(); + this.buf = buf; + } + } +} diff --git a/buffer/src/main/java/io/netty/buffer/Unpooled.java b/buffer/src/main/java/io/netty/buffer/Unpooled.java index 589b6c3d9b..0c071071cb 100644 --- a/buffer/src/main/java/io/netty/buffer/Unpooled.java +++ b/buffer/src/main/java/io/netty/buffer/Unpooled.java @@ -853,6 +853,14 @@ public final class Unpooled { return new UnreleasableByteBuf(buf); } + /** + * Wrap the given {@link ByteBuf}s in an unmodifiable {@link ByteBuf}. Be aware the returned {@link ByteBuf} will + * not try to slice the given {@link ByteBuf}s to reduce GC-Pressure. + */ + public static ByteBuf unmodifiableBuffer(ByteBuf... buffers) { + return new FixedCompositeByteBuf(ALLOC, buffers); + } + private Unpooled() { // Unused } diff --git a/buffer/src/test/java/io/netty/buffer/FixedCompositeByteBufTest.java b/buffer/src/test/java/io/netty/buffer/FixedCompositeByteBufTest.java new file mode 100644 index 0000000000..03857551de --- /dev/null +++ b/buffer/src/test/java/io/netty/buffer/FixedCompositeByteBufTest.java @@ -0,0 +1,279 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + + +import org.junit.After; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ReadOnlyBufferException; +import java.nio.channels.ScatteringByteChannel; +import java.util.ArrayDeque; +import java.util.Queue; + +import static io.netty.buffer.Unpooled.buffer; +import static io.netty.buffer.Unpooled.compositeBuffer; +import static io.netty.buffer.Unpooled.directBuffer; +import static org.junit.Assert.assertArrayEquals; + +public class FixedCompositeByteBufTest { + + private static final Queue freeLaterQueue = new ArrayDeque(); + + protected static T freeLater(T buf) { + freeLaterQueue.add(buf); + return buf; + } + + @After + public void dispose() { + for (;;) { + ByteBuf buf = freeLaterQueue.poll(); + if (buf == null) { + break; + } + + if (buf.refCnt() > 0) { + buf.release(buf.refCnt()); + } + } + } + + private ByteBuf newBuffer(ByteBuf... buffers) { + return new FixedCompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, buffers); + } + + @Test(expected = ReadOnlyBufferException.class) + public void testSetBoolean() { + ByteBuf buf = newBuffer(Unpooled.wrappedBuffer(new byte[8])); + buf.setBoolean(0, true); + } + + @Test(expected = ReadOnlyBufferException.class) + public void testSetByte() { + ByteBuf buf = newBuffer(Unpooled.wrappedBuffer(new byte[8])); + buf.setByte(0, 1); + } + + @Test(expected = ReadOnlyBufferException.class) + public void testSetBytesWithByteBuf() { + ByteBuf buf = newBuffer(Unpooled.wrappedBuffer(new byte[8])); + buf.setBytes(0, Unpooled.wrappedBuffer(new byte[4])); + } + + @Test(expected = ReadOnlyBufferException.class) + public void testSetBytesWithByteBuffer() { + ByteBuf buf = newBuffer(Unpooled.wrappedBuffer(new byte[8])); + buf.setBytes(0, ByteBuffer.wrap(new byte[4])); + } + + @Test(expected = ReadOnlyBufferException.class) + public void testSetBytesWithInputStream() throws IOException { + ByteBuf buf = newBuffer(Unpooled.wrappedBuffer(new byte[8])); + buf.setBytes(0, new ByteArrayInputStream(new byte[4]), 4); + } + + @Test(expected = ReadOnlyBufferException.class) + public void testSetBytesWithChannel() throws IOException { + ByteBuf buf = newBuffer(Unpooled.wrappedBuffer(new byte[8])); + buf.setBytes(0, new ScatteringByteChannel() { + @Override + public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + return 0; + } + + @Override + public long read(ByteBuffer[] dsts) throws IOException { + return 0; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + return 0; + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() throws IOException { + } + }, 4); + } + + @Test(expected = ReadOnlyBufferException.class) + public void testSetChar() throws IOException { + ByteBuf buf = newBuffer(Unpooled.wrappedBuffer(new byte[8])); + buf.setChar(0, 'b'); + } + + @Test(expected = ReadOnlyBufferException.class) + public void testSetDouble() throws IOException { + ByteBuf buf = newBuffer(Unpooled.wrappedBuffer(new byte[8])); + buf.setDouble(0, 1); + } + + @Test(expected = ReadOnlyBufferException.class) + public void testSetFloat() throws IOException { + ByteBuf buf = newBuffer(Unpooled.wrappedBuffer(new byte[8])); + buf.setFloat(0, 1); + } + + @Test(expected = ReadOnlyBufferException.class) + public void testSetInt() throws IOException { + ByteBuf buf = newBuffer(Unpooled.wrappedBuffer(new byte[8])); + buf.setInt(0, 1); + } + + @Test(expected = ReadOnlyBufferException.class) + public void testSetLong() { + ByteBuf buf = newBuffer(Unpooled.wrappedBuffer(new byte[8])); + buf.setLong(0, 1); + } + + @Test(expected = ReadOnlyBufferException.class) + public void testSetMedium() throws IOException { + ByteBuf buf = newBuffer(Unpooled.wrappedBuffer(new byte[8])); + buf.setMedium(0, 1); + } + + @Test + public void testGatheringWritesHeap() throws Exception { + testGatheringWrites(buffer(), buffer()); + } + + @Test + public void testGatheringWritesDirect() throws Exception { + testGatheringWrites(directBuffer(), directBuffer()); + } + + @Test + public void testGatheringWritesMixes() throws Exception { + testGatheringWrites(buffer(), directBuffer()); + } + + @Test + public void testGatheringWritesHeapPooled() throws Exception { + testGatheringWrites(PooledByteBufAllocator.DEFAULT.heapBuffer(), + PooledByteBufAllocator.DEFAULT.heapBuffer()); + } + + @Test + public void testGatheringWritesDirectPooled() throws Exception { + testGatheringWrites(PooledByteBufAllocator.DEFAULT.directBuffer(), + PooledByteBufAllocator.DEFAULT.directBuffer()); + } + + @Test + public void testGatheringWritesMixesPooled() throws Exception { + testGatheringWrites(PooledByteBufAllocator.DEFAULT.heapBuffer(), + PooledByteBufAllocator.DEFAULT.directBuffer()); + } + + private static void testGatheringWrites(ByteBuf buf1, ByteBuf buf2) throws Exception { + CompositeByteBuf buf = freeLater(compositeBuffer()); + buf.addComponent(buf1.writeBytes(new byte[]{1, 2})); + buf.addComponent(buf2.writeBytes(new byte[]{1, 2})); + buf.writerIndex(3); + buf.readerIndex(1); + + AbstractByteBufTest.TestGatheringByteChannel channel = new AbstractByteBufTest.TestGatheringByteChannel(); + + buf.readBytes(channel, 2); + + byte[] data = new byte[2]; + buf.getBytes(1, data); + assertArrayEquals(data, channel.writtenBytes()); + } + + @Test + public void testGatheringWritesPartialHeap() throws Exception { + testGatheringWritesPartial(buffer(), buffer()); + } + + @Test + public void testGatheringWritesPartialDirect() throws Exception { + testGatheringWritesPartial(directBuffer(), directBuffer()); + } + + @Test + public void testGatheringWritesPartialMixes() throws Exception { + testGatheringWritesPartial(buffer(), directBuffer()); + } + + @Test + public void testGatheringWritesPartialHeapPooled() throws Exception { + testGatheringWritesPartial(PooledByteBufAllocator.DEFAULT.heapBuffer(), + PooledByteBufAllocator.DEFAULT.heapBuffer()); + } + + @Test + public void testGatheringWritesPartialDirectPooled() throws Exception { + testGatheringWritesPartial(PooledByteBufAllocator.DEFAULT.directBuffer(), + PooledByteBufAllocator.DEFAULT.directBuffer()); + } + + @Test + public void testGatheringWritesPartialMixesPooled() throws Exception { + testGatheringWritesPartial(PooledByteBufAllocator.DEFAULT.heapBuffer(), + PooledByteBufAllocator.DEFAULT.directBuffer()); + } + + private void testGatheringWritesPartial(ByteBuf buf1, ByteBuf buf2) throws Exception { + buf1.writeBytes(new byte[]{1, 2, 3, 4}); + buf2.writeBytes(new byte[]{1, 2, 3, 4}); + ByteBuf buf = newBuffer(buf1, buf2); + AbstractByteBufTest.TestGatheringByteChannel channel = new AbstractByteBufTest.TestGatheringByteChannel(1); + + while (buf.isReadable()) { + buf.readBytes(channel, buf.readableBytes()); + } + + byte[] data = new byte[8]; + buf.getBytes(0, data); + assertArrayEquals(data, channel.writtenBytes()); + buf.release(); + } + + @Test + public void testGatheringWritesSingleHeap() throws Exception { + testGatheringWritesSingleBuf(buffer()); + } + + @Test + public void testGatheringWritesSingleDirect() throws Exception { + testGatheringWritesSingleBuf(directBuffer()); + } + + private void testGatheringWritesSingleBuf(ByteBuf buf1) throws Exception { + ByteBuf buf = newBuffer(buf1.writeBytes(new byte[]{1, 2, 3, 4})); + buf.readerIndex(1); + + AbstractByteBufTest.TestGatheringByteChannel channel = new AbstractByteBufTest.TestGatheringByteChannel(); + buf.readBytes(channel, 2); + + byte[] data = new byte[2]; + buf.getBytes(1, data); + assertArrayEquals(data, channel.writtenBytes()); + buf.release(); + } + +}