netty5/buffer/src/main/java/io/netty5/buffer/api/pool/PoolChunkList.java

250 lines
8.6 KiB
Java

Introduce alternative Buffer API (#11347) Motivation: In Netty 5 we wish to have a simpler, safe, future proof, and more consistent buffer API. We developed such an API in the incubating buffer repository, and taking it through multiple rounds of review and adjustments. This PR/commit bring the results of that work into the Netty 5 branch of the main Netty repository. Modifications: * `Buffer` is an interface, and all implementations are hidden behind it. There is no longer an inheritance hierarchy of abstract classes and implementations. * Reference counting is gone. After a buffer has been allocated, calling `close` on it will deallocate it. It is then up to users and integrators to ensure that the life-times of buffers are managed correctly. This is usually not a problem as buffers tend to flow through the pipeline to be released after a terminal IO operation. * Slice and duplicate methods are replaced with `split`. By removing slices, duplicate, and reference counting, there is no longer a possibility that a buffer and/or its memory can be shared and accessible through multiple routes. This solves the problem of data being accessed from multiple places in an uncoordinated way, and the problem of buffer memory being closed while being in use by some unsuspecting piece of code. Some adjustments will have to be made to other APIs, idioms, and usages, since `split` is not always a replacement for `slice` in some use cases. * The `split` has been added which allows memory to be shared among multiple buffers, but in non-overlapping regions. When the memory regions don't overlap, it will not be possible for the different buffers to interfere with each other. An internal, and completely transparent, reference counting system ensures that the backing memory is released once the last buffer view is closed. * A Send API has been introduced that can be used to enforce (in the type system) the transfer of buffer ownership. 
This is not expected to be used in the pipeline flow itself, but rather for other objects that wrap buffers and wish to avoid becoming "shared views" — the absence of "shared views" of memory is important for avoiding bugs in the absence of reference counting. * A new BufferAllocator API, where the choice of implementation determines factors like on-/off-heap, pooling or not. How access to the different allocators will be exposed to integrators will be decided later. Perhaps they'll be directly accessible on the `ChannelHandlerContext`. * The `PooledBufferAllocator` has been copied and modified to match the new allocator API. This includes unifying its implementation that was previously split across on-heap and off-heap. * The `PooledBufferAllocator` implementation has also been adjusted to allocate 4 MiB chunks by default, and a few changes have been made to the implementation to make a newly created, empty allocator use significantly less heap memory. * A `Resource` interface has been added, which defines the life-cycle methods and the `send` method. The `Buffer` interface extends this. * Analogues for `ByteBufHolder` has been added in the `BufferHolder` and `BufferRef` classes. * `ByteCursor` is added as a new way to iterate the data in buffers. The byte cursor API is designed to be more JIT friendly than an iterator, or the existing `ByteProcessor` interface. * `CompositeBuffer` no longer permit the same level of access to its internal components. The composite buffer enforces its ownership of its components via the `Send` API, and the components can only be individually accessed with the `forEachReadable` and `forEachWritable` methods. This keeps the API and behavioral differences between composite and non-composite buffers to a minimum. * Two implementations of the `Buffer` interface are provided with the API: One based on `ByteBuffer`, and one based on `sun.misc.Unsafe`. The `ByteBuffer` implementation is used by default. 
More implementations can be loaded from the classpath via service loading. The `MemorySegment` based implementation is left behind in the incubator repository. * An extensive and highly parameterised test suite has been added, to ensure that all implementations have consistent and correct behaviour, regardless of their configuration or composition. Result: We have a new buffer API that is simpler, better tested, more consistent in behaviour, and safer by design, than the existing `ByteBuf` API. The next legs of this journey will be about integrating this new API into Netty proper, and deprecate (and eventually remove) the `ByteBuf` API. This fixes #11024, #8601, #8543, #8542, #8534, #3358, and #3306.
2021-06-28 12:06:44 +02:00
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty5.buffer.api.pool;
import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty5.buffer.api.AllocatorControl.UntetheredMemory;
import io.netty5.util.internal.StringUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
final class PoolChunkList implements PoolChunkListMetric {
private static final Iterator<PoolChunkMetric> EMPTY_METRICS = Collections.emptyIterator();
private final PoolArena arena;
private final PoolChunkList nextList;
private final int minUsage;
private final int maxUsage;
private final int maxCapacity;
private PoolChunk head;
private final int freeMinThreshold;
private final int freeMaxThreshold;
// This is only updated once, when the linked list of PoolChunkLists is created in the PoolArena constructor.
private PoolChunkList prevList;
PoolChunkList(PoolArena arena, PoolChunkList nextList, int minUsage, int maxUsage, int chunkSize) {
assert minUsage <= maxUsage;
this.arena = arena;
this.nextList = nextList;
this.minUsage = minUsage;
this.maxUsage = maxUsage;
maxCapacity = calculateMaxCapacity(minUsage, chunkSize);
// the thresholds are aligned with PoolChunk.usage() logic:
// 1) basic logic: usage() = 100 - freeBytes * 100L / chunkSize
// so, for example: (usage() >= maxUsage) condition can be transformed in the following way:
// 100 - freeBytes * 100L / chunkSize >= maxUsage
// freeBytes <= chunkSize * (100 - maxUsage) / 100
// let freeMinThreshold = chunkSize * (100 - maxUsage) / 100, then freeBytes <= freeMinThreshold
//
// 2) usage() returns an int value and has a floor rounding during a calculation,
// to be aligned absolute thresholds should be shifted for "the rounding step":
// freeBytes * 100 / chunkSize < 1
// the condition can be converted to: freeBytes < 1 * chunkSize / 100
// this is why we have + 0.99999999 shifts. An example of why just a +1 shift cannot be used:
// freeBytes = 16777216 == freeMaxThreshold: 16777216, usage = 0 < minUsage: 1, chunkSize: 16777216
// At the same time we want to have zero thresholds in case of (maxUsage == 100) and (minUsage == 100).
//
freeMinThreshold = maxUsage == 100 ? 0 : (int) (chunkSize * (100.0 - maxUsage + 0.99999999) / 100L);
freeMaxThreshold = minUsage == 100 ? 0 : (int) (chunkSize * (100.0 - minUsage + 0.99999999) / 100L);
}
/**
* Calculates the maximum capacity of a buffer that can ever be allocated out of the {@link PoolChunk}s
* that belong to a {@link PoolChunkList} with the given {@code minUsage} setting and {@code chunkSize}.
*/
private static int calculateMaxCapacity(int minUsage, int chunkSize) {
minUsage = minUsage0(minUsage);
if (minUsage == 100) {
// If the minUsage is 100 we cannot allocate anything out of this list.
return 0;
}
// Calculate the maximum amount of bytes that can be allocated from a PoolChunk in this PoolChunkList.
//
// As an example:
// - If a PoolChunkList has minUsage == 25 we are allowed to allocate at most 75% of the chunkSize because
// this is the maximum amount available in any PoolChunk in this PoolChunkList.
return (int) (chunkSize * (100L - minUsage) / 100L);
}
void prevList(PoolChunkList prevList) {
assert this.prevList == null;
this.prevList = prevList;
}
UntetheredMemory allocate(int size, int sizeIdx, PoolThreadCache threadCache, PooledAllocatorControl control) {
int normCapacity = arena.sizeIdx2size(sizeIdx);
if (normCapacity > maxCapacity) {
// Either this PoolChunkList is empty, or the requested capacity is larger than the capacity which can
// be handled by the PoolChunks that are contained in this PoolChunkList.
return null;
}
for (PoolChunk cur = head; cur != null; cur = cur.next) {
UntetheredMemory memory = cur.allocate(size, sizeIdx, threadCache, control);
if (memory != null) {
if (cur.freeBytes <= freeMinThreshold) {
remove(cur);
nextList.add(cur);
}
return memory;
}
}
return null;
}
boolean free(PoolChunk chunk, long handle, int normCapacity) {
chunk.free(handle, normCapacity);
if (chunk.freeBytes > freeMaxThreshold) {
remove(chunk);
// Move the PoolChunk down the PoolChunkList linked-list.
return move0(chunk);
}
return true;
}
private boolean move(PoolChunk chunk) {
if (chunk.freeBytes > freeMaxThreshold) {
// Move the PoolChunk down the PoolChunkList linked-list.
return move0(chunk);
}
// PoolChunk fits into this PoolChunkList, adding it here.
add0(chunk);
return true;
}
/**
* Moves the {@link PoolChunk} down the {@link PoolChunkList} linked-list, so it will end up in the
* {@link PoolChunkList} whose minUsage / maxUsage range matches its {@link PoolChunk#usage()}.
*/
private boolean move0(PoolChunk chunk) {
if (prevList == null) {
// There is no previous PoolChunkList, so return false. This results in the PoolChunk being destroyed and
// all memory associated with the PoolChunk being released.
return false;
}
return prevList.move(chunk);
}
void add(PoolChunk chunk) {
if (chunk.freeBytes <= freeMinThreshold) {
nextList.add(chunk);
return;
}
add0(chunk);
}
/**
* Adds the {@link PoolChunk} to this {@link PoolChunkList}.
*/
void add0(PoolChunk chunk) {
chunk.parent = this;
if (head == null) {
head = chunk;
chunk.prev = null;
chunk.next = null;
} else {
chunk.prev = null;
chunk.next = head;
head.prev = chunk;
head = chunk;
}
}
private void remove(PoolChunk cur) {
if (cur == head) {
head = cur.next;
if (head != null) {
head.prev = null;
}
} else {
PoolChunk next = cur.next;
cur.prev.next = next;
if (next != null) {
next.prev = cur.prev;
}
}
}
@Override
public int minUsage() {
return minUsage0(minUsage);
}
@Override
public int maxUsage() {
return min(maxUsage, 100);
}
private static int minUsage0(int value) {
return max(1, value);
}
@Override
public Iterator<PoolChunkMetric> iterator() {
synchronized (arena) {
if (head == null) {
return EMPTY_METRICS;
}
List<PoolChunkMetric> metrics = new ArrayList<>();
for (PoolChunk cur = head;;) {
metrics.add(cur);
cur = cur.next;
if (cur == null) {
break;
}
}
return metrics.iterator();
}
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
synchronized (arena) {
if (head == null) {
return "none";
}
for (PoolChunk cur = head;;) {
buf.append(cur);
cur = cur.next;
if (cur == null) {
break;
}
buf.append(StringUtil.NEWLINE);
}
}
return buf.toString();
}
void destroy() {
PoolChunk chunk = head;
while (chunk != null) {
chunk.destroy();
chunk = chunk.next;
}
head = null;
}
}
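
The threshold arithmetic described in the PoolChunkList constructor comment can be checked in isolation. The following is a minimal sketch, not part of Netty: the class name ThresholdSketch and its method names are hypothetical, and usage() here is the simplified formula quoted in the comment (100 - freeBytes * 100L / chunkSize), which may differ from the real PoolChunk.usage() in edge cases. The threshold and max-capacity expressions are copied from the constructor and from calculateMaxCapacity above.

```java
// Sketch only: ThresholdSketch and its method names are hypothetical, not Netty API.
final class ThresholdSketch {
    private ThresholdSketch() { }

    // Simplified usage() as quoted in the constructor comment (assumption:
    // the real PoolChunk.usage() may special-case some values).
    static int usage(int freeBytes, int chunkSize) {
        return (int) (100 - freeBytes * 100L / chunkSize);
    }

    // Same expression the constructor uses for freeMinThreshold (with maxUsage)
    // and freeMaxThreshold (with minUsage).
    static int freeThreshold(int chunkSize, int usageBound) {
        return usageBound == 100 ? 0 : (int) (chunkSize * (100.0 - usageBound + 0.99999999) / 100L);
    }

    // Mirrors calculateMaxCapacity, including the clamp of minUsage to at least 1.
    static int maxCapacity(int minUsage, int chunkSize) {
        minUsage = Math.max(1, minUsage);
        if (minUsage == 100) {
            return 0;
        }
        return (int) (chunkSize * (100L - minUsage) / 100L);
    }

    public static void main(String[] args) {
        int chunkSize = 16 * 1024 * 1024; // 16777216, the chunk size used in the comment's example

        // A list with minUsage == 1: a completely free chunk must exceed freeMaxThreshold.
        int freeMax = freeThreshold(chunkSize, 1);
        System.out.println("freeMaxThreshold(minUsage=1) = " + freeMax);
        System.out.println("fully free chunk moves down: " + (chunkSize > freeMax));

        // A list with maxUsage == 50: compare the threshold against usage() on both sides.
        int freeMin = freeThreshold(chunkSize, 50);
        System.out.println("freeMinThreshold(maxUsage=50) = " + freeMin);
        System.out.println("usage at threshold = " + usage(freeMin, chunkSize));
        System.out.println("usage just above threshold = " + usage(freeMin + 1, chunkSize));

        // The 25% example from calculateMaxCapacity.
        System.out.println("maxCapacity(minUsage=25) = " + maxCapacity(25, chunkSize));
    }
}
```

With a 16 MiB chunk and minUsage of 1, the threshold comes out one byte below the chunk size, so a completely free chunk satisfies freeBytes > freeMaxThreshold and is moved down the list. A plain +1 shift would give a threshold equal to the chunk size, and the empty chunk would stay in a list whose minUsage it no longer meets, which is the case the constructor comment warns about.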