Add a flag to allow Buf.ensureWritable to compact buffer

Motivation:
The main use case with Buf.compact is in conjunction with ensureWritable.
It turns out we can get a simpler API, and faster methods, by combining those two operations, because it allows us to relax some guarantees and skip some steps in certain cases, which wouldn't be as neat or clean if they were two separate steps.

Modification:
Add a new Buf.ensureWritable method, which takes an allowCompaction argument.
In MemSegBuf, we can just delegate to compact() when applicable.
In CompositeBuf, we can sometimes get away with just reorganising the bufs array.

Result:
We can now do ensureWritable without allocating in some cases, and this can in particular make the operation faster for CompositeBuf.
This commit is contained in:
Chris Vest 2020-12-16 16:44:20 +01:00
parent 252ac38305
commit 4036dac84d
4 changed files with 146 additions and 36 deletions

View File

@ -367,12 +367,53 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* If this buffer already has the necessary space, then this method returns immediately.
* If this buffer does not already have the necessary space, then it will be expanded using the {@link Allocator}
* the buffer was created with.
* This method is the same as calling {@link #ensureWritable(int, boolean)} where {@code allowCompaction} is
* {@code false}.
*
* @param size The requested number of bytes of space that should be available for writing.
* @throws IllegalStateException if this buffer is not in an owned state.
* That is, if {@link #countBorrows()} is not {@code 0}.
* That is, if {@link #isOwned()} is {@code false}.
*/
void ensureWritable(int size);
default void ensureWritable(int size) {
ensureWritable(size, true);
}
/**
* Ensure that this buffer has {@linkplain #writableBytes() available space for writing} the given number of
* bytes.
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
* If this buffer already has the necessary space, then this method returns immediately.
* If this buffer does not already have the necessary space, then space will be made available in one or all of
* the following available ways:
*
* <ul>
* <li>
* If {@code allowCompaction} is {@code true}, and sum of the read and writable bytes would be enough to
* satisfy the request, and it (depending on the buffer implementation) seems faster and easier to compact
* the existing buffer rather than allocation a new buffer, then the requested bytes will be made available
* that way. The compaction will not necessarily work the same way as the {@link #compact()} method, as the
* implementation may be able to make the requested bytes available with less effort than is strictly
* mandated by the {@link #compact()} method.
* </li>
* <li>
* Regardless of the value of the {@code allowCompaction}, the implementation may make more space available
* by just allocating more or larger buffers. This allocation would use the same {@link Allocator} that this
* buffer was created with.
* </li>
* <li>
* If {@code allowCompaction} is {@code true}, then the implementation may choose to do a combination of
* compaction and allocation.
* </li>
* </ul>
*
* @param size The requested number of bytes of space that should be available for writing.
* @param allowCompaction {@code true} if the method is allowed to modify the
* {@linkplain #readerOffset() reader offset} and
* {@linkplain #writerOffset() writer offset}, otherwise {@code false}.
* @throws IllegalStateException if this buffer is not in an owned state.
* That is, if {@link #isOwned()} is {@code false}.
*/
void ensureWritable(int size, boolean allowCompaction);
/**
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.

View File

@ -523,20 +523,54 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
@Override
public void ensureWritable(int size) {
public void ensureWritable(int size, boolean allowCompaction) {
if (!isOwned()) {
throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable.");
}
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (writableBytes() < size) {
long newSize = capacity() + (long) size;
Allocator.checkSize(newSize);
int growth = size - writableBytes();
Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order());
unsafeExtendWith(extension);
if (writableBytes() >= size) {
// We already have enough space.
return;
}
if (allowCompaction && size <= roff) {
// Let's see if we can solve some or all of the requested size with compaction.
// We always compact as much as is possible, regardless of size. This amortizes our work.
int compactableBuffers = 0;
for (Buf buf : bufs) {
if (buf.capacity() != buf.readerOffset()) {
break;
}
compactableBuffers++;
}
if (compactableBuffers > 0) {
Buf[] compactable;
if (compactableBuffers < bufs.length) {
compactable = new Buf[compactableBuffers];
System.arraycopy(bufs, 0, compactable, 0, compactable.length);
System.arraycopy(bufs, compactable.length, bufs, 0, bufs.length - compactable.length);
System.arraycopy(compactable, 0, bufs, bufs.length - compactable.length, compactable.length);
} else {
compactable = bufs;
}
for (Buf buf : compactable) {
buf.reset();
}
computeBufferOffsets();
if (writableBytes() >= size) {
// Now we have enough space.
return;
}
}
}
long newSize = capacity() + (long) size;
Allocator.checkSize(newSize);
int growth = size - writableBytes();
Buf extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order());
unsafeExtendWith(extension);
}
void extendWith(Buf extension) {

View File

@ -323,38 +323,50 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
@Override
public void ensureWritable(int size) {
public void ensureWritable(int size, boolean allowCompaction) {
if (!isOwned()) {
throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable.");
}
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (writableBytes() < size) {
long newSize = capacity() + size - (long) writableBytes();
Allocator.checkSize(newSize);
RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
var newSegment = recoverableMemory.segment;
newSegment.copyFrom(seg);
// Release old memory segment:
var drop = unsafeGetDrop();
if (drop instanceof BifurcatedDrop) {
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
drop = ((BifurcatedDrop) drop).unwrap();
unsafeSetDrop(drop);
this.roff = roff;
this.woff = woff;
} else {
alloc.recoverMemory(recoverableMemory());
}
seg = newSegment;
drop.attach(this);
if (writableBytes() >= size) {
// We already have enough space.
return;
}
if (allowCompaction && writableBytes() + readerOffset() >= size) {
// We can solve this with compaction.
compact();
return;
}
// Allocate a bigger buffer.
long newSize = capacity() + size - (long) writableBytes();
Allocator.checkSize(newSize);
RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
var newSegment = recoverableMemory.segment;
// Copy contents.
newSegment.copyFrom(seg);
// Release old memory segment:
var drop = unsafeGetDrop();
if (drop instanceof BifurcatedDrop) {
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
drop = ((BifurcatedDrop) drop).unwrap();
unsafeSetDrop(drop);
this.roff = roff;
this.woff = woff;
} else {
alloc.recoverMemory(recoverableMemory());
}
seg = newSegment;
drop.attach(this);
}
@Override

View File

@ -383,7 +383,7 @@ public class BufTest {
}
}
private void verifyInaccessible(Buf buf) {
private static void verifyInaccessible(Buf buf) {
assertThrows(IllegalStateException.class, () -> buf.readByte());
assertThrows(IllegalStateException.class, () -> buf.readUnsignedByte());
assertThrows(IllegalStateException.class, () -> buf.readChar());
@ -1689,6 +1689,29 @@ public class BufTest {
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableWithCompactionMustNotAllocateIfCompactionIsEnough(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(64)) {
while (buf.writableBytes() > 0) {
buf.writeByte((byte) 42);
}
while (buf.readableBytes() > 0) {
buf.readByte();
}
buf.ensureWritable(4, true);
buf.writeInt(42);
assertThat(buf.capacity()).isEqualTo(64);
buf.writerOffset(60).readerOffset(60);
buf.ensureWritable(8, true);
buf.writeLong(42);
// Don't assert the capacity on this one, because single-component
// composite buffers may choose to allocate rather than compact.
}
}
@ParameterizedTest
@MethodSource("allocators")
public void pooledBuffersMustResetStateBeforeReuse(Fixture fixture) {
@ -2139,7 +2162,7 @@ public class BufTest {
}
}
private void verifyBifurcateEmptyCompositeBuffer(Buf buf) {
private static void verifyBifurcateEmptyCompositeBuffer(Buf buf) {
try (Buf a = buf.bifurcate()) {
a.ensureWritable(4);
buf.ensureWritable(4);