diff --git a/buffer/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBuf.java index 2ed83ad6f7..a54e3af64f 100644 --- a/buffer/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBuf.java @@ -37,10 +37,10 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf private final ByteBufAllocator alloc; private long memoryAddress; - private ByteBuffer buffer; private ByteBuffer tmpNioBuf; private int capacity; private boolean doNotFree; + ByteBuffer buffer; /** * Creates a new direct buffer. @@ -65,7 +65,7 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf } this.alloc = alloc; - setByteBuffer(allocateDirect(initialCapacity)); + setByteBuffer(allocateDirect(initialCapacity), false); } /** @@ -96,7 +96,7 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf this.alloc = alloc; doNotFree = true; - setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN)); + setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN), false); writerIndex(initialCapacity); } @@ -114,16 +114,17 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf PlatformDependent.freeDirectBuffer(buffer); } - private void setByteBuffer(ByteBuffer buffer) { - ByteBuffer oldBuffer = this.buffer; - if (oldBuffer != null) { - if (doNotFree) { - doNotFree = false; - } else { - freeDirect(oldBuffer); + final void setByteBuffer(ByteBuffer buffer, boolean tryFree) { + if (tryFree) { + ByteBuffer oldBuffer = this.buffer; + if (oldBuffer != null) { + if (doNotFree) { + doNotFree = false; + } else { + freeDirect(oldBuffer); + } } } - this.buffer = buffer; memoryAddress = PlatformDependent.directBufferAddress(buffer); tmpNioBuf = null; @@ -158,7 +159,7 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf newBuffer.position(0).limit(oldBuffer.capacity()); newBuffer.put(oldBuffer); newBuffer.clear(); - setByteBuffer(newBuffer); + setByteBuffer(newBuffer, true); } else if (newCapacity < oldCapacity) { ByteBuffer oldBuffer = buffer; ByteBuffer newBuffer = allocateDirect(newCapacity); @@ -173,7 +174,7 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf } else { setIndex(newCapacity, newCapacity); } - setByteBuffer(newBuffer); + setByteBuffer(newBuffer, true); } return this; } diff --git a/buffer/src/main/java/io/netty/buffer/UnpooledUnsafeNoCleanerDirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/UnpooledUnsafeNoCleanerDirectByteBuf.java index 5011a1a520..98899a75be 100644 --- a/buffer/src/main/java/io/netty/buffer/UnpooledUnsafeNoCleanerDirectByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/UnpooledUnsafeNoCleanerDirectByteBuf.java @@ -34,4 +34,39 @@ final class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByt protected void freeDirect(ByteBuffer buffer) { PlatformDependent.freeDirectNoCleaner(buffer); } + + @Override + public ByteBuf capacity(int newCapacity) { + ensureAccessible(); + if (newCapacity < 0 || newCapacity > maxCapacity()) { + throw new IllegalArgumentException("newCapacity: " + newCapacity); + } + + int readerIndex = readerIndex(); + int writerIndex = writerIndex(); + int oldCapacity = capacity(); + + if (newCapacity > oldCapacity) { + ByteBuffer oldBuffer = buffer; + ByteBuffer newBuffer = PlatformDependent.reallocateDirectNoCleaner(oldBuffer, newCapacity); + setByteBuffer(newBuffer, false); + } else if (newCapacity < oldCapacity) { + ByteBuffer oldBuffer = buffer; + ByteBuffer newBuffer = allocateDirect(newCapacity); + if (readerIndex < newCapacity) { + if (writerIndex > newCapacity) { + writerIndex = newCapacity; + writerIndex(writerIndex); + } + oldBuffer.position(readerIndex).limit(writerIndex); + newBuffer.position(readerIndex).limit(writerIndex); + newBuffer.put(oldBuffer); + newBuffer.clear(); + } else { + setIndex(newCapacity, newCapacity); + } + setByteBuffer(newBuffer, true); + } + return this; + } } diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index fb7ffef18e..214aba2f94 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -543,25 +543,29 @@ public final class PlatformDependent { public static ByteBuffer allocateDirectNoCleaner(int capacity) { assert USE_DIRECT_BUFFER_NO_CLEANER; - if (DIRECT_MEMORY_COUNTER != null) { - for (;;) { - long usedMemory = DIRECT_MEMORY_COUNTER.get(); - long newUsedMemory = usedMemory + capacity; - if (newUsedMemory > DIRECT_MEMORY_LIMIT) { - throw new OutOfDirectMemoryError("failed to allocate " + capacity - + " byte(s) of direct memory (used: " + usedMemory + ", max: " + DIRECT_MEMORY_LIMIT + ')'); - } - if (DIRECT_MEMORY_COUNTER.compareAndSet(usedMemory, newUsedMemory)) { - break; - } - } - } + incrementMemoryCounter(capacity); try { return PlatformDependent0.allocateDirectNoCleaner(capacity); } catch (Throwable e) { - if (DIRECT_MEMORY_COUNTER != null) { - DIRECT_MEMORY_COUNTER.addAndGet(-capacity); - } + decrementMemoryCounter(capacity); + throwException(e); + return null; + } + } + + /** + * Reallocate a new {@link ByteBuffer} with the given {@code capacity}. {@link ByteBuffer}s reallocated with + * this method MUST be deallocated via {@link #freeDirectNoCleaner(ByteBuffer)}. + */ + public static ByteBuffer reallocateDirectNoCleaner(ByteBuffer buffer, int capacity) { + assert USE_DIRECT_BUFFER_NO_CLEANER; + + int len = capacity - buffer.capacity(); + incrementMemoryCounter(len); + try { + return PlatformDependent0.reallocateDirectNoCleaner(buffer, capacity); + } catch (Throwable e) { + decrementMemoryCounter(len); throwException(e); return null; } @@ -576,6 +580,26 @@ public final class PlatformDependent { int capacity = buffer.capacity(); PlatformDependent0.freeMemory(PlatformDependent0.directBufferAddress(buffer)); + decrementMemoryCounter(capacity); + } + + private static void incrementMemoryCounter(int capacity) { + if (DIRECT_MEMORY_COUNTER != null) { + for (;;) { + long usedMemory = DIRECT_MEMORY_COUNTER.get(); + long newUsedMemory = usedMemory + capacity; + if (newUsedMemory > DIRECT_MEMORY_LIMIT) { + throw new OutOfDirectMemoryError("failed to allocate " + capacity + + " byte(s) of direct memory (used: " + usedMemory + ", max: " + DIRECT_MEMORY_LIMIT + ')'); + } + if (DIRECT_MEMORY_COUNTER.compareAndSet(usedMemory, newUsedMemory)) { + break; + } + } + } + } + + private static void decrementMemoryCounter(int capacity) { if (DIRECT_MEMORY_COUNTER != null) { long usedMemory = DIRECT_MEMORY_COUNTER.addAndGet(-capacity); assert usedMemory >= 0; diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java index aebfff81b6..98801528fd 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java @@ -228,9 +228,15 @@ final class PlatformDependent0 { return DIRECT_BUFFER_CONSTRUCTOR != null; } + static ByteBuffer reallocateDirectNoCleaner(ByteBuffer buffer, int capacity) { + return newDirectBuffer(UNSAFE.reallocateMemory(directBufferAddress(buffer), capacity), capacity); + } + static ByteBuffer allocateDirectNoCleaner(int capacity) { - assert DIRECT_BUFFER_CONSTRUCTOR != null; - long address = UNSAFE.allocateMemory(capacity); + return newDirectBuffer(UNSAFE.allocateMemory(capacity), capacity); + } + + private static ByteBuffer newDirectBuffer(long address, int capacity) { try { return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity); } catch (Throwable cause) {