Align slice sendability of composite buffers with that of non-composite buffers

This means we no longer need to have tests that are parameterised over non-sliced buffers.
This commit is contained in:
Chris Vest 2021-03-16 17:22:41 +01:00
parent d40989da78
commit de305bd6b9
3 changed files with 59 additions and 104 deletions

View File

@ -48,7 +48,6 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
private final BufferAllocator allocator;
private final TornBufferAccessors tornBufAccessors;
private final boolean isSendable;
private Buffer[] bufs;
private int[] offsets; // The offset, for the composite buffer, where each constituent buffer starts.
private int capacity;
@ -60,7 +59,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
private boolean readOnly;
CompositeBuffer(BufferAllocator allocator, Deref<Buffer>[] refs) {
this(allocator, true, filterExternalBufs(refs), COMPOSITE_DROP, false);
this(allocator, filterExternalBufs(refs), COMPOSITE_DROP, false);
}
private static Buffer[] filterExternalBufs(Deref<Buffer>[] refs) {
@ -114,11 +113,10 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
return Stream.of(buf);
}
private CompositeBuffer(BufferAllocator allocator, boolean isSendable, Buffer[] bufs, Drop<CompositeBuffer> drop,
private CompositeBuffer(BufferAllocator allocator, Buffer[] bufs, Drop<CompositeBuffer> drop,
boolean acquireBufs) {
super(drop);
this.allocator = allocator;
this.isSendable = isSendable;
if (acquireBufs) {
for (Buffer buf : bufs) {
buf.acquire();
@ -305,46 +303,31 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
offset + ", and length was " + length + '.');
}
Buffer choice = (Buffer) chooseBuffer(offset, 0);
Buffer[] slices = null;
acquire(); // Increase reference count of the original composite buffer.
Drop<CompositeBuffer> drop = obj -> {
close(); // Decrement the reference count of the original composite buffer.
COMPOSITE_DROP.drop(obj);
};
Buffer[] slices;
try {
if (length > 0) {
slices = new Buffer[bufs.length];
int off = subOffset;
int cap = length;
int i;
for (i = searchOffsets(offset); cap > 0; i++) {
var buf = bufs[i];
int avail = buf.capacity() - off;
slices[i] = buf.slice(off, Math.min(cap, avail));
cap -= avail;
off = 0;
}
slices = Arrays.copyOf(slices, i);
} else {
// Specialize for length == 0, since we must slice from at least one constituent buffer.
slices = new Buffer[] { choice.slice(subOffset, 0) };
}
return new CompositeBuffer(allocator, false, slices, drop, true);
} catch (Throwable throwable) {
// We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer:
close();
throw throwable;
} finally {
if (slices != null) {
for (Buffer slice : slices) {
if (slice != null) {
slice.close(); // Ownership now transfers to the composite buffer.
}
}
if (length > 0) {
slices = new Buffer[bufs.length];
int off = subOffset;
int cap = length;
int i;
for (i = searchOffsets(offset); cap > 0; i++) {
var buf = bufs[i];
int avail = buf.capacity() - off;
slices[i] = buf.slice(off, Math.min(cap, avail));
cap -= avail;
off = 0;
}
slices = Arrays.copyOf(slices, i);
} else {
// Specialize for length == 0, since we must slice from at least one constituent buffer.
slices = new Buffer[] { choice.slice(subOffset, 0) };
}
// Use the constructor that skips filtering out empty buffers, and skips acquiring on the buffers.
// This is important because 1) slice() already acquired the buffers, and 2) if this slice is empty
// then we need to keep holding on to it to prevent this originating composite buffer from getting
// ownership. If it did, its behaviour would be inconsistent with that of a non-composite buffer.
return new CompositeBuffer(allocator, slices, COMPOSITE_DROP, false);
}
@Override
@ -757,7 +740,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
if (bufs.length == 0) {
// Bifurcating a zero-length buffer is trivial.
return new CompositeBuffer(allocator, true, bufs, unsafeGetDrop(), true).order(order);
return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order);
}
int i = searchOffsets(woff);
@ -769,7 +752,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
computeBufferOffsets();
try {
var compositeBuf = new CompositeBuffer(allocator, true, bifs, unsafeGetDrop(), true);
var compositeBuf = new CompositeBuffer(allocator, bifs, unsafeGetDrop(), true);
compositeBuf.order = order; // Preserve byte order even if bifs array is empty.
return compositeBuf;
} finally {
@ -1172,7 +1155,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
for (int i = 0; i < sends.length; i++) {
received[i] = sends[i].receive();
}
var composite = new CompositeBuffer(allocator, true, received, drop, true);
var composite = new CompositeBuffer(allocator, received, drop, true);
composite.readOnly = readOnly;
drop.attach(composite);
return composite;
@ -1187,18 +1170,9 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
closed = true;
}
@Override
protected IllegalStateException notSendableException() {
if (!isSendable) {
return new IllegalStateException(
"Cannot send() this buffer. This buffer might be a slice of another buffer.");
}
return super.notSendableException();
}
@Override
public boolean isOwned() {
return isSendable && super.isOwned() && allConstituentsAreOwned();
return super.isOwned() && allConstituentsAreOwned();
}
private boolean allConstituentsAreOwned() {

View File

@ -15,8 +15,6 @@
*/
package io.netty.buffer.api.memseg;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Drop;
import jdk.incubator.foreign.MemorySegment;
public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
@ -29,15 +27,4 @@ public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
protected MemorySegment createSegment(long size) {
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
}
@Override
public Drop<Buffer> drop() {
return convert(buf -> buf.makeInaccessible());
}
@SuppressWarnings({ "unchecked", "UnnecessaryLocalVariable" })
private static Drop<Buffer> convert(Drop<MemSegBuffer> drop) {
Drop<?> tmp = drop;
return (Drop<Buffer>) tmp;
}
}

View File

@ -63,8 +63,6 @@ public class BufferTest {
private static final Memoize<Fixture[]> ALL_COMBINATIONS = new Memoize<>(
() -> fixtureCombinations().toArray(Fixture[]::new));
private static final Memoize<Fixture[]> NON_SLICED = new Memoize<>(
() -> Arrays.stream(ALL_COMBINATIONS.get()).filter(f -> !f.isSlice()).toArray(Fixture[]::new));
private static final Memoize<Fixture[]> NON_COMPOSITE = new Memoize<>(
() -> Arrays.stream(ALL_COMBINATIONS.get()).filter(f -> !f.isComposite()).toArray(Fixture[]::new));
private static final Memoize<Fixture[]> HEAP_ALLOCS = new Memoize<>(
@ -78,10 +76,6 @@ public class BufferTest {
return ALL_COMBINATIONS.get();
}
static Fixture[] nonSliceAllocators() {
return NON_SLICED.get();
}
static Fixture[] nonCompositeAllocators() {
return NON_COMPOSITE.get();
}
@ -349,7 +343,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
void allocateAndSendToThread(Fixture fixture) throws Exception {
try (BufferAllocator allocator = fixture.createAllocator()) {
ArrayBlockingQueue<Send<Buffer>> queue = new ArrayBlockingQueue<>(10);
@ -369,7 +363,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
void allocateAndSendToThreadViaSyncQueue(Fixture fixture) throws Exception {
SynchronousQueue<Send<Buffer>> queue = new SynchronousQueue<>();
Future<Byte> future = executor.submit(() -> {
@ -388,7 +382,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
void sendMustThrowWhenBufIsAcquired(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
@ -403,7 +397,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void originalBufferMustNotBeAccessibleAfterSend(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer orig = allocator.allocate(24)) {
@ -505,7 +499,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void cannotSendMoreThanOnce(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
@ -822,7 +816,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
void sendOnSliceWithoutOffsetAndSizeMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
@ -837,7 +831,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
void sendOnSliceWithOffsetAndSizeMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
@ -1782,7 +1776,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void ensureWritableMustThrowForNegativeSize(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
@ -1791,7 +1785,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void ensureWritableMustThrowIfRequestedSizeWouldGrowBeyondMaxAllowed(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(512)) {
@ -1800,7 +1794,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void ensureWritableMustNotThrowWhenSpaceIsAlreadyAvailable(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
@ -1811,7 +1805,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void ensureWritableMustExpandBufferCapacity(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
@ -1846,7 +1840,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void mustBeAbleToSliceAfterEnsureWritable(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(4)) {
@ -1861,7 +1855,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void ensureWritableOnCompositeBuffersMustRespectExistingBigEndianByteOrder(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator()) {
Buffer composite;
@ -1878,7 +1872,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void ensureWritableOnCompositeBuffersMustRespectExistingLittleEndianByteOrder(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator()) {
Buffer composite;
@ -1895,7 +1889,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void ensureWritableWithCompactionMustNotAllocateIfCompactionIsEnough(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(64)) {
@ -2220,7 +2214,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
@ -2233,7 +2227,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void bifurcatedPartMustContainFirstHalfOfBuffer(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(16).order(BIG_ENDIAN)) {
@ -2269,7 +2263,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void bifurcatedPartsMustBeIndividuallySendable(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(16).order(BIG_ENDIAN)) {
@ -2298,7 +2292,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void mustBePossibleToBifurcateMoreThanOnce(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(16).order(BIG_ENDIAN)) {
@ -2326,7 +2320,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void bifurcatedBufferMustHaveSameByteOrderAsParent(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8).order(BIG_ENDIAN)) {
@ -2344,7 +2338,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void ensureWritableOnBifurcatedBuffers(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
@ -2363,7 +2357,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void ensureWritableOnBifurcatedBuffersWithOddOffsets(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(10).order(BIG_ENDIAN)) {
@ -2412,7 +2406,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void bifurcatedBuffersMustBeAccessibleInOtherThreads(Fixture fixture) throws Exception {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
@ -2432,7 +2426,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void sendMustNotMakeBifurcatedBuffersInaccessible(Fixture fixture) throws Exception {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(16)) {
@ -2456,7 +2450,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void compactMustDiscardReadBytes(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(16, BIG_ENDIAN)) {
@ -2478,7 +2472,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void compactMustThrowForUnownedBuffer(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8, BIG_ENDIAN)) {
@ -2593,7 +2587,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void readOnlyBufferMustRemainReadOnlyAfterSend(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
@ -2658,7 +2652,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void bifurcateOfReadOnlyBufferMustBeReadOnly(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(16)) {
@ -2710,7 +2704,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void compactOnReadOnlyBufferMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
@ -2720,7 +2714,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void ensureWritableOnReadOnlyBufferMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
@ -2730,7 +2724,7 @@ public class BufferTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("allocators")
public void copyIntoOnReadOnlyBufferMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer dest = allocator.allocate(8)) {