From 128403b492d8490d9fc770b7a2dad669433b2790 Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Wed, 22 May 2019 11:11:24 -0700 Subject: [PATCH] Introduce ByteBuf.maxFastWritableBytes() method (#9086) Motivation ByteBuf capacity is automatically increased as needed up to maxCapacity when writing beyond the buffer's current capacity. However there's no way to tell in general whether such an increase will result in a relatively costly internal buffer re-allocation. For unpooled buffers it always does, in pooled cases it depends on the size of the associated chunk of allocated memory, which I don't think is currently exposed in any way. It would sometimes be useful to know where this limit is when making external decisions about whether to reuse or preemptively reallocate. It would also be advantageous to take this limit into account when auto-increasing the capacity during writes, to defer such reallocation until really necessary. Modifications Introduce new AbstractByteBuf.maxFastWritableBytes() method which will return a value >= writableBytes() and <= maxWritableBytes(). Make use of the new method in the sizing decision made by the AbstractByteBuf.ensureWritable(...) methods. Result Less reallocation/copying. --- .../java/io/netty/buffer/AbstractByteBuf.java | 19 +++++- .../main/java/io/netty/buffer/ByteBuf.java | 10 +++- .../java/io/netty/buffer/PooledByteBuf.java | 5 ++ .../java/io/netty/buffer/SwappedByteBuf.java | 5 ++ .../java/io/netty/buffer/WrappedByteBuf.java | 5 ++ .../netty/buffer/WrappedCompositeByteBuf.java | 5 ++ .../io/netty/buffer/AbstractByteBufTest.java | 12 ++++ .../buffer/AbstractPooledByteBufTest.java | 59 +++++++++++++++++++ 8 files changed, 117 insertions(+), 3 deletions(-) diff --git a/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java b/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java index ad2f793b1a..d3fd389dae 100644 --- a/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java @@ -280,6 +280,7 @@ public abstract class AbstractByteBuf extends ByteBuf { if (minWritableBytes <= writableBytes()) { return; } + final int writerIndex = writerIndex(); if (checkBounds) { if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( @@ -289,7 +290,14 @@ public abstract class AbstractByteBuf extends ByteBuf { } // Normalize the current capacity to the power of 2. - int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity); + int minNewCapacity = writerIndex + minWritableBytes; + int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity); + + int fastCapacity = writerIndex + maxFastWritableBytes(); + // Grow by a smaller amount if it will avoid reallocation + if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) { + newCapacity = fastCapacity; + } // Adjust to the new capacity. capacity(newCapacity); @@ -316,7 +324,14 @@ public abstract class AbstractByteBuf extends ByteBuf { } // Normalize the current capacity to the power of 2. - int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity); + int minNewCapacity = writerIndex + minWritableBytes; + int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity); + + int fastCapacity = writerIndex + maxFastWritableBytes(); + // Grow by a smaller amount if it will avoid reallocation + if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) { + newCapacity = fastCapacity; + } // Adjust to the new capacity. capacity(newCapacity); diff --git a/buffer/src/main/java/io/netty/buffer/ByteBuf.java b/buffer/src/main/java/io/netty/buffer/ByteBuf.java index 83ecc6685a..41aab726ef 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBuf.java @@ -245,7 +245,6 @@ import java.nio.charset.UnsupportedCharsetException; * Please refer to {@link ByteBufInputStream} and * {@link ByteBufOutputStream}. */ -@SuppressWarnings("ClassMayBeInterface") public abstract class ByteBuf implements ReferenceCounted, Comparable { /** @@ -422,6 +421,15 @@ public abstract class ByteBuf implements ReferenceCounted, Comparable { */ public abstract int maxWritableBytes(); + /** + * Returns the maximum number of bytes which can be written for certain without involving + * an internal reallocation or data-copy. The returned value will be ≥ {@link #writableBytes()} + * and ≤ {@link #maxWritableBytes()}. + */ + public int maxFastWritableBytes() { + return writableBytes(); + } + /** * Returns {@code true} * if and only if {@code (this.writerIndex - this.readerIndex)} is greater diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java b/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java index 3b17087e11..3b42709a6e 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java @@ -82,6 +82,11 @@ abstract class PooledByteBuf extends AbstractReferenceCountedByteBuf { return length; } + @Override + public int maxFastWritableBytes() { + return Math.min(maxLength, maxCapacity()) - writerIndex; + } + @Override public final ByteBuf capacity(int newCapacity) { checkNewCapacity(newCapacity); diff --git a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java index abf27663f7..556dcfadfe 100644 --- a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java @@ -151,6 +151,11 @@ public class SwappedByteBuf extends ByteBuf { return buf.maxWritableBytes(); } + @Override + public int maxFastWritableBytes() { + return buf.maxFastWritableBytes(); + } + @Override public boolean isReadable() { return buf.isReadable(); diff --git a/buffer/src/main/java/io/netty/buffer/WrappedByteBuf.java b/buffer/src/main/java/io/netty/buffer/WrappedByteBuf.java index 33570e2004..ff602ab342 100644 --- a/buffer/src/main/java/io/netty/buffer/WrappedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/WrappedByteBuf.java @@ -151,6 +151,11 @@ class WrappedByteBuf extends ByteBuf { return buf.maxWritableBytes(); } + @Override + public int maxFastWritableBytes() { + return buf.maxFastWritableBytes(); + } + @Override public final boolean isReadable() { return buf.isReadable(); diff --git a/buffer/src/main/java/io/netty/buffer/WrappedCompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/WrappedCompositeByteBuf.java index 2559519e8c..e58623a246 100644 --- a/buffer/src/main/java/io/netty/buffer/WrappedCompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/WrappedCompositeByteBuf.java @@ -98,6 +98,11 @@ class WrappedCompositeByteBuf extends CompositeByteBuf { return wrapped.maxWritableBytes(); } + @Override + public int maxFastWritableBytes() { + return wrapped.maxFastWritableBytes(); + } + @Override public int ensureWritable(int minWritableBytes, boolean force) { return wrapped.ensureWritable(minWritableBytes, force); diff --git a/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java b/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java index 59194ab374..1642f9dc20 100644 --- a/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java @@ -4878,4 +4878,16 @@ public abstract class AbstractByteBufTest { buffer.release(); } } + + @Test + public void testMaxFastWritableBytes() { + ByteBuf buffer = newBuffer(150, 500).writerIndex(100); + assertEquals(50, buffer.writableBytes()); + assertEquals(150, buffer.capacity()); + assertEquals(500, buffer.maxCapacity()); + assertEquals(400, buffer.maxWritableBytes()); + // Default implementation has fast writable == writable + assertEquals(50, buffer.maxFastWritableBytes()); + buffer.release(); + } } diff --git a/buffer/src/test/java/io/netty/buffer/AbstractPooledByteBufTest.java b/buffer/src/test/java/io/netty/buffer/AbstractPooledByteBufTest.java index eb76157ab1..90f1bf4584 100644 --- a/buffer/src/test/java/io/netty/buffer/AbstractPooledByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/AbstractPooledByteBufTest.java @@ -21,6 +21,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public abstract class AbstractPooledByteBufTest extends AbstractByteBufTest { @@ -59,4 +61,61 @@ public abstract class AbstractPooledByteBufTest extends AbstractByteBufTest { buf.release(); } } + + @Override + @Test + public void testMaxFastWritableBytes() { + ByteBuf buffer = newBuffer(150, 500).writerIndex(100); + assertEquals(50, buffer.writableBytes()); + assertEquals(150, buffer.capacity()); + assertEquals(500, buffer.maxCapacity()); + assertEquals(400, buffer.maxWritableBytes()); + + int chunkSize = pooledByteBuf(buffer).maxLength; + assertTrue(chunkSize >= 150); + int remainingInAlloc = Math.min(chunkSize - 100, 400); + assertEquals(remainingInAlloc, buffer.maxFastWritableBytes()); + + // write up to max, chunk alloc should not change (same handle) + long handleBefore = pooledByteBuf(buffer).handle; + buffer.writeBytes(new byte[remainingInAlloc]); + assertEquals(handleBefore, pooledByteBuf(buffer).handle); + + assertEquals(0, buffer.maxFastWritableBytes()); + // writing one more should trigger a reallocation (new handle) + buffer.writeByte(7); + assertNotEquals(handleBefore, pooledByteBuf(buffer).handle); + + // should not exceed maxCapacity even if chunk alloc does + buffer.capacity(500); + assertEquals(500 - buffer.writerIndex(), buffer.maxFastWritableBytes()); + buffer.release(); + } + + private static PooledByteBuf pooledByteBuf(ByteBuf buffer) { + // might need to unwrap if swapped (LE) and/or leak-aware-wrapped + while (!(buffer instanceof PooledByteBuf)) { + buffer = buffer.unwrap(); + } + return (PooledByteBuf) buffer; + } + + @Test + public void testEnsureWritableDoesntGrowTooMuch() { + ByteBuf buffer = newBuffer(150, 500).writerIndex(100); + + assertEquals(50, buffer.writableBytes()); + int fastWritable = buffer.maxFastWritableBytes(); + assertTrue(fastWritable > 50); + + long handleBefore = pooledByteBuf(buffer).handle; + + // capacity expansion should not cause reallocation + // (should grow precisely the specified amount) + buffer.ensureWritable(fastWritable); + assertEquals(handleBefore, pooledByteBuf(buffer).handle); + assertEquals(100 + fastWritable, buffer.capacity()); + assertEquals(buffer.writableBytes(), buffer.maxFastWritableBytes()); + buffer.release(); + } }