[#5224] Allow to use Unsafe.reallocateMemory(...) in UnpooledUnsafeNoCleanerDirectByteBuf.
Motivation: If the user uses unsafe direct buffers with no cleaner we can use Unsafe.reallocateMemory(...) as optimization when we need to expand the buffer. Modifications: Use Unsafe.relocateMemory(...) in UnpooledUnsafeNoCleanerDirectByteBuf. Result: Less expensive expanding of buffers.
This commit is contained in:
parent
9bd94ea021
commit
3a7dcde320
@ -37,10 +37,10 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf
|
||||
private final ByteBufAllocator alloc;
|
||||
|
||||
private long memoryAddress;
|
||||
private ByteBuffer buffer;
|
||||
private ByteBuffer tmpNioBuf;
|
||||
private int capacity;
|
||||
private boolean doNotFree;
|
||||
ByteBuffer buffer;
|
||||
|
||||
/**
|
||||
* Creates a new direct buffer.
|
||||
@ -65,7 +65,7 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf
|
||||
}
|
||||
|
||||
this.alloc = alloc;
|
||||
setByteBuffer(allocateDirect(initialCapacity));
|
||||
setByteBuffer(allocateDirect(initialCapacity), false);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -96,7 +96,7 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf
|
||||
|
||||
this.alloc = alloc;
|
||||
doNotFree = true;
|
||||
setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN));
|
||||
setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN), false);
|
||||
writerIndex(initialCapacity);
|
||||
}
|
||||
|
||||
@ -114,16 +114,17 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf
|
||||
PlatformDependent.freeDirectBuffer(buffer);
|
||||
}
|
||||
|
||||
private void setByteBuffer(ByteBuffer buffer) {
|
||||
ByteBuffer oldBuffer = this.buffer;
|
||||
if (oldBuffer != null) {
|
||||
if (doNotFree) {
|
||||
doNotFree = false;
|
||||
} else {
|
||||
freeDirect(oldBuffer);
|
||||
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
|
||||
if (tryFree) {
|
||||
ByteBuffer oldBuffer = this.buffer;
|
||||
if (oldBuffer != null) {
|
||||
if (doNotFree) {
|
||||
doNotFree = false;
|
||||
} else {
|
||||
freeDirect(oldBuffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.buffer = buffer;
|
||||
memoryAddress = PlatformDependent.directBufferAddress(buffer);
|
||||
tmpNioBuf = null;
|
||||
@ -158,7 +159,7 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf
|
||||
newBuffer.position(0).limit(oldBuffer.capacity());
|
||||
newBuffer.put(oldBuffer);
|
||||
newBuffer.clear();
|
||||
setByteBuffer(newBuffer);
|
||||
setByteBuffer(newBuffer, true);
|
||||
} else if (newCapacity < oldCapacity) {
|
||||
ByteBuffer oldBuffer = buffer;
|
||||
ByteBuffer newBuffer = allocateDirect(newCapacity);
|
||||
@ -173,7 +174,7 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf
|
||||
} else {
|
||||
setIndex(newCapacity, newCapacity);
|
||||
}
|
||||
setByteBuffer(newBuffer);
|
||||
setByteBuffer(newBuffer, true);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
@ -34,4 +34,39 @@ final class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByt
|
||||
protected void freeDirect(ByteBuffer buffer) {
|
||||
PlatformDependent.freeDirectNoCleaner(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf capacity(int newCapacity) {
|
||||
ensureAccessible();
|
||||
if (newCapacity < 0 || newCapacity > maxCapacity()) {
|
||||
throw new IllegalArgumentException("newCapacity: " + newCapacity);
|
||||
}
|
||||
|
||||
int readerIndex = readerIndex();
|
||||
int writerIndex = writerIndex();
|
||||
int oldCapacity = capacity();
|
||||
|
||||
if (newCapacity > oldCapacity) {
|
||||
ByteBuffer oldBuffer = buffer;
|
||||
ByteBuffer newBuffer = PlatformDependent.reallocateDirectNoCleaner(oldBuffer, newCapacity);
|
||||
setByteBuffer(newBuffer, false);
|
||||
} else if (newCapacity < oldCapacity) {
|
||||
ByteBuffer oldBuffer = buffer;
|
||||
ByteBuffer newBuffer = allocateDirect(newCapacity);
|
||||
if (readerIndex < newCapacity) {
|
||||
if (writerIndex > newCapacity) {
|
||||
writerIndex = newCapacity;
|
||||
writerIndex(writerIndex);
|
||||
}
|
||||
oldBuffer.position(readerIndex).limit(writerIndex);
|
||||
newBuffer.position(readerIndex).limit(writerIndex);
|
||||
newBuffer.put(oldBuffer);
|
||||
newBuffer.clear();
|
||||
} else {
|
||||
setIndex(newCapacity, newCapacity);
|
||||
}
|
||||
setByteBuffer(newBuffer, true);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
@ -543,25 +543,29 @@ public final class PlatformDependent {
|
||||
public static ByteBuffer allocateDirectNoCleaner(int capacity) {
|
||||
assert USE_DIRECT_BUFFER_NO_CLEANER;
|
||||
|
||||
if (DIRECT_MEMORY_COUNTER != null) {
|
||||
for (;;) {
|
||||
long usedMemory = DIRECT_MEMORY_COUNTER.get();
|
||||
long newUsedMemory = usedMemory + capacity;
|
||||
if (newUsedMemory > DIRECT_MEMORY_LIMIT) {
|
||||
throw new OutOfDirectMemoryError("failed to allocate " + capacity
|
||||
+ " byte(s) of direct memory (used: " + usedMemory + ", max: " + DIRECT_MEMORY_LIMIT + ')');
|
||||
}
|
||||
if (DIRECT_MEMORY_COUNTER.compareAndSet(usedMemory, newUsedMemory)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
incrementMemoryCounter(capacity);
|
||||
try {
|
||||
return PlatformDependent0.allocateDirectNoCleaner(capacity);
|
||||
} catch (Throwable e) {
|
||||
if (DIRECT_MEMORY_COUNTER != null) {
|
||||
DIRECT_MEMORY_COUNTER.addAndGet(-capacity);
|
||||
}
|
||||
decrementMemoryCounter(capacity);
|
||||
throwException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reallocate a new {@link ByteBuffer} with the given {@code capacity}. {@link ByteBuffer}s reallocated with
|
||||
* this method <strong>MUST</strong> be deallocated via {@link #freeDirectNoCleaner(ByteBuffer)}.
|
||||
*/
|
||||
public static ByteBuffer reallocateDirectNoCleaner(ByteBuffer buffer, int capacity) {
|
||||
assert USE_DIRECT_BUFFER_NO_CLEANER;
|
||||
|
||||
int len = capacity - buffer.capacity();
|
||||
incrementMemoryCounter(len);
|
||||
try {
|
||||
return PlatformDependent0.reallocateDirectNoCleaner(buffer, capacity);
|
||||
} catch (Throwable e) {
|
||||
decrementMemoryCounter(len);
|
||||
throwException(e);
|
||||
return null;
|
||||
}
|
||||
@ -576,6 +580,26 @@ public final class PlatformDependent {
|
||||
|
||||
int capacity = buffer.capacity();
|
||||
PlatformDependent0.freeMemory(PlatformDependent0.directBufferAddress(buffer));
|
||||
decrementMemoryCounter(capacity);
|
||||
}
|
||||
|
||||
private static void incrementMemoryCounter(int capacity) {
|
||||
if (DIRECT_MEMORY_COUNTER != null) {
|
||||
for (;;) {
|
||||
long usedMemory = DIRECT_MEMORY_COUNTER.get();
|
||||
long newUsedMemory = usedMemory + capacity;
|
||||
if (newUsedMemory > DIRECT_MEMORY_LIMIT) {
|
||||
throw new OutOfDirectMemoryError("failed to allocate " + capacity
|
||||
+ " byte(s) of direct memory (used: " + usedMemory + ", max: " + DIRECT_MEMORY_LIMIT + ')');
|
||||
}
|
||||
if (DIRECT_MEMORY_COUNTER.compareAndSet(usedMemory, newUsedMemory)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void decrementMemoryCounter(int capacity) {
|
||||
if (DIRECT_MEMORY_COUNTER != null) {
|
||||
long usedMemory = DIRECT_MEMORY_COUNTER.addAndGet(-capacity);
|
||||
assert usedMemory >= 0;
|
||||
|
@ -228,9 +228,15 @@ final class PlatformDependent0 {
|
||||
return DIRECT_BUFFER_CONSTRUCTOR != null;
|
||||
}
|
||||
|
||||
static ByteBuffer reallocateDirectNoCleaner(ByteBuffer buffer, int capacity) {
|
||||
return newDirectBuffer(UNSAFE.reallocateMemory(directBufferAddress(buffer), capacity), capacity);
|
||||
}
|
||||
|
||||
static ByteBuffer allocateDirectNoCleaner(int capacity) {
|
||||
assert DIRECT_BUFFER_CONSTRUCTOR != null;
|
||||
long address = UNSAFE.allocateMemory(capacity);
|
||||
return newDirectBuffer(UNSAFE.allocateMemory(capacity), capacity);
|
||||
}
|
||||
|
||||
private static ByteBuffer newDirectBuffer(long address, int capacity) {
|
||||
try {
|
||||
return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity);
|
||||
} catch (Throwable cause) {
|
||||
|
Loading…
Reference in New Issue
Block a user