Pooled buffers must reset their state before reuse
Motivation: Buffers should always behave the same, regardless of their underlying implementation and how they are allocated. Modification: The SizeClassedMemoryPool did not properly reset the internal buffer state prior to reusing them. The offsets, byte order, and contents are now cleared before a buffer is reused. Result: There is no way to observe externally whether a buffer was reused or not.
This commit is contained in:
parent
b0acb61f03
commit
53d2e4b955
@ -288,9 +288,10 @@ public interface Buf extends Rc<Buf>, BufAccessors {
|
|||||||
* Resets the {@linkplain #readerOffset() read offset} and the {@linkplain #writerOffset() write offset} on this
|
* Resets the {@linkplain #readerOffset() read offset} and the {@linkplain #writerOffset() write offset} on this
|
||||||
* buffer to their initial values.
|
* buffer to their initial values.
|
||||||
*/
|
*/
|
||||||
default void reset() {
|
default Buf reset() {
|
||||||
readerOffset(0);
|
readerOffset(0);
|
||||||
writerOffset(0);
|
writerOffset(0);
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
package io.netty.buffer.api;
|
package io.netty.buffer.api;
|
||||||
|
|
||||||
import java.lang.invoke.VarHandle;
|
import java.lang.invoke.VarHandle;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
@ -42,7 +43,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop<Buf> {
|
|||||||
var sizeClassPool = getSizeClassPool(size);
|
var sizeClassPool = getSizeClassPool(size);
|
||||||
Send<Buf> send = sizeClassPool.poll();
|
Send<Buf> send = sizeClassPool.poll();
|
||||||
if (send != null) {
|
if (send != null) {
|
||||||
return send.receive();
|
return send.receive().reset().fill((byte) 0).order(ByteOrder.nativeOrder());
|
||||||
}
|
}
|
||||||
return createBuf(size, getDrop());
|
return createBuf(size, getDrop());
|
||||||
}
|
}
|
||||||
|
@ -38,6 +38,7 @@ import java.util.concurrent.ExecutorService;
|
|||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import java.util.stream.Stream.Builder;
|
import java.util.stream.Stream.Builder;
|
||||||
@ -85,6 +86,10 @@ public class BufTest {
|
|||||||
return fixtureCombinations().filter(f -> f.isDirect() && f.isCleaner() && f.isPooled());
|
return fixtureCombinations().filter(f -> f.isDirect() && f.isCleaner() && f.isPooled());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static Stream<Fixture> poolingAllocators() {
|
||||||
|
return fixtureCombinations().filter(f -> f.isPooled());
|
||||||
|
}
|
||||||
|
|
||||||
private static Stream<Fixture> fixtureCombinations() {
|
private static Stream<Fixture> fixtureCombinations() {
|
||||||
Fixture[] fxs = fixtures;
|
Fixture[] fxs = fixtures;
|
||||||
if (fxs != null) {
|
if (fxs != null) {
|
||||||
@ -1507,6 +1512,38 @@ public class BufTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@MethodSource("poolingAllocators")
|
||||||
|
public void pooledBuffersMustResetStateBeforeReuse(Fixture fixture) {
|
||||||
|
try (Allocator allocator = fixture.createAllocator();
|
||||||
|
Buf expected = allocator.allocate(8)) {
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
try (Buf buf = allocator.allocate(8)) {
|
||||||
|
assertEquals(expected.capacity(), buf.capacity());
|
||||||
|
assertEquals(expected.readableBytes(), buf.readableBytes());
|
||||||
|
assertEquals(expected.readerOffset(), buf.readerOffset());
|
||||||
|
assertEquals(expected.writableBytes(), buf.writableBytes());
|
||||||
|
assertEquals(expected.writerOffset(), buf.writerOffset());
|
||||||
|
assertThat(buf.order()).isEqualTo(expected.order());
|
||||||
|
byte[] bytes = new byte[8];
|
||||||
|
buf.copyInto(0, bytes, 0, 8);
|
||||||
|
assertThat(bytes).containsExactly(0, 0, 0, 0, 0, 0, 0, 0);
|
||||||
|
|
||||||
|
var tlr = ThreadLocalRandom.current();
|
||||||
|
buf.order(tlr.nextBoolean()? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN);
|
||||||
|
for (int j = 0; j < tlr.nextInt(0, 8); j++) {
|
||||||
|
buf.writeByte((byte) 1);
|
||||||
|
}
|
||||||
|
if (buf.readableBytes() > 0) {
|
||||||
|
for (int j = 0; j < tlr.nextInt(0, buf.readableBytes()); j++) {
|
||||||
|
buf.readByte();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// <editor-fold defaultstate="collapsed" desc="Primitive accessors tests.">
|
// <editor-fold defaultstate="collapsed" desc="Primitive accessors tests.">
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource("allocators")
|
@MethodSource("allocators")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user