[#5224] Allow to use Unsafe.reallocateMemory(...) in UnpooledUnsafeNoCleanerDirectByteBuf.
Motivation: If the user uses unsafe direct buffers with no cleaner we can use Unsafe.reallocateMemory(...) as optimization when we need to expand the buffer. Modifications: Use Unsafe.relocateMemory(...) in UnpooledUnsafeNoCleanerDirectByteBuf. Result: Less expensive expanding of buffers.
This commit is contained in:
parent
338d6399bf
commit
0ca178acad
@ -36,10 +36,10 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf
|
|||||||
private final ByteBufAllocator alloc;
|
private final ByteBufAllocator alloc;
|
||||||
|
|
||||||
private long memoryAddress;
|
private long memoryAddress;
|
||||||
private ByteBuffer buffer;
|
|
||||||
private ByteBuffer tmpNioBuf;
|
private ByteBuffer tmpNioBuf;
|
||||||
private int capacity;
|
private int capacity;
|
||||||
private boolean doNotFree;
|
private boolean doNotFree;
|
||||||
|
ByteBuffer buffer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new direct buffer.
|
* Creates a new direct buffer.
|
||||||
@ -64,7 +64,7 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf
|
|||||||
}
|
}
|
||||||
|
|
||||||
this.alloc = alloc;
|
this.alloc = alloc;
|
||||||
setByteBuffer(allocateDirect(initialCapacity));
|
setByteBuffer(allocateDirect(initialCapacity), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -95,7 +95,7 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf
|
|||||||
|
|
||||||
this.alloc = alloc;
|
this.alloc = alloc;
|
||||||
doNotFree = true;
|
doNotFree = true;
|
||||||
setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN));
|
setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN), false);
|
||||||
writerIndex(initialCapacity);
|
writerIndex(initialCapacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,16 +113,17 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf
|
|||||||
PlatformDependent.freeDirectBuffer(buffer);
|
PlatformDependent.freeDirectBuffer(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setByteBuffer(ByteBuffer buffer) {
|
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
|
||||||
ByteBuffer oldBuffer = this.buffer;
|
if (tryFree) {
|
||||||
if (oldBuffer != null) {
|
ByteBuffer oldBuffer = this.buffer;
|
||||||
if (doNotFree) {
|
if (oldBuffer != null) {
|
||||||
doNotFree = false;
|
if (doNotFree) {
|
||||||
} else {
|
doNotFree = false;
|
||||||
freeDirect(oldBuffer);
|
} else {
|
||||||
|
freeDirect(oldBuffer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.buffer = buffer;
|
this.buffer = buffer;
|
||||||
memoryAddress = PlatformDependent.directBufferAddress(buffer);
|
memoryAddress = PlatformDependent.directBufferAddress(buffer);
|
||||||
tmpNioBuf = null;
|
tmpNioBuf = null;
|
||||||
@ -157,7 +158,7 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf
|
|||||||
newBuffer.position(0).limit(oldBuffer.capacity());
|
newBuffer.position(0).limit(oldBuffer.capacity());
|
||||||
newBuffer.put(oldBuffer);
|
newBuffer.put(oldBuffer);
|
||||||
newBuffer.clear();
|
newBuffer.clear();
|
||||||
setByteBuffer(newBuffer);
|
setByteBuffer(newBuffer, true);
|
||||||
} else if (newCapacity < oldCapacity) {
|
} else if (newCapacity < oldCapacity) {
|
||||||
ByteBuffer oldBuffer = buffer;
|
ByteBuffer oldBuffer = buffer;
|
||||||
ByteBuffer newBuffer = allocateDirect(newCapacity);
|
ByteBuffer newBuffer = allocateDirect(newCapacity);
|
||||||
@ -172,7 +173,7 @@ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf
|
|||||||
} else {
|
} else {
|
||||||
setIndex(newCapacity, newCapacity);
|
setIndex(newCapacity, newCapacity);
|
||||||
}
|
}
|
||||||
setByteBuffer(newBuffer);
|
setByteBuffer(newBuffer, true);
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
@ -34,4 +34,39 @@ final class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByt
|
|||||||
protected void freeDirect(ByteBuffer buffer) {
|
protected void freeDirect(ByteBuffer buffer) {
|
||||||
PlatformDependent.freeDirectNoCleaner(buffer);
|
PlatformDependent.freeDirectNoCleaner(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuf capacity(int newCapacity) {
|
||||||
|
ensureAccessible();
|
||||||
|
if (newCapacity < 0 || newCapacity > maxCapacity()) {
|
||||||
|
throw new IllegalArgumentException("newCapacity: " + newCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
int readerIndex = readerIndex();
|
||||||
|
int writerIndex = writerIndex();
|
||||||
|
int oldCapacity = capacity();
|
||||||
|
|
||||||
|
if (newCapacity > oldCapacity) {
|
||||||
|
ByteBuffer oldBuffer = buffer;
|
||||||
|
ByteBuffer newBuffer = PlatformDependent.reallocateDirectNoCleaner(oldBuffer, newCapacity);
|
||||||
|
setByteBuffer(newBuffer, false);
|
||||||
|
} else if (newCapacity < oldCapacity) {
|
||||||
|
ByteBuffer oldBuffer = buffer;
|
||||||
|
ByteBuffer newBuffer = allocateDirect(newCapacity);
|
||||||
|
if (readerIndex < newCapacity) {
|
||||||
|
if (writerIndex > newCapacity) {
|
||||||
|
writerIndex = newCapacity;
|
||||||
|
writerIndex(writerIndex);
|
||||||
|
}
|
||||||
|
oldBuffer.position(readerIndex).limit(writerIndex);
|
||||||
|
newBuffer.position(readerIndex).limit(writerIndex);
|
||||||
|
newBuffer.put(oldBuffer);
|
||||||
|
newBuffer.clear();
|
||||||
|
} else {
|
||||||
|
setIndex(newCapacity, newCapacity);
|
||||||
|
}
|
||||||
|
setByteBuffer(newBuffer, true);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -457,25 +457,29 @@ public final class PlatformDependent {
|
|||||||
public static ByteBuffer allocateDirectNoCleaner(int capacity) {
|
public static ByteBuffer allocateDirectNoCleaner(int capacity) {
|
||||||
assert USE_DIRECT_BUFFER_NO_CLEANER;
|
assert USE_DIRECT_BUFFER_NO_CLEANER;
|
||||||
|
|
||||||
if (DIRECT_MEMORY_COUNTER != null) {
|
incrementMemoryCounter(capacity);
|
||||||
for (;;) {
|
|
||||||
long usedMemory = DIRECT_MEMORY_COUNTER.get();
|
|
||||||
long newUsedMemory = usedMemory + capacity;
|
|
||||||
if (newUsedMemory > DIRECT_MEMORY_LIMIT) {
|
|
||||||
throw new OutOfDirectMemoryError("failed to allocate " + capacity
|
|
||||||
+ " byte(s) of direct memory (used: " + usedMemory + ", max: " + DIRECT_MEMORY_LIMIT + ')');
|
|
||||||
}
|
|
||||||
if (DIRECT_MEMORY_COUNTER.compareAndSet(usedMemory, newUsedMemory)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
return PlatformDependent0.allocateDirectNoCleaner(capacity);
|
return PlatformDependent0.allocateDirectNoCleaner(capacity);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
if (DIRECT_MEMORY_COUNTER != null) {
|
decrementMemoryCounter(capacity);
|
||||||
DIRECT_MEMORY_COUNTER.addAndGet(-capacity);
|
throwException(e);
|
||||||
}
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reallocate a new {@link ByteBuffer} with the given {@code capacity}. {@link ByteBuffer}s reallocated with
|
||||||
|
* this method <strong>MUST</strong> be deallocated via {@link #freeDirectNoCleaner(ByteBuffer)}.
|
||||||
|
*/
|
||||||
|
public static ByteBuffer reallocateDirectNoCleaner(ByteBuffer buffer, int capacity) {
|
||||||
|
assert USE_DIRECT_BUFFER_NO_CLEANER;
|
||||||
|
|
||||||
|
int len = capacity - buffer.capacity();
|
||||||
|
incrementMemoryCounter(len);
|
||||||
|
try {
|
||||||
|
return PlatformDependent0.reallocateDirectNoCleaner(buffer, capacity);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
decrementMemoryCounter(len);
|
||||||
throwException(e);
|
throwException(e);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -490,6 +494,26 @@ public final class PlatformDependent {
|
|||||||
|
|
||||||
int capacity = buffer.capacity();
|
int capacity = buffer.capacity();
|
||||||
PlatformDependent0.freeMemory(PlatformDependent0.directBufferAddress(buffer));
|
PlatformDependent0.freeMemory(PlatformDependent0.directBufferAddress(buffer));
|
||||||
|
decrementMemoryCounter(capacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void incrementMemoryCounter(int capacity) {
|
||||||
|
if (DIRECT_MEMORY_COUNTER != null) {
|
||||||
|
for (;;) {
|
||||||
|
long usedMemory = DIRECT_MEMORY_COUNTER.get();
|
||||||
|
long newUsedMemory = usedMemory + capacity;
|
||||||
|
if (newUsedMemory > DIRECT_MEMORY_LIMIT) {
|
||||||
|
throw new OutOfDirectMemoryError("failed to allocate " + capacity
|
||||||
|
+ " byte(s) of direct memory (used: " + usedMemory + ", max: " + DIRECT_MEMORY_LIMIT + ')');
|
||||||
|
}
|
||||||
|
if (DIRECT_MEMORY_COUNTER.compareAndSet(usedMemory, newUsedMemory)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void decrementMemoryCounter(int capacity) {
|
||||||
if (DIRECT_MEMORY_COUNTER != null) {
|
if (DIRECT_MEMORY_COUNTER != null) {
|
||||||
long usedMemory = DIRECT_MEMORY_COUNTER.addAndGet(-capacity);
|
long usedMemory = DIRECT_MEMORY_COUNTER.addAndGet(-capacity);
|
||||||
assert usedMemory >= 0;
|
assert usedMemory >= 0;
|
||||||
|
@ -173,9 +173,15 @@ final class PlatformDependent0 {
|
|||||||
return DIRECT_BUFFER_CONSTRUCTOR != null;
|
return DIRECT_BUFFER_CONSTRUCTOR != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static ByteBuffer reallocateDirectNoCleaner(ByteBuffer buffer, int capacity) {
|
||||||
|
return newDirectBuffer(UNSAFE.reallocateMemory(directBufferAddress(buffer), capacity), capacity);
|
||||||
|
}
|
||||||
|
|
||||||
static ByteBuffer allocateDirectNoCleaner(int capacity) {
|
static ByteBuffer allocateDirectNoCleaner(int capacity) {
|
||||||
assert DIRECT_BUFFER_CONSTRUCTOR != null;
|
return newDirectBuffer(UNSAFE.allocateMemory(capacity), capacity);
|
||||||
long address = UNSAFE.allocateMemory(capacity);
|
}
|
||||||
|
|
||||||
|
private static ByteBuffer newDirectBuffer(long address, int capacity) {
|
||||||
try {
|
try {
|
||||||
return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity);
|
return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity);
|
||||||
} catch (Throwable cause) {
|
} catch (Throwable cause) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user