Merge pull request #18 from netty/compact

Buffer Compaction
This commit is contained in:
Chris Vest 2020-12-17 15:18:37 +01:00 committed by GitHub
commit 6697840a34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 236 additions and 36 deletions

View File

@ -367,12 +367,53 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* If this buffer already has the necessary space, then this method returns immediately.
* If this buffer does not already have the necessary space, then it will be expanded using the {@link Allocator}
* the buffer was created with.
* This method is the same as calling {@link #ensureWritable(int, boolean)} where {@code allowCompaction} is
* {@code false}.
*
* @param size The requested number of bytes of space that should be available for writing.
* @throws IllegalStateException if this buffer is not in an owned state.
* That is, if {@link #countBorrows()} is not {@code 0}.
* That is, if {@link #isOwned()} is {@code false}.
*/
void ensureWritable(int size);
default void ensureWritable(int size) {
ensureWritable(size, true);
}
/**
* Ensure that this buffer has {@linkplain #writableBytes() available space for writing} the given number of
* bytes.
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
* If this buffer already has the necessary space, then this method returns immediately.
* If this buffer does not already have the necessary space, then space will be made available in one or all of
* the following available ways:
*
* <ul>
* <li>
* If {@code allowCompaction} is {@code true}, and sum of the read and writable bytes would be enough to
* satisfy the request, and it (depending on the buffer implementation) seems faster and easier to compact
* the existing buffer rather than allocation a new buffer, then the requested bytes will be made available
* that way. The compaction will not necessarily work the same way as the {@link #compact()} method, as the
* implementation may be able to make the requested bytes available with less effort than is strictly
* mandated by the {@link #compact()} method.
* </li>
* <li>
* Regardless of the value of the {@code allowCompaction}, the implementation may make more space available
* by just allocating more or larger buffers. This allocation would use the same {@link Allocator} that this
* buffer was created with.
* </li>
* <li>
* If {@code allowCompaction} is {@code true}, then the implementation may choose to do a combination of
* compaction and allocation.
* </li>
* </ul>
*
* @param size The requested number of bytes of space that should be available for writing.
* @param allowCompaction {@code true} if the method is allowed to modify the
* {@linkplain #readerOffset() reader offset} and
* {@linkplain #writerOffset() writer offset}, otherwise {@code false}.
* @throws IllegalStateException if this buffer is not in an owned state.
* That is, if {@link #isOwned()} is {@code false}.
*/
void ensureWritable(int size, boolean allowCompaction);
/**
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.
@ -417,4 +458,11 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer.
*/
Buf bifurcate();
/**
* Discards the read bytes, and moves the buffer contents to the beginning of the buffer.
*
* The buffer must be {@linkplain #isOwned() owned}, or an exception will be thrown.
*/
void compact();
}

View File

@ -20,6 +20,9 @@ import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Objects;
import static jdk.incubator.foreign.MemoryAccess.setByteAtOffset;
import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset;
final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
/**
* The max array size is JVM implementation dependant, but most seem to settle on {@code Integer.MAX_VALUE - 8}.
@ -520,20 +523,54 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
@Override
public void ensureWritable(int size) {
public void ensureWritable(int size, boolean allowCompaction) {
if (!isOwned()) {
throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable.");
}
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (writableBytes() < size) {
long newSize = capacity() + (long) size;
Allocator.checkSize(newSize);
int growth = size - writableBytes();
Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order());
unsafeExtendWith(extension);
if (writableBytes() >= size) {
// We already have enough space.
return;
}
if (allowCompaction && size <= roff) {
// Let's see if we can solve some or all of the requested size with compaction.
// We always compact as much as is possible, regardless of size. This amortizes our work.
int compactableBuffers = 0;
for (Buf buf : bufs) {
if (buf.capacity() != buf.readerOffset()) {
break;
}
compactableBuffers++;
}
if (compactableBuffers > 0) {
Buf[] compactable;
if (compactableBuffers < bufs.length) {
compactable = new Buf[compactableBuffers];
System.arraycopy(bufs, 0, compactable, 0, compactable.length);
System.arraycopy(bufs, compactable.length, bufs, 0, bufs.length - compactable.length);
System.arraycopy(compactable, 0, bufs, bufs.length - compactable.length, compactable.length);
} else {
compactable = bufs;
}
for (Buf buf : compactable) {
buf.reset();
}
computeBufferOffsets();
if (writableBytes() >= size) {
// Now we have enough space.
return;
}
}
}
long newSize = capacity() + (long) size;
Allocator.checkSize(newSize);
int growth = size - writableBytes();
Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order());
unsafeExtendWith(extension);
}
void extendWith(Buf extension) {
@ -600,6 +637,35 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
}
@Override
public void compact() {
if (!isOwned()) {
throw new IllegalStateException("Buffer must be owned in order to compact.");
}
int distance = roff;
if (distance == 0) {
return;
}
int pos = 0;
var oldOrder = order;
order = ByteOrder.BIG_ENDIAN;
try {
var cursor = openCursor();
while (cursor.readLong()) {
setLong(pos, cursor.getLong());
pos += Long.BYTES;
}
while (cursor.readByte()) {
setByte(pos, cursor.getByte());
pos++;
}
} finally {
order = oldOrder;
}
readerOffset(0);
writerOffset(woff - distance);
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors.">
@Override
public byte readByte() {

View File

@ -323,38 +323,50 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
@Override
public void ensureWritable(int size) {
public void ensureWritable(int size, boolean allowCompaction) {
if (!isOwned()) {
throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable.");
}
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (writableBytes() < size) {
long newSize = capacity() + size - (long) writableBytes();
Allocator.checkSize(newSize);
RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
var newSegment = recoverableMemory.segment;
newSegment.copyFrom(seg);
// Release old memory segment:
var drop = unsafeGetDrop();
if (drop instanceof BifurcatedDrop) {
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
drop = ((BifurcatedDrop) drop).unwrap();
unsafeSetDrop(drop);
this.roff = roff;
this.woff = woff;
} else {
alloc.recoverMemory(recoverableMemory());
}
seg = newSegment;
drop.attach(this);
if (writableBytes() >= size) {
// We already have enough space.
return;
}
if (allowCompaction && writableBytes() + readerOffset() >= size) {
// We can solve this with compaction.
compact();
return;
}
// Allocate a bigger buffer.
long newSize = capacity() + size - (long) writableBytes();
Allocator.checkSize(newSize);
RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
var newSegment = recoverableMemory.segment;
// Copy contents.
newSegment.copyFrom(seg);
// Release old memory segment:
var drop = unsafeGetDrop();
if (drop instanceof BifurcatedDrop) {
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
drop = ((BifurcatedDrop) drop).unwrap();
unsafeSetDrop(drop);
this.roff = roff;
this.woff = woff;
} else {
alloc.recoverMemory(recoverableMemory());
}
seg = newSegment;
drop.attach(this);
}
@Override
@ -384,6 +396,20 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
return bifurcatedBuf;
}
@Override
public void compact() {
if (!isOwned()) {
throw new IllegalStateException("Buffer must be owned in order to compact.");
}
int distance = roff;
if (distance == 0) {
return;
}
seg.copyFrom(seg.asSlice(roff, woff - roff));
roff -= distance;
woff -= distance;
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors implementation.">
@Override
public byte readByte() {

View File

@ -383,7 +383,7 @@ public class BufTest {
}
}
private void verifyInaccessible(Buf buf) {
private static void verifyInaccessible(Buf buf) {
assertThrows(IllegalStateException.class, () -> buf.readByte());
assertThrows(IllegalStateException.class, () -> buf.readUnsignedByte());
assertThrows(IllegalStateException.class, () -> buf.readChar());
@ -1689,6 +1689,29 @@ public class BufTest {
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableWithCompactionMustNotAllocateIfCompactionIsEnough(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(64)) {
while (buf.writableBytes() > 0) {
buf.writeByte((byte) 42);
}
while (buf.readableBytes() > 0) {
buf.readByte();
}
buf.ensureWritable(4, true);
buf.writeInt(42);
assertThat(buf.capacity()).isEqualTo(64);
buf.writerOffset(60).readerOffset(60);
buf.ensureWritable(8, true);
buf.writeLong(42);
// Don't assert the capacity on this one, because single-component
// composite buffers may choose to allocate rather than compact.
}
}
@ParameterizedTest
@MethodSource("allocators")
public void pooledBuffersMustResetStateBeforeReuse(Fixture fixture) {
@ -2139,7 +2162,7 @@ public class BufTest {
}
}
private void verifyBifurcateEmptyCompositeBuffer(Buf buf) {
private static void verifyBifurcateEmptyCompositeBuffer(Buf buf) {
try (Buf a = buf.bifurcate()) {
a.ensureWritable(4);
buf.ensureWritable(4);
@ -2195,6 +2218,43 @@ public class BufTest {
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void compactMustDiscardReadBytes(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(16, ByteOrder.BIG_ENDIAN)) {
buf.writeLong(0x0102030405060708L).writeInt(0x090A0B0C);
assertEquals(0x01020304, buf.readInt());
assertEquals(12, buf.writerOffset());
assertEquals(4, buf.readerOffset());
assertEquals(4, buf.writableBytes());
assertEquals(8, buf.readableBytes());
assertEquals(16, buf.capacity());
buf.compact();
assertEquals(8, buf.writerOffset());
assertEquals(0, buf.readerOffset());
assertEquals(8, buf.writableBytes());
assertEquals(8, buf.readableBytes());
assertEquals(16, buf.capacity());
assertEquals(0x05060708090A0B0CL, buf.readLong());
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void compactMustThrowForUnownedBuffer(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8, ByteOrder.BIG_ENDIAN)) {
buf.writeLong(0x0102030405060708L);
assertEquals((byte) 0x01, buf.readByte());
try (Buf ignore = buf.acquire()) {
assertThrows(IllegalStateException.class, () -> buf.compact());
assertEquals(1, buf.readerOffset());
}
assertEquals((byte) 0x02, buf.readByte());
}
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors tests.">
@ParameterizedTest
@MethodSource("allocators")