diff --git a/buffer/src/main/java/io/netty/buffer/b2/Allocator.java b/buffer/src/main/java/io/netty/buffer/b2/Allocator.java index 2c132de..53405e7 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/Allocator.java +++ b/buffer/src/main/java/io/netty/buffer/b2/Allocator.java @@ -80,7 +80,7 @@ public interface Allocator extends AutoCloseable { @Override protected MemorySegment createMemorySegment(long size) { checkSize(size); - return MemorySegment.ofArray(new byte[Math.toIntExact(size)]); + return MemorySegment.ofArray(new byte[Math.toIntExact(size)]).withOwnerThread(null); } }; } @@ -90,7 +90,7 @@ public interface Allocator extends AutoCloseable { @Override protected MemorySegment createMemorySegment(long size) { checkSize(size); - return MemorySegment.allocateNative(size); + return MemorySegment.allocateNative(size).withOwnerThread(null); } }; } @@ -100,7 +100,7 @@ public interface Allocator extends AutoCloseable { @Override protected MemorySegment createMemorySegment(long size) { checkSize(size); - return MemorySegment.allocateNative(size); + return MemorySegment.allocateNative(size).withOwnerThread(null); } @Override diff --git a/buffer/src/main/java/io/netty/buffer/b2/BBuf.java b/buffer/src/main/java/io/netty/buffer/b2/BBuf.java index 9919857..4a95181 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/BBuf.java +++ b/buffer/src/main/java/io/netty/buffer/b2/BBuf.java @@ -21,7 +21,6 @@ import jdk.incubator.foreign.MemorySegment; import static io.netty.buffer.b2.Statics.*; class BBuf extends RcSupport implements Buf { - static final Drop NO_DROP = buf -> { }; static final Drop SEGMENT_CLOSE = buf -> buf.segment.close(); static final Drop SEGMENT_CLOSE_NATIVE = buf -> { buf.segment.close(); @@ -196,22 +195,15 @@ class BBuf extends RcSupport implements Buf { } @Override - public BBuf transferOwnership(Thread recipient, Drop drop) { - BBuf copy = new BBuf(segment.withOwnerThread(recipient), drop); - copy.read = read; - copy.write = write; - return copy; - } - - @Override - protected BBuf prepareSend() { + protected Owned prepareSend() { BBuf outer = this; - MemorySegment transferSegment = segment.withOwnerThread(TRANSFER_OWNER); - return new BBuf(transferSegment, NO_DROP) { + boolean isConfined = segment.ownerThread() == null; + MemorySegment transferSegment = isConfined? segment : segment.withOwnerThread(null); + return new Owned() { @Override public BBuf transferOwnership(Thread recipient, Drop drop) { - overwriteMemorySegmentOwner(transferSegment, recipient); - BBuf copy = new BBuf(transferSegment, drop); + var newSegment = isConfined? transferSegment.withOwnerThread(recipient) : transferSegment; + BBuf copy = new BBuf(newSegment, drop); copy.read = outer.read; copy.write = outer.write; return copy; diff --git a/buffer/src/main/java/io/netty/buffer/b2/NativeMemoryCleanerDrop.java b/buffer/src/main/java/io/netty/buffer/b2/NativeMemoryCleanerDrop.java index d3d282f..95ae3ce 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/NativeMemoryCleanerDrop.java +++ b/buffer/src/main/java/io/netty/buffer/b2/NativeMemoryCleanerDrop.java @@ -43,8 +43,7 @@ class NativeMemoryCleanerDrop implements Drop { var segment = buf.segment; cleanable = CLEANER.register(this, () -> { if (segment.isAlive()) { - // Clear owner so we can close from cleaner thread. - overwriteMemorySegmentOwner(segment, null); + // TODO return segment to pool, or call out to external drop, instead of closing it directly. segment.close(); MEM_USAGE_NATIVE.add(-segment.byteSize()); } diff --git a/buffer/src/main/java/io/netty/buffer/b2/Rc.java b/buffer/src/main/java/io/netty/buffer/b2/Rc.java index a18318a..ac7ea13 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/Rc.java +++ b/buffer/src/main/java/io/netty/buffer/b2/Rc.java @@ -15,8 +15,6 @@ */ package io.netty.buffer.b2; -import java.util.function.Consumer; - /** * An Rc is a reference counted, thread-confined, resource of sorts. Because these resources are thread-confined, the * reference counting is NOT atomic. An Rc can only be accessed by one thread at a time - the owner thread that the @@ -48,16 +46,6 @@ public interface Rc> extends AutoCloseable { @Override void close(); - /** - * Send this Rc instance ot another Thread, transferring the ownsership fo the recipient, using a rendesvouz - * protocol. This method can be used when the sender wishes to block until the transfer completes. This requires - * that both threads be alive an running for the transfer to complete. - * - * @param consumer The consumer encodes the mechanism by which the recipient recieves the Rc instance. - * @throws InterruptedException If this thread was interrupted - */ - void sendTo(Consumer> consumer) throws InterruptedException; - /** * Send this Rc instance to another Thread, transferring the ownership to the recipient. This method can be used * when the receiving thread is not known up front. diff --git a/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java b/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java index 2f40447..1ee365e 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java +++ b/buffer/src/main/java/io/netty/buffer/b2/RcSupport.java @@ -15,9 +15,7 @@ */ package io.netty.buffer.b2; -import java.util.function.Consumer; - -public abstract class RcSupport, T extends RcSupport> implements Rc, Owned { +public abstract class RcSupport, T extends RcSupport> implements Rc { private int acquires; // Closed if negative. private final Drop drop; @@ -33,7 +31,7 @@ public abstract class RcSupport, T extends RcSupport> impl * @return This Rc instance. */ @Override - public I acquire() { + public final I acquire() { if (acquires < 0) { throw new IllegalStateException("Resource is closed."); } @@ -49,7 +47,7 @@ public abstract class RcSupport, T extends RcSupport> impl * @throws IllegalStateException If this Rc has already been closed. */ @Override - public void close() { + public final void close() { if (acquires == -1) { throw new IllegalStateException("Double-free: Already closed and dropped."); } @@ -59,22 +57,6 @@ public abstract class RcSupport, T extends RcSupport> impl acquires--; } - /** - * Send this Rc instance ot another Thread, transferring the ownsership fo the recipient, using a rendesvouz - * protocol. This method can be used when the sender wishes to block until the transfer completes. This requires - * that both threads be alive an running for the transfer to complete. - * - * @param consumer The consumer encodes the mechanism by which the recipient recieves the Rc instance. - * @throws InterruptedException If this thread was interrupted - */ - @Override - public void sendTo(Consumer> consumer) throws InterruptedException { - var send = new RendezvousSend(impl(), drop); - consumer.accept(send); - send.finish(); - acquires = -2; // close without dropping (also ignore future double-free attempts) - } - /** * Send this Rc instance to another Thread, transferring the ownership to the recipient. This method can be used * when the receiving thread is not known up front. @@ -86,7 +68,7 @@ public abstract class RcSupport, T extends RcSupport> impl * currently owning thread. */ @Override - public Send send() { + public final Send send() { acquires = -2; // close without dropping (also ignore future double-free attempts) return new TransferSend(prepareSend(), drop); } @@ -94,14 +76,12 @@ public abstract class RcSupport, T extends RcSupport> impl /** * Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread. * This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning - * thread. In this state, the Rc instance should only allow a call to {@link #transferOwnership(Thread, Drop)} in + * thread. In this state, the Rc instance should only allow a call to {@link Owned#transferOwnership(Thread, Drop)} in * the recipient thread. * * @return This Rc instance in a deactivated state. */ - protected T prepareSend() { - return impl(); - } + protected abstract Owned prepareSend(); @SuppressWarnings("unchecked") private I self() { diff --git a/buffer/src/main/java/io/netty/buffer/b2/RendezvousSend.java b/buffer/src/main/java/io/netty/buffer/b2/RendezvousSend.java deleted file mode 100644 index 88535ba..0000000 --- a/buffer/src/main/java/io/netty/buffer/b2/RendezvousSend.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2020 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.buffer.b2; - -import java.lang.invoke.VarHandle; -import java.util.concurrent.CountDownLatch; - -import static io.netty.buffer.b2.Statics.*; -import static java.lang.invoke.MethodHandles.*; - -class RendezvousSend, T extends Rc & Owned> implements Send { - private static final VarHandle RECEIVED = findVarHandle(lookup(), RendezvousSend.class, "received", boolean.class); - private final CountDownLatch recipientLatch; - private final CountDownLatch sentLatch; - private final Drop drop; - private final T outgoing; - @SuppressWarnings("unused") - private volatile boolean received; // Accessed via VarHandle - private volatile Thread recipient; - private volatile I incoming; - - RendezvousSend(T outgoing, Drop drop) { - this.outgoing = outgoing; - this.drop = drop; - recipientLatch = new CountDownLatch(1); - sentLatch = new CountDownLatch(1); - } - - @Override - public I receive() { - if (!RECEIVED.compareAndSet(this, false, true)) { - throw new IllegalStateException("This object has already been received."); - } - recipient = Thread.currentThread(); - recipientLatch.countDown(); - try { - sentLatch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return incoming; - } - - void finish() throws InterruptedException { - if (incoming != null) { - throw new IllegalStateException("Already sent."); - } - recipientLatch.await(); - var transferred = outgoing.transferOwnership(recipient, drop); - incoming = (I) transferred; - drop.accept(transferred); - sentLatch.countDown(); - } -} diff --git a/buffer/src/main/java/io/netty/buffer/b2/Statics.java b/buffer/src/main/java/io/netty/buffer/b2/Statics.java index cb52a61..2a5a126 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/Statics.java +++ b/buffer/src/main/java/io/netty/buffer/b2/Statics.java @@ -15,19 +15,11 @@ */ package io.netty.buffer.b2; -import io.netty.util.internal.PlatformDependent; -import jdk.incubator.foreign.MemorySegment; - import java.lang.invoke.MethodHandles.Lookup; import java.lang.invoke.VarHandle; -import java.lang.reflect.Field; import java.util.concurrent.atomic.LongAdder; interface Statics { - @SuppressWarnings("InstantiatingAThreadWithDefaultRunMethod") - Thread TRANSFER_OWNER = new Thread("ByteBuf Transfer Owner"); - long SCOPE = fieldOffset("jdk.internal.foreign.AbstractMemorySegmentImpl", "scope"); - long OWNER = fieldOffset("jdk.internal.foreign.MemoryScope", "owner"); LongAdder MEM_USAGE_NATIVE = new LongAdder(); static VarHandle findVarHandle(Lookup lookup, Class recv, String name, Class type) { @@ -37,20 +29,4 @@ interface Statics { throw new ExceptionInInitializerError(e); } } - - static long fieldOffset(String className, String fieldName) { - try { - Class cls = Class.forName(className); - Field field = cls.getDeclaredField(fieldName); - return PlatformDependent.objectFieldOffset(field); - } catch (Exception e) { - throw new ExceptionInInitializerError(e); - } - } - - static void overwriteMemorySegmentOwner(MemorySegment segment, Thread newOwner) { - Object scope = PlatformDependent.getObject(segment, SCOPE); - PlatformDependent.putObject(scope, OWNER, newOwner); - VarHandle.fullFence(); // Attempt to force visibility of overwritten final fields. - } } diff --git a/buffer/src/main/java/io/netty/buffer/b2/TransferSend.java b/buffer/src/main/java/io/netty/buffer/b2/TransferSend.java index c308093..3c479c4 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/TransferSend.java +++ b/buffer/src/main/java/io/netty/buffer/b2/TransferSend.java @@ -20,18 +20,19 @@ import java.lang.invoke.VarHandle; import static io.netty.buffer.b2.Statics.*; import static java.lang.invoke.MethodHandles.*; -class TransferSend, T extends Rc & Owned> implements Send { +class TransferSend, T extends Rc> implements Send { private static final VarHandle RECEIVED = findVarHandle(lookup(), TransferSend.class, "received", boolean.class); - private final T outgoing; + private final Owned outgoing; private final Drop drop; @SuppressWarnings("unused") private volatile boolean received; // Accessed via VarHandle - TransferSend(T outgoing, Drop drop) { + TransferSend(Owned outgoing, Drop drop) { this.outgoing = outgoing; this.drop = drop; } + @SuppressWarnings("unchecked") @Override public I receive() { if (!RECEIVED.compareAndSet(this, false, true)) { diff --git a/buffer/src/test/java/io/netty/buffer/b2/BBufTest.java b/buffer/src/test/java/io/netty/buffer/b2/BBufTest.java index cde7945..8144835 100644 --- a/buffer/src/test/java/io/netty/buffer/b2/BBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/b2/BBufTest.java @@ -18,7 +18,6 @@ package io.netty.buffer.b2; import org.junit.After; import org.junit.AssumptionViolatedException; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import java.util.concurrent.ArrayBlockingQueue; @@ -89,50 +88,6 @@ public abstract class BBufTest { assertArrayEquals(new byte[] {1, 2, 3, 4, 5, 6, 7, 8}, buf.copy()); } - @Test - public void allocateAndRendesvousWithThread() throws Exception { - ArrayBlockingQueue> queue = new ArrayBlockingQueue<>(10); - ExecutorService executor = Executors.newSingleThreadExecutor(); - Future future = executor.submit(() -> { - try (Buf byteBuf = queue.take().receive()) { - return byteBuf.readByte(); - } - }); - executor.shutdown(); - - try (Buf buf = allocator.allocate(8)) { - buf.writeByte((byte) 42); - buf.sendTo(queue::offer); - } - - assertEquals((byte) 42, future.get().byteValue()); - } - - @Test - public void allocateAndRendesvousWithThreadViaSyncQueue() throws Exception { - SynchronousQueue> queue = new SynchronousQueue<>(); - ExecutorService executor = Executors.newSingleThreadExecutor(); - Future future = executor.submit(() -> { - try (Buf byteBuf = queue.take().receive()) { - return byteBuf.readByte(); - } - }); - executor.shutdown(); - - try (Buf buf = allocator.allocate(8)) { - buf.writeByte((byte) 42); - buf.sendTo(e -> { - try { - queue.put(e); - } catch (InterruptedException ie) { - throw new RuntimeException(ie); - } - }); - } - - assertEquals((byte) 42, future.get().byteValue()); - } - @Test public void allocateAndSendToThread() throws Exception { ArrayBlockingQueue> queue = new ArrayBlockingQueue<>(10); @@ -198,7 +153,6 @@ public abstract class BBufTest { } } - @Ignore @Test public void mustAllowAllocatingMaxArraySizedBuffer() { try { diff --git a/buffer/src/test/java/io/netty/buffer/b2/PooledDirectBBufWithCleanerTest.java b/buffer/src/test/java/io/netty/buffer/b2/PooledDirectBBufWithCleanerTest.java index aa94f9d..374c4ae 100644 --- a/buffer/src/test/java/io/netty/buffer/b2/PooledDirectBBufWithCleanerTest.java +++ b/buffer/src/test/java/io/netty/buffer/b2/PooledDirectBBufWithCleanerTest.java @@ -15,7 +15,6 @@ */ package io.netty.buffer.b2; -import org.junit.Ignore; import org.junit.Test; import static org.hamcrest.Matchers.*; @@ -27,7 +26,6 @@ public class PooledDirectBBufWithCleanerTest extends DirectBBufTest { return Allocator.pooledDirectWithCleaner(); } - @Ignore @Test public void bufferMustBeClosedByCleaner() throws InterruptedException { var allocator = createAllocator();