Fix numerous bugs in the ByteBufAdaptor

This commit is contained in:
Chris Vest 2021-03-09 12:04:57 +01:00
parent 45074e4749
commit da70f29ff4
3 changed files with 125 additions and 41 deletions

View File

@ -38,10 +38,12 @@ import java.util.concurrent.atomic.AtomicReference;
public final class ByteBufAdaptor extends ByteBuf {
private final ByteBufAllocatorAdaptor alloc;
private final Buffer buffer;
private final boolean hasMemoryAddress;
public ByteBufAdaptor(ByteBufAllocatorAdaptor alloc, Buffer buffer) {
this.alloc = alloc;
this.buffer = buffer;
hasMemoryAddress = buffer.nativeAddress() != 0;
}
/**
@ -70,7 +72,14 @@ public final class ByteBufAdaptor extends ByteBuf {
public ByteBuf capacity(int newCapacity) {
int diff = newCapacity - capacity() - buffer.writableBytes();
if (diff > 0) {
try {
buffer.ensureWritable(diff);
} catch (IllegalStateException e) {
if (!buffer.isOwned()) {
throw new UnsupportedOperationException(e);
}
throw e;
}
}
return this;
}
@ -103,7 +112,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public boolean isDirect() {
return buffer.nativeAddress() != 0;
return hasMemoryAddress;
}
@Override
@ -186,6 +195,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf discardReadBytes() {
checkAccess();
buffer.compact();
return this;
}
@ -197,9 +207,10 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf ensureWritable(int minWritableBytes) {
try {
checkAccess();
if (writableBytes() < minWritableBytes) {
int borrows = buffer.countBorrows();
try {
if (borrows == 0) {
// Good place.
buffer.ensureWritable(minWritableBytes);
@ -212,9 +223,9 @@ public final class ByteBufAdaptor extends ByteBuf {
retain(borrows);
}
}
} catch (IllegalArgumentException e) {
throw new IndexOutOfBoundsException(e.getMessage());
}
} catch (IllegalStateException e) {
throw new IllegalReferenceCountException(e);
}
return this;
}
@ -453,6 +464,9 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
if (index < 0 || capacity() < index + length) {
throw new IndexOutOfBoundsException();
}
for (int i = 0; i < length; i++) {
dst[dstIndex + i] = getByte(index + i);
}
@ -636,9 +650,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf setBytes(int index, ByteBuf src) {
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
checkAccess();
while (src.isReadable() && index < capacity()) {
setByte(index++, src.readByte());
}
@ -647,6 +659,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf setBytes(int index, ByteBuf src, int length) {
checkAccess();
for (int i = 0; i < length; i++) {
setByte(index + i, src.readByte());
}
@ -685,6 +698,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int setBytes(int index, InputStream in, int length) throws IOException {
checkAccess();
byte[] bytes = in.readNBytes(length);
setBytes(index, bytes, 0, length);
return bytes.length;
@ -692,6 +706,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
checkAccess();
ByteBuffer transfer = ByteBuffer.allocate(length);
int bytes = in.read(transfer);
transfer.flip();
@ -701,6 +716,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
checkAccess();
ByteBuffer transfer = ByteBuffer.allocate(length);
int bytes = in.read(transfer, position);
transfer.flip();
@ -925,8 +941,9 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf readBytes(int length) {
Buffer copy = preferredBufferAllocator().allocate(length);
buffer.copyInto(0, copy, 0, length);
return wrap(copy);
buffer.copyInto(0, copy, readerIndex(), length);
readerIndex(readerIndex() + length);
return wrap(copy).writerIndex(length);
}
@Override
@ -1262,6 +1279,9 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf writeZero(int length) {
if (length < 0) {
throw new IllegalArgumentException();
}
ensureWritable(length);
for (int i = 0; i < length; i++) {
writeByte(0);
@ -1278,7 +1298,32 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int indexOf(int fromIndex, int toIndex, byte value) {
for (int i = fromIndex; i < toIndex; i++) {
if (!buffer.isAccessible()) {
return -1;
}
int inc, start, end;
if (fromIndex <= toIndex) {
inc = 1;
start = fromIndex;
end = toIndex - 1;
if (start < 0) {
start = 0; // Required to pass regression tests.
}
if (capacity() <= end) {
throw new IndexOutOfBoundsException();
}
} else {
inc = -1;
start = fromIndex - 1;
end = toIndex;
if (capacity() <= start) {
start = capacity() - 1; // Required to pass regression tests.
}
if (end < 0) {
throw new IndexOutOfBoundsException();
}
}
for (int i = start; i != end; i += inc) {
if (getByte(i) == value) {
return i;
}
@ -1307,9 +1352,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int forEachByte(ByteProcessor processor) {
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
checkAccess();
int index = readerIndex();
int bytes = buffer.openCursor().process(processor);
return bytes == -1 ? -1 : index + bytes;
@ -1317,18 +1360,14 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int forEachByte(int index, int length, ByteProcessor processor) {
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
checkAccess();
int bytes = buffer.openCursor(index, length).process(processor);
return bytes == -1 ? -1 : index + bytes;
}
@Override
public int forEachByteDesc(ByteProcessor processor) {
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
checkAccess();
int index = readerIndex();
int bytes = buffer.openReverseCursor().process(processor);
return bytes == -1 ? -1 : index - bytes;
@ -1336,9 +1375,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public int forEachByteDesc(int index, int length, ByteProcessor processor) {
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
checkAccess();
int bytes = buffer.openReverseCursor(index, length).process(processor);
return bytes == -1 ? -1 : index - bytes;
}
@ -1350,9 +1387,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf copy(int index, int length) {
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
checkAccess();
try {
BufferAllocator allocator = preferredBufferAllocator();
Buffer copy = allocator.allocate(length);
@ -1374,9 +1409,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf retainedSlice() {
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
checkAccess();
return wrap(buffer.slice());
}
@ -1389,6 +1422,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf retainedSlice(int index, int length) {
checkAccess();
try {
return wrap(buffer.slice(index, length));
} catch (IllegalStateException e) {
@ -1405,7 +1439,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf retainedDuplicate() {
return retainedSlice(0, capacity());
return retainedSlice(0, capacity()).setIndex(readerIndex(), writerIndex());
}
@Override
@ -1420,9 +1454,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuffer nioBuffer(int index, int length) {
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
checkAccess();
ByteBuffer copy = isDirect() ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
while (index < length) {
copy.put(getByte(index++));
@ -1432,9 +1464,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuffer internalNioBuffer(int index, int length) {
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
checkAccess();
if (readerIndex() <= index && index < writerIndex() && length <= readableBytes()) {
// We wish to read from the internal buffer.
if (buffer.countReadableComponents() != 1) {
@ -1504,7 +1534,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public boolean hasMemoryAddress() {
return buffer.nativeAddress() != 0;
return hasMemoryAddress;
}
@Override
@ -1612,6 +1642,12 @@ public final class ByteBufAdaptor extends ByteBuf {
return !buffer.isAccessible();
}
private void checkAccess() {
if (!buffer.isAccessible()) {
throw new IllegalReferenceCountException();
}
}
private ByteBufAdaptor wrap(Buffer copy) {
return new ByteBufAdaptor(alloc, copy);
}

View File

@ -20,6 +20,8 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.api.BufferAllocator;
import static java.nio.ByteOrder.BIG_ENDIAN;
public class ByteBufAllocatorAdaptor implements ByteBufAllocator, AutoCloseable {
private final BufferAllocator onheap;
private final BufferAllocator offheap;
@ -53,7 +55,7 @@ public class ByteBufAllocatorAdaptor implements ByteBufAllocator, AutoCloseable
@Override
public ByteBuf buffer(int initialCapacity) {
return new ByteBufAdaptor(this, onheap.allocate(initialCapacity));
return new ByteBufAdaptor(this, onheap.allocate(initialCapacity).order(BIG_ENDIAN));
}
@Override
@ -98,7 +100,7 @@ public class ByteBufAllocatorAdaptor implements ByteBufAllocator, AutoCloseable
@Override
public ByteBuf directBuffer(int initialCapacity) {
return new ByteBufAdaptor(this, offheap.allocate(initialCapacity));
return new ByteBufAdaptor(this, offheap.allocate(initialCapacity).order(BIG_ENDIAN));
}
@Override

View File

@ -1,9 +1,25 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.adaptor;
import io.netty.buffer.AbstractByteBufTest;
import io.netty.buffer.ByteBuf;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
public class ByteBufAdaptorTest extends AbstractByteBufTest {
static ByteBufAllocatorAdaptor alloc;
@ -22,4 +38,34 @@ public class ByteBufAdaptorTest extends AbstractByteBufTest {
protected ByteBuf newBuffer(int capacity, int maxCapacity) {
return alloc.buffer(capacity, capacity);
}
@Ignore("new buffers not thread-safe like this")
@Override
public void testSliceReadGatheringByteChannelMultipleThreads() throws Exception {
}
@Ignore("new buffers not thread-safe like this")
@Override
public void testDuplicateReadGatheringByteChannelMultipleThreads() throws Exception {
}
@Ignore("new buffers not thread-safe like this")
@Override
public void testSliceReadOutputStreamMultipleThreads() throws Exception {
}
@Ignore("new buffers not thread-safe like this")
@Override
public void testDuplicateReadOutputStreamMultipleThreads() throws Exception {
}
@Ignore("new buffers not thread-safe like this")
@Override
public void testSliceBytesInArrayMultipleThreads() throws Exception {
}
@Ignore("new buffers not thread-safe like this")
@Override
public void testDuplicateBytesInArrayMultipleThreads() throws Exception {
}
}