[#4198] Fix race-condition when allocate from multiple-thread.

Motivation:

Fix a race condition that was introduced by f18990a8a5 that could lead to a NPE when allocate from the PooledByteBufAllocator concurrently by many threads.

Modifications:

Correctly synchronize on the PoolSubPage head.

Result:

No more race.
This commit is contained in:
Norman Maurer 2015-10-22 22:09:41 +02:00
parent 85236d5446
commit d93f906255
4 changed files with 212 additions and 84 deletions

View File

@ -183,8 +183,8 @@ abstract class PoolArena<T> implements PoolArenaMetric {
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolSubpage#allocate()} and
* {@link PoolSubpage#free(int)} may modify the doubly linked list as well.
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;

View File

@ -306,6 +306,10 @@ final class PoolChunk<T> implements PoolChunkMetric {
* @return index in memoryMap
*/
private long allocateSubpage(int normCapacity) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
synchronized (head) {
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
int id = allocateNode(d);
if (id < 0) {
@ -320,13 +324,14 @@ final class PoolChunk<T> implements PoolChunkMetric {
int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
subpage = new PoolSubpage<T>(this, id, runOffset(id), pageSize, normCapacity);
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
subpage.init(normCapacity);
subpage.init(head, normCapacity);
}
return subpage.allocate();
}
}
/**
* Free a subpage or a run of pages
@ -343,10 +348,16 @@ final class PoolChunk<T> implements PoolChunkMetric {
if (bitmapIdx != 0) { // free a subpage
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage != null && subpage.doNotDestroy;
if (subpage.free(bitmapIdx & 0x3FFFFFFF)) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
synchronized (head) {
if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
return;
}
}
}
freeBytes += runLength(memoryMapIdx);
setValue(memoryMapIdx, depth(memoryMapIdx));
updateParentsFree(memoryMapIdx);

View File

@ -47,16 +47,16 @@ final class PoolSubpage<T> implements PoolSubpageMetric {
bitmap = null;
}
PoolSubpage(PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
this.chunk = chunk;
this.memoryMapIdx = memoryMapIdx;
this.runOffset = runOffset;
this.pageSize = pageSize;
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
init(elemSize);
init(head, elemSize);
}
void init(int elemSize) {
void init(PoolSubpage<T> head, int elemSize) {
doNotDestroy = true;
this.elemSize = elemSize;
if (elemSize != 0) {
@ -71,12 +71,8 @@ final class PoolSubpage<T> implements PoolSubpageMetric {
bitmap[i] = 0;
}
}
PoolSubpage<T> head = chunk.arena.findSubpagePoolHead(elemSize);
synchronized (head) {
addToPool(head);
}
}
/**
* Returns the bitmap index of the subpage allocation.
@ -86,13 +82,6 @@ final class PoolSubpage<T> implements PoolSubpageMetric {
return toHandle(0);
}
/**
* Synchronize on the head of the SubpagePool stored in the {@link PoolArena. This is needed as we synchronize
* on it when calling {@link PoolArena#allocate(PoolThreadCache, int, int)} und try to allocate out of the
* {@link PoolSubpage} pool for a given size.
*/
PoolSubpage<T> head = chunk.arena.findSubpagePoolHead(elemSize);
synchronized (head) {
if (numAvail == 0 || !doNotDestroy) {
return -1;
}
@ -109,26 +98,15 @@ final class PoolSubpage<T> implements PoolSubpageMetric {
return toHandle(bitmapIdx);
}
}
/**
* @return {@code true} if this subpage is in use.
* {@code false} if this subpage is not used by its chunk and thus it's OK to be released.
*/
boolean free(int bitmapIdx) {
boolean free(PoolSubpage<T> head, int bitmapIdx) {
if (elemSize == 0) {
return true;
}
/**
* Synchronize on the head of the SubpagePool stored in the {@link PoolArena. This is needed as we synchronize
* on it when calling {@link PoolArena#allocate(PoolThreadCache, int, int)} und try to allocate out of the
* {@link PoolSubpage} pool for a given size.
*/
PoolSubpage<T> head = chunk.arena.findSubpagePoolHead(elemSize);
synchronized (head) {
int q = bitmapIdx >>> 6;
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) != 0;
@ -156,7 +134,6 @@ final class PoolSubpage<T> implements PoolSubpageMetric {
return false;
}
}
}
private void addToPool(PoolSubpage<T> head) {
assert prev == null && next == null;

View File

@ -0,0 +1,140 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import io.netty.util.internal.SystemPropertyUtil;
import org.junit.Test;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class PooledByteBufAllocatorTest {
private static final int[] ALLOCATION_SIZES = new int[16 * 1024];
static {
for (int i = 0; i < ALLOCATION_SIZES.length; i++) {
ALLOCATION_SIZES[i] = i;
}
}
@Test
public void testConcurrentUsage() throws Throwable {
long runningTime = TimeUnit.MILLISECONDS.toNanos(SystemPropertyUtil.getLong(
"io.netty.buffer.PooledByteBufAllocatorTest.testConcurrentUsageTime", 15000));
// We use no caches and only one arena to maximize the chance of hitting the race-condition we
// had before.
ByteBufAllocator allocator = new PooledByteBufAllocator(true, 1, 1, 8192, 11, 0, 0, 0);
List<AllocationThread> threads = new ArrayList<AllocationThread>();
try {
for (int i = 0; i < 512; i++) {
AllocationThread thread = new AllocationThread(allocator);
thread.start();
threads.add(thread);
}
long start = System.nanoTime();
while (!isExpired(start, runningTime)) {
checkForErrors(threads);
Thread.sleep(100);
}
} finally {
for (AllocationThread t : threads) {
t.finish();
}
}
}
private static boolean isExpired(long start, long expireTime) {
return System.nanoTime() - start > expireTime;
}
private static void checkForErrors(List<AllocationThread> threads) throws Throwable {
for (AllocationThread t : threads) {
if (t.isFinished()) {
t.checkForError();
}
}
}
private static final class AllocationThread extends Thread {
private final CountDownLatch latch = new CountDownLatch(1);
private final Queue<ByteBuf> buffers = new ArrayDeque<ByteBuf>(10);
private final ByteBufAllocator allocator;
private volatile boolean finished;
private volatile Throwable error;
public AllocationThread(ByteBufAllocator allocator) {
this.allocator = allocator;
}
@Override
public void run() {
try {
int idx = 0;
while (!finished) {
for (int i = 0; i < 10; i++) {
buffers.add(allocator.directBuffer(
ALLOCATION_SIZES[Math.abs(idx++ % ALLOCATION_SIZES.length)],
Integer.MAX_VALUE));
}
releaseBuffers();
}
} catch (Throwable cause) {
error = cause;
finished = true;
} finally {
releaseBuffers();
}
latch.countDown();
}
private void releaseBuffers() {
for (;;) {
ByteBuf buf = buffers.poll();
if (buf == null) {
break;
}
buf.release();
}
}
public boolean isFinished() {
return finished;
}
public void finish() throws Throwable {
try {
finished = true;
latch.await();
checkForError();
} finally {
releaseBuffers();
}
}
public void checkForError() throws Throwable {
if (error != null) {
throw error;
}
}
}
}