Address the remaining PR comments from the prototype.

This commit is contained in:
Chris Vest 2020-08-24 16:03:02 +02:00
parent 3598c575d8
commit 917bfb24f7
13 changed files with 200 additions and 16 deletions

View File

@ -4,6 +4,9 @@ import jdk.incubator.foreign.MemorySegment;
import static io.netty.buffer.b2.BBuf.*;
/**
* Interface for {@link BBuf} allocators.
*/
@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
public interface Allocator extends AutoCloseable {
static void checkSize(long size) {
@ -18,8 +21,19 @@ public interface Allocator extends AutoCloseable {
}
}
/**
* Allocate a {@link BBuf} of the given size in bytes. This method may throw an {@link OutOfMemoryError} if there is
* not enough free memory available to allocate a {@link BBuf} of the requested size.
*
* @param size The size of {@link BBuf} to allocate.
* @return The newly allocated {@link BBuf}.
*/
BBuf allocate(long size);
/**
* Close this allocator, freeing all of its internal resources. It is not specified if the allocator can still be
* used after this method has been called on it.
*/
@Override
default void close() {
}

View File

@ -24,6 +24,10 @@ public class BBuf extends Rc<BBuf> {
}
public BBuf readerIndex(int index) {
if (index < 0 || segment.byteSize() <= index) {
throw new IndexOutOfBoundsException(
"Index " + index + " is out of bounds: [0 to " + segment.byteSize() + "].");
}
read = index;
return this;
}
@ -53,11 +57,11 @@ public class BBuf extends Rc<BBuf> {
segment.fill(value);
}
public long getNativeAddress() {
long getNativeAddress() {
try {
return segment.address().toRawLongValue();
} catch (UnsupportedOperationException e) {
return 0; // This is a heap segment. Probably.
return 0; // This is a heap segment.
}
}

View File

@ -2,11 +2,27 @@ package io.netty.buffer.b2;
import java.util.function.Consumer;
/**
* The Drop interface is used by {@link Rc} instances to implement their resource disposal mechanics. The {@link
* #drop(Rc)} method will be called by the Rc when their last reference is closed.
*
* @param <T>
*/
@FunctionalInterface
public interface Drop<T extends Rc<T>> extends Consumer<T> {
/**
* Dispose of the resources in the given Rc.
*
* @param obj The Rc instance being dropped.
*/
void drop(T obj);
/**
* Called when the resource changes owner.
*
* @param obj The new Rc instance with the new owner.
*/
@Override
default void accept(T t) {
default void accept(T obj) {
}
}

View File

@ -2,14 +2,32 @@ package io.netty.buffer.b2;
import java.util.function.Consumer;
/**
* An Rc is a reference counted, thread-confined, resource of sorts. Because these resources are thread-confined, the
* reference counting is NOT atomic. An Rc can only be accessed by one thread at a time - the owner thread that the
* resource is confined to.
* <p>
* When the last reference is closed (accounted for using {@link AutoCloseable} and try-with-resources statements,
* ideally), then the resource is desposed of, or released, or returned to the pool it came from. The precise action is
* implemented by the {@link Drop} instance given as an argument to the Rc constructor.
*
* @param <T> The concrete subtype.
*/
public abstract class Rc<T extends Rc<T>> implements AutoCloseable {
private int acquires; // closed if negative
private int acquires; // Closed if negative.
private final Drop<T> drop;
Rc(Drop<T> drop) {
this.drop = drop;
}
/**
* Increment the reference count.
* <p>
* Note, this method is not thread-safe because Rc's are meant to thread-confined.
*
* @return This Rc instance.
*/
public T acquire() {
if (acquires < 0) {
throw new IllegalStateException("Resource is closed.");
@ -18,6 +36,13 @@ public abstract class Rc<T extends Rc<T>> implements AutoCloseable {
return self();
}
/**
* Decrement the reference count, and despose of the resource if the last reference is closed.
* <p>
* Note, this method is not thread-safe because Rc's are meant to be thread-confined.
*
* @throws IllegalStateException If this Rc has already been closed.
*/
@Override
public void close() {
if (acquires == -1) {
@ -29,6 +54,14 @@ public abstract class Rc<T extends Rc<T>> implements AutoCloseable {
acquires--;
}
/**
* Send this Rc instance ot another Thread, transferring the ownsership fo the recipient, using a rendesvouz
* protocol. This method can be used when the sender wishes to block until the transfer completes. This requires
* that both threads be alive an running for the transfer to complete.
*
* @param consumer The consumer encodes the mechanism by which the recipient recieves the Rc instance.
* @throws InterruptedException If this thread was interrupted
*/
public void sendTo(Consumer<Send<T>> consumer) throws InterruptedException {
var send = new RendezvousSend<>(self(), drop);
consumer.accept(send);
@ -37,6 +70,12 @@ public abstract class Rc<T extends Rc<T>> implements AutoCloseable {
}
/**
* Send this Rc instance to another Thread, transferring the ownership to the recipient. This method can be used
* when the receiving thread is not known up front.
* <p>
* This instance immediately becomes inaccessible, and all attempts at accessing this Rc will throw. Calling {@link
* #close()} will have no effect, so this method is safe to call within a try-with-resources statement.
*
* @implNote Not possible without hacks because we need the receiving thread in order to set the new owner in the
* currently owning thread.
*/
@ -46,21 +85,27 @@ public abstract class Rc<T extends Rc<T>> implements AutoCloseable {
}
/**
* Transfer the ownership of this Rc, to the given recipient thread.
* This Rc is invalidated but without disposing of its internal state.
* Then a new Rc with the given owner is produced in its stead.
* Transfer the ownership of this Rc, to the given recipient thread. This Rc is invalidated but without disposing of
* its internal state. Then a new Rc with the given owner is produced in its stead.
* <p>
* This method is called by {@link Send} implementations.
* These implementations will ensure that the transfer of ownership (the calling of this
* method) happens-before the new owner begins accessing the new object.
* This ensures that the new Rc is safely published to the new owners.
* This method is called by {@link Send} implementations. These implementations will ensure that the transfer of
* ownership (the calling of this method) happens-before the new owner begins accessing the new object. This ensures
* that the new Rc is safely published to the new owners.
*
* @param recipient The new owner of the state represented by this Rc.
* @param drop The drop object that knows how to dispose of the state represented by this Rc.
* @param drop The drop object that knows how to dispose of the state represented by this Rc.
* @return A new Rc instance that is exactly the same as this Rc, except it has the new owner.
*/
protected abstract T transferOwnership(Thread recipient, Drop<T> drop);
/**
* Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread.
* This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning
* thread. In this state, the Rc instance should only allow a call to {@link #transferOwnership(Thread, Drop)} in
* the recipient thread.
*
* @return This Rc instance in a deactivated state.
*/
protected T prepareSend() {
return self();
}

View File

@ -0,0 +1,8 @@
package io.netty.buffer.b2;
/**
* Thrown when resource disposal fails while closing a resource pool.
*/
public class ResourceDisposeFailedException extends RuntimeException {
private static final long serialVersionUID = -1413426368835341993L;
}

View File

@ -1,6 +1,28 @@
package io.netty.buffer.b2;
/**
* A Send object is a temporary holder of an {@link Rc}, used for transferring the ownership of the Rc from one thread
* to another.
* <p>
* Prior to the Send being created, the originating Rc is invalidated, to prevent access while it is being sent. This
* means it cannot be accessed, closed, or disposed of, while it is in-flight. Once the Rc is {@linkplain #receive()
* received}, the new ownership is established.
* <p>
* Care must be taken to ensure that the Rc is always received by some thread. Failure to do so can result in a resource
* leak.
*
* @param <T>
*/
@FunctionalInterface
public interface Send<T extends Rc<T>> {
/**
* Receive the {@link Rc} instance being sent, and bind its ownership to the calling thread. The invalidation of the
* sent Rc in the sending thread happens-before the return of this method.
* <p>
* This method can only be called once, and will throw otherwise.
*
* @return The sent Rc instance.
* @throws IllegalStateException If this method is called more than once.
*/
T receive();
}

View File

@ -3,6 +3,7 @@ package io.netty.buffer.b2;
import jdk.incubator.foreign.MemorySegment;
import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -45,12 +46,22 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
@Override
public void close() {
if (CLOSE.compareAndSet(this, false, true)) {
var capturedExceptions = new ArrayList<Exception>(4);
pool.forEach((k,v) -> {
Send<BBuf> send;
while ((send = v.poll()) != null) {
dispose(send.receive());
try {
dispose(send.receive());
} catch (Exception e) {
capturedExceptions.add(e);
}
}
});
if (!capturedExceptions.isEmpty()) {
var exception = new ResourceDisposeFailedException();
capturedExceptions.forEach(exception::addSuppressed);
throw exception;
}
}
}

View File

@ -1,8 +1,10 @@
package io.netty.buffer.b2;
import org.junit.AssumptionViolatedException;
import org.junit.Ignore;
import org.junit.Test;
import java.nio.BufferUnderflowException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -182,6 +184,7 @@ public abstract class BBufTest {
}
}
@Ignore
@Test
public void mustAllowAllocatingMaxArraySizedBuffer() {
try (Allocator allocator = createAllocator()) {
@ -193,5 +196,40 @@ public abstract class BBufTest {
}
}
}
@Test
public void setReaderIndexMustThrowOnNegativeIndex() {
try (Allocator allocator = createAllocator();
BBuf buf = allocator.allocate(8)) {
try {
buf.readerIndex(-1);
fail("Expected an exception to be thrown.");
} catch (IndexOutOfBoundsException e) {
// Good.
}
}
}
@Test
public void setReaderIndexMustThrowOnOversizedIndex() {
try (Allocator allocator = createAllocator();
BBuf buf = allocator.allocate(8)) {
try {
buf.readerIndex(8);
fail("Expected an exception to be thrown.");
} catch (IndexOutOfBoundsException e) {
// Good.
}
}
}
@Test
public void setReaderIndexMustNotThrowWithinBounds() {
try (Allocator allocator = createAllocator();
BBuf buf = allocator.allocate(8)) {
buf.readerIndex(0);
buf.readerIndex(7);
}
}
}

View File

@ -1,8 +1,20 @@
package io.netty.buffer.b2;
import org.junit.Test;
import static org.junit.Assert.*;
public class DirectBBufTest extends BBufTest {
@Override
protected Allocator createAllocator() {
return Allocator.direct();
}
@Test
public void directBufferMustHaveNonZeroAddress() {
try (Allocator allocator = createAllocator();
BBuf buf = allocator.allocate(8)) {
assertNotEquals(0, buf.getNativeAddress());
}
}
}

View File

@ -1,8 +1,20 @@
package io.netty.buffer.b2;
import org.junit.Test;
import static org.junit.Assert.*;
public class HeapBBufTest extends BBufTest {
@Override
protected Allocator createAllocator() {
return Allocator.heap();
}
@Test
public void heapBufferMustHaveZeroAddress() {
try (Allocator allocator = createAllocator();
BBuf buf = allocator.allocate(8)) {
assertEquals(0, buf.getNativeAddress());
}
}
}

View File

@ -1,6 +1,6 @@
package io.netty.buffer.b2;
public class PooledDirectBBufTest extends BBufTest {
public class PooledDirectBBufTest extends DirectBBufTest {
@Override
protected Allocator createAllocator() {
return Allocator.pooledDirect();

View File

@ -1,16 +1,18 @@
package io.netty.buffer.b2;
import org.junit.Ignore;
import org.junit.Test;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
public class PooledDirectBBufWithCleanerTest extends BBufTest {
public class PooledDirectBBufWithCleanerTest extends DirectBBufTest {
@Override
protected Allocator createAllocator() {
return Allocator.pooledDirectWithCleaner();
}
@Ignore
@Test
public void bufferMustBeClosedByCleaner() throws InterruptedException {
var allocator = createAllocator();

View File

@ -1,6 +1,6 @@
package io.netty.buffer.b2;
public class PooledHeapBBufTest extends BBufTest {
public class PooledHeapBBufTest extends HeapBBufTest {
@Override
protected Allocator createAllocator() {
return Allocator.pooledHeap();