From d93f9062554d07eec6d376d85d886e40b6a907f7 Mon Sep 17 00:00:00 2001
From: Norman Maurer
Date: Thu, 22 Oct 2015 22:09:41 +0200
Subject: [PATCH] [#4198] Fix race condition when allocating from multiple threads.

Motivation:

Fix a race condition, introduced by f18990a8a507d52fc40416d169db340105b10ec0,
that could lead to an NPE when many threads allocate from the
PooledByteBufAllocator concurrently.

Modifications:

Correctly synchronize on the PoolSubpage head.

Result:

No more race.
---
 .../main/java/io/netty/buffer/PoolArena.java  |   4 +-
 .../main/java/io/netty/buffer/PoolChunk.java  |  53 ++++---
 .../java/io/netty/buffer/PoolSubpage.java     |  99 +++++-------
 .../buffer/PooledByteBufAllocatorTest.java    | 140 ++++++++++++++++++
 4 files changed, 212 insertions(+), 84 deletions(-)
 create mode 100644 buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java

diff --git a/buffer/src/main/java/io/netty/buffer/PoolArena.java b/buffer/src/main/java/io/netty/buffer/PoolArena.java
index 72d5e01cf4..4cb6af2b6a 100644
--- a/buffer/src/main/java/io/netty/buffer/PoolArena.java
+++ b/buffer/src/main/java/io/netty/buffer/PoolArena.java
@@ -183,8 +183,8 @@ abstract class PoolArena<T> implements PoolArenaMetric {
             final PoolSubpage<T> head = table[tableIdx];

             /**
-             * Synchronize on the head. This is needed as {@link PoolSubpage#allocate()} and
-             * {@link PoolSubpage#free(int)} may modify the doubly linked list as well.
+             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
+             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
              */
             synchronized (head) {
                 final PoolSubpage<T> s = head.next;
diff --git a/buffer/src/main/java/io/netty/buffer/PoolChunk.java b/buffer/src/main/java/io/netty/buffer/PoolChunk.java
index bf2108cacf..46af3d4e67 100644
--- a/buffer/src/main/java/io/netty/buffer/PoolChunk.java
+++ b/buffer/src/main/java/io/netty/buffer/PoolChunk.java
@@ -306,26 +306,31 @@ final class PoolChunk<T> implements PoolChunkMetric {
      * @return index in memoryMap
      */
     private long allocateSubpage(int normCapacity) {
-        int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
-        int id = allocateNode(d);
-        if (id < 0) {
-            return id;
+        // Obtain the head of the PoolSubpage pool that is owned by the PoolArena and synchronize on it.
+        // This is needed as we may add it back and so alter the linked-list structure.
+        PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
+        synchronized (head) {
+            int d = maxOrder; // subpages can only be allocated from pages, i.e., leaves
+            int id = allocateNode(d);
+            if (id < 0) {
+                return id;
+            }
+
+            final PoolSubpage<T>[] subpages = this.subpages;
+            final int pageSize = this.pageSize;
+
+            freeBytes -= pageSize;
+
+            int subpageIdx = subpageIdx(id);
+            PoolSubpage<T> subpage = subpages[subpageIdx];
+            if (subpage == null) {
+                subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
+                subpages[subpageIdx] = subpage;
+            } else {
+                subpage.init(head, normCapacity);
+            }
+            return subpage.allocate();
         }
-
-        final PoolSubpage<T>[] subpages = this.subpages;
-        final int pageSize = this.pageSize;
-
-        freeBytes -= pageSize;
-
-        int subpageIdx = subpageIdx(id);
-        PoolSubpage<T> subpage = subpages[subpageIdx];
-        if (subpage == null) {
-            subpage = new PoolSubpage<T>(this, id, runOffset(id), pageSize, normCapacity);
-            subpages[subpageIdx] = subpage;
-        } else {
-            subpage.init(normCapacity);
-        }
-        return subpage.allocate();
     }

     /**
@@ -343,8 +348,14 @@ final class PoolChunk<T> implements PoolChunkMetric {
         if (bitmapIdx != 0) { // free a subpage
             PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
             assert subpage != null && subpage.doNotDestroy;
-            if (subpage.free(bitmapIdx & 0x3FFFFFFF)) {
-                return;
+
+            // Obtain the head of the PoolSubpage pool that is owned by the PoolArena and synchronize on it.
+            // This is needed as we may add it back and so alter the linked-list structure.
+            PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
+            synchronized (head) {
+                if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
+                    return;
+                }
             }
         }
         freeBytes += runLength(memoryMapIdx);
diff --git a/buffer/src/main/java/io/netty/buffer/PoolSubpage.java b/buffer/src/main/java/io/netty/buffer/PoolSubpage.java
index 0c713d2bd2..0f7cbd0e48 100644
--- a/buffer/src/main/java/io/netty/buffer/PoolSubpage.java
+++ b/buffer/src/main/java/io/netty/buffer/PoolSubpage.java
@@ -47,16 +47,16 @@ final class PoolSubpage<T> implements PoolSubpageMetric {
         bitmap = null;
     }

-    PoolSubpage(PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
+    PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
         this.chunk = chunk;
         this.memoryMapIdx = memoryMapIdx;
         this.runOffset = runOffset;
         this.pageSize = pageSize;
         bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
-        init(elemSize);
+        init(head, elemSize);
     }

-    void init(int elemSize) {
+    void init(PoolSubpage<T> head, int elemSize) {
         doNotDestroy = true;
         this.elemSize = elemSize;
         if (elemSize != 0) {
@@ -71,11 +71,7 @@ final class PoolSubpage<T> implements PoolSubpageMetric {
                 bitmap[i] = 0;
             }
         }
-
-        PoolSubpage<T> head = chunk.arena.findSubpagePoolHead(elemSize);
-        synchronized (head) {
-            addToPool(head);
-        }
+        addToPool(head);
     }

     /**
@@ -86,75 +82,56 @@ final class PoolSubpage<T> implements PoolSubpageMetric {
             return toHandle(0);
         }

-        /**
-         * Synchronize on the head of the SubpagePool stored in the {@link PoolArena. This is needed as we synchronize
-         * on it when calling {@link PoolArena#allocate(PoolThreadCache, int, int)} und try to allocate out of the
-         * {@link PoolSubpage} pool for a given size.
- */ - PoolSubpage head = chunk.arena.findSubpagePoolHead(elemSize); - synchronized (head) { - if (numAvail == 0 || !doNotDestroy) { - return -1; - } - - final int bitmapIdx = getNextAvail(); - int q = bitmapIdx >>> 6; - int r = bitmapIdx & 63; - assert (bitmap[q] >>> r & 1) == 0; - bitmap[q] |= 1L << r; - - if (-- numAvail == 0) { - removeFromPool(); - } - - return toHandle(bitmapIdx); + if (numAvail == 0 || !doNotDestroy) { + return -1; } + + final int bitmapIdx = getNextAvail(); + int q = bitmapIdx >>> 6; + int r = bitmapIdx & 63; + assert (bitmap[q] >>> r & 1) == 0; + bitmap[q] |= 1L << r; + + if (-- numAvail == 0) { + removeFromPool(); + } + + return toHandle(bitmapIdx); } /** * @return {@code true} if this subpage is in use. * {@code false} if this subpage is not used by its chunk and thus it's OK to be released. */ - boolean free(int bitmapIdx) { - + boolean free(PoolSubpage head, int bitmapIdx) { if (elemSize == 0) { return true; } + int q = bitmapIdx >>> 6; + int r = bitmapIdx & 63; + assert (bitmap[q] >>> r & 1) != 0; + bitmap[q] ^= 1L << r; - /** - * Synchronize on the head of the SubpagePool stored in the {@link PoolArena. This is needed as we synchronize - * on it when calling {@link PoolArena#allocate(PoolThreadCache, int, int)} und try to allocate out of the - * {@link PoolSubpage} pool for a given size. - */ - PoolSubpage head = chunk.arena.findSubpagePoolHead(elemSize); + setNextAvail(bitmapIdx); - synchronized (head) { - int q = bitmapIdx >>> 6; - int r = bitmapIdx & 63; - assert (bitmap[q] >>> r & 1) != 0; - bitmap[q] ^= 1L << r; + if (numAvail ++ == 0) { + addToPool(head); + return true; + } - setNextAvail(bitmapIdx); - - if (numAvail ++ == 0) { - addToPool(head); + if (numAvail != maxNumElems) { + return true; + } else { + // Subpage not in use (numAvail == maxNumElems) + if (prev == next) { + // Do not remove if this subpage is the only one left in the pool. return true; } - if (numAvail != maxNumElems) { - return true; - } else { - // Subpage not in use (numAvail == maxNumElems) - if (prev == next) { - // Do not remove if this subpage is the only one left in the pool. - return true; - } - - // Remove this subpage from the pool if there are other subpages left in the pool. - doNotDestroy = false; - removeFromPool(); - return false; - } + // Remove this subpage from the pool if there are other subpages left in the pool. + doNotDestroy = false; + removeFromPool(); + return false; } } diff --git a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java new file mode 100644 index 0000000000..907a7a2651 --- /dev/null +++ b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java @@ -0,0 +1,140 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */
+
+package io.netty.buffer;
+
+import io.netty.util.internal.SystemPropertyUtil;
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class PooledByteBufAllocatorTest {
+
+    private static final int[] ALLOCATION_SIZES = new int[16 * 1024];
+    static {
+        for (int i = 0; i < ALLOCATION_SIZES.length; i++) {
+            ALLOCATION_SIZES[i] = i;
+        }
+    }
+
+    @Test
+    public void testConcurrentUsage() throws Throwable {
+        long runningTime = TimeUnit.MILLISECONDS.toNanos(SystemPropertyUtil.getLong(
+                "io.netty.buffer.PooledByteBufAllocatorTest.testConcurrentUsageTime", 15000));
+
+        // We use no caches and only one arena to maximize the chance of hitting the race condition we
+        // had before.
+        ByteBufAllocator allocator = new PooledByteBufAllocator(true, 1, 1, 8192, 11, 0, 0, 0);
+        List<AllocationThread> threads = new ArrayList<AllocationThread>();
+        try {
+            for (int i = 0; i < 512; i++) {
+                AllocationThread thread = new AllocationThread(allocator);
+                thread.start();
+                threads.add(thread);
+            }
+
+            long start = System.nanoTime();
+            while (!isExpired(start, runningTime)) {
+                checkForErrors(threads);
+                Thread.sleep(100);
+            }
+        } finally {
+            for (AllocationThread t : threads) {
+                t.finish();
+            }
+        }
+    }
+
+    private static boolean isExpired(long start, long expireTime) {
+        return System.nanoTime() - start > expireTime;
+    }
+
+    private static void checkForErrors(List<AllocationThread> threads) throws Throwable {
+        for (AllocationThread t : threads) {
+            if (t.isFinished()) {
+                t.checkForError();
+            }
+        }
+    }
+
+    private static final class AllocationThread extends Thread {
+        private final CountDownLatch latch = new CountDownLatch(1);
+        private final Queue<ByteBuf> buffers = new ArrayDeque<ByteBuf>(10);
+        private final ByteBufAllocator allocator;
+        private volatile boolean finished;
+        private volatile Throwable error;
+
+        public AllocationThread(ByteBufAllocator allocator) {
+            this.allocator = allocator;
+        }
+
+        @Override
+        public void run() {
+            try {
+                int idx = 0;
+                while (!finished) {
+                    for (int i = 0; i < 10; i++) {
+                        buffers.add(allocator.directBuffer(
+                                ALLOCATION_SIZES[Math.abs(idx++ % ALLOCATION_SIZES.length)],
+                                Integer.MAX_VALUE));
+                    }
+                    releaseBuffers();
+                }
+            } catch (Throwable cause) {
+                error = cause;
+                finished = true;
+            } finally {
+                releaseBuffers();
+            }
+            latch.countDown();
+        }
+
+        private void releaseBuffers() {
+            for (;;) {
+                ByteBuf buf = buffers.poll();
+                if (buf == null) {
+                    break;
+                }
+                buf.release();
+            }
+        }
+
+        public boolean isFinished() {
+            return finished;
+        }
+
+        public void finish() throws Throwable {
+            try {
+                finished = true;
+                latch.await();
+                checkForError();
+            } finally {
+                releaseBuffers();
+            }
+        }
+
+        public void checkForError() throws Throwable {
+            if (error != null) {
+                throw error;
+            }
+        }
+    }
+}
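
The heart of the fix is a simple locking discipline: every code path that mutates the arena's intrusive doubly linked list of subpages (PoolArena#allocate, PoolChunk#allocateSubpage, PoolChunk#free) must hold the monitor of the single head node that owns that list, instead of each callee taking the lock at a different depth. The following standalone sketch (hypothetical names, not Netty code) illustrates the pattern under that assumption:

// Standalone sketch (hypothetical names, not Netty code) of the locking pattern
// adopted by this patch: every structural change to an intrusive doubly linked
// list goes through the monitor of the single head sentinel that owns the list.
final class SubpageListSketch {

    static final class Node {
        Node prev;
        Node next;
    }

    private final Node head = new Node(); // sentinel owned by the "arena"

    SubpageListSketch() {
        head.prev = head; // self-linked empty list
        head.next = head;
    }

    // Mirrors addToPool(head): link n directly after the head.
    void add(Node n) {
        synchronized (head) { // same monitor on every mutating path
            n.prev = head;
            n.next = head.next;
            n.next.prev = n;
            head.next = n;
        }
    }

    // Mirrors removeFromPool(): unlink n and clear its links.
    void remove(Node n) {
        synchronized (head) {
            n.prev.next = n.next;
            n.next.prev = n.prev;
            // Without the shared lock, a concurrent add() or remove() could
            // observe these nulls mid-update, which is the kind of NPE window
            // this patch closes.
            n.prev = null;
            n.next = null;
        }
    }
}

Before the fix, PoolSubpage looked up the head and synchronized internally while its callers in PoolChunk mutated related state outside that lock; by passing the head in and taking the monitor once at the call site, the whole allocate/free mutation becomes a single critical section.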