PooledByteBufAllocatorTest may has memory visiblity issues as it uses non concurrent queue
Motivation: PooledByteBufAllocatorTest uses an ArrayQueue but access it from multiple threads (not concurrently but still from different threads). This may leak to memory visibility issues. Modifications: - Use a concurrent queue - Some cleanup Result: Non racy test code.
This commit is contained in:
parent
c131ef9f6f
commit
b5e40b2dde
@ -21,12 +21,13 @@ import io.netty.util.concurrent.FastThreadLocalThread;
|
||||
import io.netty.util.internal.SystemPropertyUtil;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
@ -323,11 +324,9 @@ public class PooledByteBufAllocatorTest {
|
||||
}
|
||||
}
|
||||
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
private final Queue<ByteBuf> buffers = new ArrayDeque<ByteBuf>(10);
|
||||
private final Queue<ByteBuf> buffers = new ConcurrentLinkedQueue<ByteBuf>();
|
||||
private final ByteBufAllocator allocator;
|
||||
private volatile boolean finished;
|
||||
private volatile Throwable error;
|
||||
private final AtomicReference<Object> finish = new AtomicReference<Object>();
|
||||
|
||||
public AllocationThread(ByteBufAllocator allocator) {
|
||||
this.allocator = allocator;
|
||||
@ -337,7 +336,7 @@ public class PooledByteBufAllocatorTest {
|
||||
public void run() {
|
||||
try {
|
||||
int idx = 0;
|
||||
while (!finished) {
|
||||
while (finish.get() == null) {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
buffers.add(allocator.directBuffer(
|
||||
ALLOCATION_SIZES[Math.abs(idx++ % ALLOCATION_SIZES.length)],
|
||||
@ -346,12 +345,10 @@ public class PooledByteBufAllocatorTest {
|
||||
releaseBuffers();
|
||||
}
|
||||
} catch (Throwable cause) {
|
||||
error = cause;
|
||||
finished = true;
|
||||
finish.set(cause);
|
||||
} finally {
|
||||
releaseBuffers();
|
||||
}
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
private void releaseBuffers() {
|
||||
@ -365,22 +362,24 @@ public class PooledByteBufAllocatorTest {
|
||||
}
|
||||
|
||||
public boolean isFinished() {
|
||||
return finished;
|
||||
return finish.get() != null;
|
||||
}
|
||||
|
||||
public void finish() throws Throwable {
|
||||
try {
|
||||
finished = true;
|
||||
latch.await();
|
||||
checkForError();
|
||||
// Mark as finish if not already done but ensure we not override the previous set error.
|
||||
finish.compareAndSet(null, Boolean.TRUE);
|
||||
join();
|
||||
} finally {
|
||||
releaseBuffers();
|
||||
}
|
||||
checkForError();
|
||||
}
|
||||
|
||||
public void checkForError() throws Throwable {
|
||||
if (error != null) {
|
||||
throw error;
|
||||
Object obj = finish.get();
|
||||
if (obj instanceof Throwable) {
|
||||
throw (Throwable) obj;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user