From da70f29ff4504419083926bd72f555c0208014f4 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 9 Mar 2021 12:04:57 +0100 Subject: [PATCH] Fix numerous bugs in the ByteBufAdaptor --- .../buffer/api/adaptor/ByteBufAdaptor.java | 114 ++++++++++++------ .../api/adaptor/ByteBufAllocatorAdaptor.java | 6 +- .../api/adaptor/ByteBufAdaptorTest.java | 46 +++++++ 3 files changed, 125 insertions(+), 41 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java index 52563c3..4c8cb0c 100644 --- a/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java +++ b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAdaptor.java @@ -38,10 +38,12 @@ import java.util.concurrent.atomic.AtomicReference; public final class ByteBufAdaptor extends ByteBuf { private final ByteBufAllocatorAdaptor alloc; private final Buffer buffer; + private final boolean hasMemoryAddress; public ByteBufAdaptor(ByteBufAllocatorAdaptor alloc, Buffer buffer) { this.alloc = alloc; this.buffer = buffer; + hasMemoryAddress = buffer.nativeAddress() != 0; } /** @@ -70,7 +72,14 @@ public final class ByteBufAdaptor extends ByteBuf { public ByteBuf capacity(int newCapacity) { int diff = newCapacity - capacity() - buffer.writableBytes(); if (diff > 0) { - buffer.ensureWritable(diff); + try { + buffer.ensureWritable(diff); + } catch (IllegalStateException e) { + if (!buffer.isOwned()) { + throw new UnsupportedOperationException(e); + } + throw e; + } } return this; } @@ -103,7 +112,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public boolean isDirect() { - return buffer.nativeAddress() != 0; + return hasMemoryAddress; } @Override @@ -186,6 +195,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuf discardReadBytes() { + checkAccess(); buffer.compact(); return this; } @@ -197,9 +207,10 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuf ensureWritable(int minWritableBytes) { - try { - if (writableBytes() < minWritableBytes) { - int borrows = buffer.countBorrows(); + checkAccess(); + if (writableBytes() < minWritableBytes) { + int borrows = buffer.countBorrows(); + try { if (borrows == 0) { // Good place. buffer.ensureWritable(minWritableBytes); @@ -212,9 +223,9 @@ public final class ByteBufAdaptor extends ByteBuf { retain(borrows); } } + } catch (IllegalArgumentException e) { + throw new IndexOutOfBoundsException(e.getMessage()); } - } catch (IllegalStateException e) { - throw new IllegalReferenceCountException(e); } return this; } @@ -453,6 +464,9 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + if (index < 0 || capacity() < index + length) { + throw new IndexOutOfBoundsException(); + } for (int i = 0; i < length; i++) { dst[dstIndex + i] = getByte(index + i); } @@ -636,9 +650,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuf setBytes(int index, ByteBuf src) { - if (!buffer.isAccessible()) { - throw new IllegalReferenceCountException(); - } + checkAccess(); while (src.isReadable() && index < capacity()) { setByte(index++, src.readByte()); } @@ -647,6 +659,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuf setBytes(int index, ByteBuf src, int length) { + checkAccess(); for (int i = 0; i < length; i++) { setByte(index + i, src.readByte()); } @@ -685,6 +698,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public int setBytes(int index, InputStream in, int length) throws IOException { + checkAccess(); byte[] bytes = in.readNBytes(length); setBytes(index, bytes, 0, length); return bytes.length; @@ -692,6 +706,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + checkAccess(); ByteBuffer transfer = ByteBuffer.allocate(length); int bytes = in.read(transfer); transfer.flip(); @@ -701,6 +716,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public int setBytes(int index, FileChannel in, long position, int length) throws IOException { + checkAccess(); ByteBuffer transfer = ByteBuffer.allocate(length); int bytes = in.read(transfer, position); transfer.flip(); @@ -925,8 +941,9 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuf readBytes(int length) { Buffer copy = preferredBufferAllocator().allocate(length); - buffer.copyInto(0, copy, 0, length); - return wrap(copy); + buffer.copyInto(0, copy, readerIndex(), length); + readerIndex(readerIndex() + length); + return wrap(copy).writerIndex(length); } @Override @@ -1262,6 +1279,9 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuf writeZero(int length) { + if (length < 0) { + throw new IllegalArgumentException(); + } ensureWritable(length); for (int i = 0; i < length; i++) { writeByte(0); @@ -1278,7 +1298,32 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public int indexOf(int fromIndex, int toIndex, byte value) { - for (int i = fromIndex; i < toIndex; i++) { + if (!buffer.isAccessible()) { + return -1; + } + int inc, start, end; + if (fromIndex <= toIndex) { + inc = 1; + start = fromIndex; + end = toIndex - 1; + if (start < 0) { + start = 0; // Required to pass regression tests. + } + if (capacity() <= end) { + throw new IndexOutOfBoundsException(); + } + } else { + inc = -1; + start = fromIndex - 1; + end = toIndex; + if (capacity() <= start) { + start = capacity() - 1; // Required to pass regression tests. + } + if (end < 0) { + throw new IndexOutOfBoundsException(); + } + } + for (int i = start; i != end; i += inc) { if (getByte(i) == value) { return i; } @@ -1307,9 +1352,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public int forEachByte(ByteProcessor processor) { - if (!buffer.isAccessible()) { - throw new IllegalReferenceCountException(); - } + checkAccess(); int index = readerIndex(); int bytes = buffer.openCursor().process(processor); return bytes == -1 ? -1 : index + bytes; @@ -1317,18 +1360,14 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public int forEachByte(int index, int length, ByteProcessor processor) { - if (!buffer.isAccessible()) { - throw new IllegalReferenceCountException(); - } + checkAccess(); int bytes = buffer.openCursor(index, length).process(processor); return bytes == -1 ? -1 : index + bytes; } @Override public int forEachByteDesc(ByteProcessor processor) { - if (!buffer.isAccessible()) { - throw new IllegalReferenceCountException(); - } + checkAccess(); int index = readerIndex(); int bytes = buffer.openReverseCursor().process(processor); return bytes == -1 ? -1 : index - bytes; @@ -1336,9 +1375,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public int forEachByteDesc(int index, int length, ByteProcessor processor) { - if (!buffer.isAccessible()) { - throw new IllegalReferenceCountException(); - } + checkAccess(); int bytes = buffer.openReverseCursor(index, length).process(processor); return bytes == -1 ? -1 : index - bytes; } @@ -1350,9 +1387,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuf copy(int index, int length) { - if (!buffer.isAccessible()) { - throw new IllegalReferenceCountException(); - } + checkAccess(); try { BufferAllocator allocator = preferredBufferAllocator(); Buffer copy = allocator.allocate(length); @@ -1374,9 +1409,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuf retainedSlice() { - if (!buffer.isAccessible()) { - throw new IllegalReferenceCountException(); - } + checkAccess(); return wrap(buffer.slice()); } @@ -1389,6 +1422,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuf retainedSlice(int index, int length) { + checkAccess(); try { return wrap(buffer.slice(index, length)); } catch (IllegalStateException e) { @@ -1405,7 +1439,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuf retainedDuplicate() { - return retainedSlice(0, capacity()); + return retainedSlice(0, capacity()).setIndex(readerIndex(), writerIndex()); } @Override @@ -1420,9 +1454,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuffer nioBuffer(int index, int length) { - if (!buffer.isAccessible()) { - throw new IllegalReferenceCountException(); - } + checkAccess(); ByteBuffer copy = isDirect() ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length); while (index < length) { copy.put(getByte(index++)); @@ -1432,9 +1464,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public ByteBuffer internalNioBuffer(int index, int length) { - if (!buffer.isAccessible()) { - throw new IllegalReferenceCountException(); - } + checkAccess(); if (readerIndex() <= index && index < writerIndex() && length <= readableBytes()) { // We wish to read from the internal buffer. if (buffer.countReadableComponents() != 1) { @@ -1504,7 +1534,7 @@ public final class ByteBufAdaptor extends ByteBuf { @Override public boolean hasMemoryAddress() { - return buffer.nativeAddress() != 0; + return hasMemoryAddress; } @Override @@ -1612,6 +1642,12 @@ public final class ByteBufAdaptor extends ByteBuf { return !buffer.isAccessible(); } + private void checkAccess() { + if (!buffer.isAccessible()) { + throw new IllegalReferenceCountException(); + } + } + private ByteBufAdaptor wrap(Buffer copy) { return new ByteBufAdaptor(alloc, copy); } diff --git a/src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java index 22e4d0d..bd65fca 100644 --- a/src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java +++ b/src/main/java/io/netty/buffer/api/adaptor/ByteBufAllocatorAdaptor.java @@ -20,6 +20,8 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.api.BufferAllocator; +import static java.nio.ByteOrder.BIG_ENDIAN; + public class ByteBufAllocatorAdaptor implements ByteBufAllocator, AutoCloseable { private final BufferAllocator onheap; private final BufferAllocator offheap; @@ -53,7 +55,7 @@ public class ByteBufAllocatorAdaptor implements ByteBufAllocator, AutoCloseable @Override public ByteBuf buffer(int initialCapacity) { - return new ByteBufAdaptor(this, onheap.allocate(initialCapacity)); + return new ByteBufAdaptor(this, onheap.allocate(initialCapacity).order(BIG_ENDIAN)); } @Override @@ -98,7 +100,7 @@ public class ByteBufAllocatorAdaptor implements ByteBufAllocator, AutoCloseable @Override public ByteBuf directBuffer(int initialCapacity) { - return new ByteBufAdaptor(this, offheap.allocate(initialCapacity)); + return new ByteBufAdaptor(this, offheap.allocate(initialCapacity).order(BIG_ENDIAN)); } @Override diff --git a/src/test/java/io/netty/buffer/api/adaptor/ByteBufAdaptorTest.java b/src/test/java/io/netty/buffer/api/adaptor/ByteBufAdaptorTest.java index 92d873e..48a753c 100644 --- a/src/test/java/io/netty/buffer/api/adaptor/ByteBufAdaptorTest.java +++ b/src/test/java/io/netty/buffer/api/adaptor/ByteBufAdaptorTest.java @@ -1,9 +1,25 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ package io.netty.buffer.api.adaptor; import io.netty.buffer.AbstractByteBufTest; import io.netty.buffer.ByteBuf; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Ignore; public class ByteBufAdaptorTest extends AbstractByteBufTest { static ByteBufAllocatorAdaptor alloc; @@ -22,4 +38,34 @@ public class ByteBufAdaptorTest extends AbstractByteBufTest { protected ByteBuf newBuffer(int capacity, int maxCapacity) { return alloc.buffer(capacity, capacity); } + + @Ignore("new buffers not thread-safe like this") + @Override + public void testSliceReadGatheringByteChannelMultipleThreads() throws Exception { + } + + @Ignore("new buffers not thread-safe like this") + @Override + public void testDuplicateReadGatheringByteChannelMultipleThreads() throws Exception { + } + + @Ignore("new buffers not thread-safe like this") + @Override + public void testSliceReadOutputStreamMultipleThreads() throws Exception { + } + + @Ignore("new buffers not thread-safe like this") + @Override + public void testDuplicateReadOutputStreamMultipleThreads() throws Exception { + } + + @Ignore("new buffers not thread-safe like this") + @Override + public void testSliceBytesInArrayMultipleThreads() throws Exception { + } + + @Ignore("new buffers not thread-safe like this") + @Override + public void testDuplicateBytesInArrayMultipleThreads() throws Exception { + } }