Update with support for shared segments
Motivation: We wish to make as little use of Unsafe as possible, yet have a flexible buffer API. With the proposed support for shared segments, we're able to clean up our buffer API and simplify the implementation. Modification: With shared segments, we are able to implement TransferSend using supported APIs. This allows us to remove RendezvousSend, and also our hacks in Statics. TransferSend now works by first creating a shared segment and using that for the handover. On the receiving end, if the segment was originally thread-confined, it will once again become confined, but this time to the receiver thread. Shared segments are just passed directly to their new owners. Pooled allocators always create shared memory segments for their buffers, so they can be shared easily via the pool. Result: We now have buffer ownership transfer with nice, convenient, APIs, and we have buffer pooling, all using supported APIs.
This commit is contained in:
parent
f6e5d26ce8
commit
94e3a00fd4
@ -80,7 +80,7 @@ public interface Allocator extends AutoCloseable {
|
|||||||
@Override
|
@Override
|
||||||
protected MemorySegment createMemorySegment(long size) {
|
protected MemorySegment createMemorySegment(long size) {
|
||||||
checkSize(size);
|
checkSize(size);
|
||||||
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
|
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]).withOwnerThread(null);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@ -90,7 +90,7 @@ public interface Allocator extends AutoCloseable {
|
|||||||
@Override
|
@Override
|
||||||
protected MemorySegment createMemorySegment(long size) {
|
protected MemorySegment createMemorySegment(long size) {
|
||||||
checkSize(size);
|
checkSize(size);
|
||||||
return MemorySegment.allocateNative(size);
|
return MemorySegment.allocateNative(size).withOwnerThread(null);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@ -100,7 +100,7 @@ public interface Allocator extends AutoCloseable {
|
|||||||
@Override
|
@Override
|
||||||
protected MemorySegment createMemorySegment(long size) {
|
protected MemorySegment createMemorySegment(long size) {
|
||||||
checkSize(size);
|
checkSize(size);
|
||||||
return MemorySegment.allocateNative(size);
|
return MemorySegment.allocateNative(size).withOwnerThread(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -21,7 +21,6 @@ import jdk.incubator.foreign.MemorySegment;
|
|||||||
import static io.netty.buffer.b2.Statics.*;
|
import static io.netty.buffer.b2.Statics.*;
|
||||||
|
|
||||||
class BBuf extends RcSupport<Buf, BBuf> implements Buf {
|
class BBuf extends RcSupport<Buf, BBuf> implements Buf {
|
||||||
static final Drop<BBuf> NO_DROP = buf -> { };
|
|
||||||
static final Drop<BBuf> SEGMENT_CLOSE = buf -> buf.segment.close();
|
static final Drop<BBuf> SEGMENT_CLOSE = buf -> buf.segment.close();
|
||||||
static final Drop<BBuf> SEGMENT_CLOSE_NATIVE = buf -> {
|
static final Drop<BBuf> SEGMENT_CLOSE_NATIVE = buf -> {
|
||||||
buf.segment.close();
|
buf.segment.close();
|
||||||
@ -196,22 +195,15 @@ class BBuf extends RcSupport<Buf, BBuf> implements Buf {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BBuf transferOwnership(Thread recipient, Drop<BBuf> drop) {
|
protected Owned<BBuf> prepareSend() {
|
||||||
BBuf copy = new BBuf(segment.withOwnerThread(recipient), drop);
|
|
||||||
copy.read = read;
|
|
||||||
copy.write = write;
|
|
||||||
return copy;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected BBuf prepareSend() {
|
|
||||||
BBuf outer = this;
|
BBuf outer = this;
|
||||||
MemorySegment transferSegment = segment.withOwnerThread(TRANSFER_OWNER);
|
boolean isConfined = segment.ownerThread() == null;
|
||||||
return new BBuf(transferSegment, NO_DROP) {
|
MemorySegment transferSegment = isConfined? segment : segment.withOwnerThread(null);
|
||||||
|
return new Owned<BBuf>() {
|
||||||
@Override
|
@Override
|
||||||
public BBuf transferOwnership(Thread recipient, Drop<BBuf> drop) {
|
public BBuf transferOwnership(Thread recipient, Drop<BBuf> drop) {
|
||||||
overwriteMemorySegmentOwner(transferSegment, recipient);
|
var newSegment = isConfined? transferSegment.withOwnerThread(recipient) : transferSegment;
|
||||||
BBuf copy = new BBuf(transferSegment, drop);
|
BBuf copy = new BBuf(newSegment, drop);
|
||||||
copy.read = outer.read;
|
copy.read = outer.read;
|
||||||
copy.write = outer.write;
|
copy.write = outer.write;
|
||||||
return copy;
|
return copy;
|
||||||
|
@ -43,8 +43,7 @@ class NativeMemoryCleanerDrop implements Drop<BBuf> {
|
|||||||
var segment = buf.segment;
|
var segment = buf.segment;
|
||||||
cleanable = CLEANER.register(this, () -> {
|
cleanable = CLEANER.register(this, () -> {
|
||||||
if (segment.isAlive()) {
|
if (segment.isAlive()) {
|
||||||
// Clear owner so we can close from cleaner thread.
|
// TODO return segment to pool, or call out to external drop, instead of closing it directly.
|
||||||
overwriteMemorySegmentOwner(segment, null);
|
|
||||||
segment.close();
|
segment.close();
|
||||||
MEM_USAGE_NATIVE.add(-segment.byteSize());
|
MEM_USAGE_NATIVE.add(-segment.byteSize());
|
||||||
}
|
}
|
||||||
|
@ -15,8 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.buffer.b2;
|
package io.netty.buffer.b2;
|
||||||
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An Rc is a reference counted, thread-confined, resource of sorts. Because these resources are thread-confined, the
|
* An Rc is a reference counted, thread-confined, resource of sorts. Because these resources are thread-confined, the
|
||||||
* reference counting is NOT atomic. An Rc can only be accessed by one thread at a time - the owner thread that the
|
* reference counting is NOT atomic. An Rc can only be accessed by one thread at a time - the owner thread that the
|
||||||
@ -48,16 +46,6 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable {
|
|||||||
@Override
|
@Override
|
||||||
void close();
|
void close();
|
||||||
|
|
||||||
/**
|
|
||||||
* Send this Rc instance ot another Thread, transferring the ownsership fo the recipient, using a rendesvouz
|
|
||||||
* protocol. This method can be used when the sender wishes to block until the transfer completes. This requires
|
|
||||||
* that both threads be alive an running for the transfer to complete.
|
|
||||||
*
|
|
||||||
* @param consumer The consumer encodes the mechanism by which the recipient recieves the Rc instance.
|
|
||||||
* @throws InterruptedException If this thread was interrupted
|
|
||||||
*/
|
|
||||||
void sendTo(Consumer<Send<I>> consumer) throws InterruptedException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send this Rc instance to another Thread, transferring the ownership to the recipient. This method can be used
|
* Send this Rc instance to another Thread, transferring the ownership to the recipient. This method can be used
|
||||||
* when the receiving thread is not known up front.
|
* when the receiving thread is not known up front.
|
||||||
|
@ -15,9 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.buffer.b2;
|
package io.netty.buffer.b2;
|
||||||
|
|
||||||
import java.util.function.Consumer;
|
public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> implements Rc<I> {
|
||||||
|
|
||||||
public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> implements Rc<I>, Owned<T> {
|
|
||||||
private int acquires; // Closed if negative.
|
private int acquires; // Closed if negative.
|
||||||
private final Drop<T> drop;
|
private final Drop<T> drop;
|
||||||
|
|
||||||
@ -33,7 +31,7 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
|
|||||||
* @return This Rc instance.
|
* @return This Rc instance.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public I acquire() {
|
public final I acquire() {
|
||||||
if (acquires < 0) {
|
if (acquires < 0) {
|
||||||
throw new IllegalStateException("Resource is closed.");
|
throw new IllegalStateException("Resource is closed.");
|
||||||
}
|
}
|
||||||
@ -49,7 +47,7 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
|
|||||||
* @throws IllegalStateException If this Rc has already been closed.
|
* @throws IllegalStateException If this Rc has already been closed.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public final void close() {
|
||||||
if (acquires == -1) {
|
if (acquires == -1) {
|
||||||
throw new IllegalStateException("Double-free: Already closed and dropped.");
|
throw new IllegalStateException("Double-free: Already closed and dropped.");
|
||||||
}
|
}
|
||||||
@ -59,22 +57,6 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
|
|||||||
acquires--;
|
acquires--;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Send this Rc instance ot another Thread, transferring the ownsership fo the recipient, using a rendesvouz
|
|
||||||
* protocol. This method can be used when the sender wishes to block until the transfer completes. This requires
|
|
||||||
* that both threads be alive an running for the transfer to complete.
|
|
||||||
*
|
|
||||||
* @param consumer The consumer encodes the mechanism by which the recipient recieves the Rc instance.
|
|
||||||
* @throws InterruptedException If this thread was interrupted
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void sendTo(Consumer<Send<I>> consumer) throws InterruptedException {
|
|
||||||
var send = new RendezvousSend<I, T>(impl(), drop);
|
|
||||||
consumer.accept(send);
|
|
||||||
send.finish();
|
|
||||||
acquires = -2; // close without dropping (also ignore future double-free attempts)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send this Rc instance to another Thread, transferring the ownership to the recipient. This method can be used
|
* Send this Rc instance to another Thread, transferring the ownership to the recipient. This method can be used
|
||||||
* when the receiving thread is not known up front.
|
* when the receiving thread is not known up front.
|
||||||
@ -86,7 +68,7 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
|
|||||||
* currently owning thread.
|
* currently owning thread.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Send<I> send() {
|
public final Send<I> send() {
|
||||||
acquires = -2; // close without dropping (also ignore future double-free attempts)
|
acquires = -2; // close without dropping (also ignore future double-free attempts)
|
||||||
return new TransferSend<I, T>(prepareSend(), drop);
|
return new TransferSend<I, T>(prepareSend(), drop);
|
||||||
}
|
}
|
||||||
@ -94,14 +76,12 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
|
|||||||
/**
|
/**
|
||||||
* Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread.
|
* Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread.
|
||||||
* This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning
|
* This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning
|
||||||
* thread. In this state, the Rc instance should only allow a call to {@link #transferOwnership(Thread, Drop)} in
|
* thread. In this state, the Rc instance should only allow a call to {@link Owned#transferOwnership(Thread, Drop)} in
|
||||||
* the recipient thread.
|
* the recipient thread.
|
||||||
*
|
*
|
||||||
* @return This Rc instance in a deactivated state.
|
* @return This Rc instance in a deactivated state.
|
||||||
*/
|
*/
|
||||||
protected T prepareSend() {
|
protected abstract Owned<T> prepareSend();
|
||||||
return impl();
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private I self() {
|
private I self() {
|
||||||
|
@ -1,67 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2020 The Netty Project
|
|
||||||
*
|
|
||||||
* The Netty Project licenses this file to you under the Apache License,
|
|
||||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at:
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package io.netty.buffer.b2;
|
|
||||||
|
|
||||||
import java.lang.invoke.VarHandle;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
|
|
||||||
import static io.netty.buffer.b2.Statics.*;
|
|
||||||
import static java.lang.invoke.MethodHandles.*;
|
|
||||||
|
|
||||||
class RendezvousSend<I extends Rc<I>, T extends Rc<I> & Owned<T>> implements Send<I> {
|
|
||||||
private static final VarHandle RECEIVED = findVarHandle(lookup(), RendezvousSend.class, "received", boolean.class);
|
|
||||||
private final CountDownLatch recipientLatch;
|
|
||||||
private final CountDownLatch sentLatch;
|
|
||||||
private final Drop<T> drop;
|
|
||||||
private final T outgoing;
|
|
||||||
@SuppressWarnings("unused")
|
|
||||||
private volatile boolean received; // Accessed via VarHandle
|
|
||||||
private volatile Thread recipient;
|
|
||||||
private volatile I incoming;
|
|
||||||
|
|
||||||
RendezvousSend(T outgoing, Drop<T> drop) {
|
|
||||||
this.outgoing = outgoing;
|
|
||||||
this.drop = drop;
|
|
||||||
recipientLatch = new CountDownLatch(1);
|
|
||||||
sentLatch = new CountDownLatch(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public I receive() {
|
|
||||||
if (!RECEIVED.compareAndSet(this, false, true)) {
|
|
||||||
throw new IllegalStateException("This object has already been received.");
|
|
||||||
}
|
|
||||||
recipient = Thread.currentThread();
|
|
||||||
recipientLatch.countDown();
|
|
||||||
try {
|
|
||||||
sentLatch.await();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
return incoming;
|
|
||||||
}
|
|
||||||
|
|
||||||
void finish() throws InterruptedException {
|
|
||||||
if (incoming != null) {
|
|
||||||
throw new IllegalStateException("Already sent.");
|
|
||||||
}
|
|
||||||
recipientLatch.await();
|
|
||||||
var transferred = outgoing.transferOwnership(recipient, drop);
|
|
||||||
incoming = (I) transferred;
|
|
||||||
drop.accept(transferred);
|
|
||||||
sentLatch.countDown();
|
|
||||||
}
|
|
||||||
}
|
|
@ -15,19 +15,11 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.buffer.b2;
|
package io.netty.buffer.b2;
|
||||||
|
|
||||||
import io.netty.util.internal.PlatformDependent;
|
|
||||||
import jdk.incubator.foreign.MemorySegment;
|
|
||||||
|
|
||||||
import java.lang.invoke.MethodHandles.Lookup;
|
import java.lang.invoke.MethodHandles.Lookup;
|
||||||
import java.lang.invoke.VarHandle;
|
import java.lang.invoke.VarHandle;
|
||||||
import java.lang.reflect.Field;
|
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
|
|
||||||
interface Statics {
|
interface Statics {
|
||||||
@SuppressWarnings("InstantiatingAThreadWithDefaultRunMethod")
|
|
||||||
Thread TRANSFER_OWNER = new Thread("ByteBuf Transfer Owner");
|
|
||||||
long SCOPE = fieldOffset("jdk.internal.foreign.AbstractMemorySegmentImpl", "scope");
|
|
||||||
long OWNER = fieldOffset("jdk.internal.foreign.MemoryScope", "owner");
|
|
||||||
LongAdder MEM_USAGE_NATIVE = new LongAdder();
|
LongAdder MEM_USAGE_NATIVE = new LongAdder();
|
||||||
|
|
||||||
static VarHandle findVarHandle(Lookup lookup, Class<?> recv, String name, Class<?> type) {
|
static VarHandle findVarHandle(Lookup lookup, Class<?> recv, String name, Class<?> type) {
|
||||||
@ -37,20 +29,4 @@ interface Statics {
|
|||||||
throw new ExceptionInInitializerError(e);
|
throw new ExceptionInInitializerError(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static long fieldOffset(String className, String fieldName) {
|
|
||||||
try {
|
|
||||||
Class<?> cls = Class.forName(className);
|
|
||||||
Field field = cls.getDeclaredField(fieldName);
|
|
||||||
return PlatformDependent.objectFieldOffset(field);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new ExceptionInInitializerError(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void overwriteMemorySegmentOwner(MemorySegment segment, Thread newOwner) {
|
|
||||||
Object scope = PlatformDependent.getObject(segment, SCOPE);
|
|
||||||
PlatformDependent.putObject(scope, OWNER, newOwner);
|
|
||||||
VarHandle.fullFence(); // Attempt to force visibility of overwritten final fields.
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -20,18 +20,19 @@ import java.lang.invoke.VarHandle;
|
|||||||
import static io.netty.buffer.b2.Statics.*;
|
import static io.netty.buffer.b2.Statics.*;
|
||||||
import static java.lang.invoke.MethodHandles.*;
|
import static java.lang.invoke.MethodHandles.*;
|
||||||
|
|
||||||
class TransferSend<I extends Rc<I>, T extends Rc<I> & Owned<T>> implements Send<I> {
|
class TransferSend<I extends Rc<I>, T extends Rc<I>> implements Send<I> {
|
||||||
private static final VarHandle RECEIVED = findVarHandle(lookup(), TransferSend.class, "received", boolean.class);
|
private static final VarHandle RECEIVED = findVarHandle(lookup(), TransferSend.class, "received", boolean.class);
|
||||||
private final T outgoing;
|
private final Owned<T> outgoing;
|
||||||
private final Drop<T> drop;
|
private final Drop<T> drop;
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
private volatile boolean received; // Accessed via VarHandle
|
private volatile boolean received; // Accessed via VarHandle
|
||||||
|
|
||||||
TransferSend(T outgoing, Drop<T> drop) {
|
TransferSend(Owned<T> outgoing, Drop<T> drop) {
|
||||||
this.outgoing = outgoing;
|
this.outgoing = outgoing;
|
||||||
this.drop = drop;
|
this.drop = drop;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public I receive() {
|
public I receive() {
|
||||||
if (!RECEIVED.compareAndSet(this, false, true)) {
|
if (!RECEIVED.compareAndSet(this, false, true)) {
|
||||||
|
@ -18,7 +18,6 @@ package io.netty.buffer.b2;
|
|||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AssumptionViolatedException;
|
import org.junit.AssumptionViolatedException;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
@ -89,50 +88,6 @@ public abstract class BBufTest {
|
|||||||
assertArrayEquals(new byte[] {1, 2, 3, 4, 5, 6, 7, 8}, buf.copy());
|
assertArrayEquals(new byte[] {1, 2, 3, 4, 5, 6, 7, 8}, buf.copy());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void allocateAndRendesvousWithThread() throws Exception {
|
|
||||||
ArrayBlockingQueue<Send<Buf>> queue = new ArrayBlockingQueue<>(10);
|
|
||||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
|
||||||
Future<Byte> future = executor.submit(() -> {
|
|
||||||
try (Buf byteBuf = queue.take().receive()) {
|
|
||||||
return byteBuf.readByte();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
executor.shutdown();
|
|
||||||
|
|
||||||
try (Buf buf = allocator.allocate(8)) {
|
|
||||||
buf.writeByte((byte) 42);
|
|
||||||
buf.sendTo(queue::offer);
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals((byte) 42, future.get().byteValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void allocateAndRendesvousWithThreadViaSyncQueue() throws Exception {
|
|
||||||
SynchronousQueue<Send<Buf>> queue = new SynchronousQueue<>();
|
|
||||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
|
||||||
Future<Byte> future = executor.submit(() -> {
|
|
||||||
try (Buf byteBuf = queue.take().receive()) {
|
|
||||||
return byteBuf.readByte();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
executor.shutdown();
|
|
||||||
|
|
||||||
try (Buf buf = allocator.allocate(8)) {
|
|
||||||
buf.writeByte((byte) 42);
|
|
||||||
buf.sendTo(e -> {
|
|
||||||
try {
|
|
||||||
queue.put(e);
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
throw new RuntimeException(ie);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEquals((byte) 42, future.get().byteValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void allocateAndSendToThread() throws Exception {
|
public void allocateAndSendToThread() throws Exception {
|
||||||
ArrayBlockingQueue<Send<Buf>> queue = new ArrayBlockingQueue<>(10);
|
ArrayBlockingQueue<Send<Buf>> queue = new ArrayBlockingQueue<>(10);
|
||||||
@ -198,7 +153,6 @@ public abstract class BBufTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore
|
|
||||||
@Test
|
@Test
|
||||||
public void mustAllowAllocatingMaxArraySizedBuffer() {
|
public void mustAllowAllocatingMaxArraySizedBuffer() {
|
||||||
try {
|
try {
|
||||||
|
@ -15,7 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.buffer.b2;
|
package io.netty.buffer.b2;
|
||||||
|
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.*;
|
import static org.hamcrest.Matchers.*;
|
||||||
@ -27,7 +26,6 @@ public class PooledDirectBBufWithCleanerTest extends DirectBBufTest {
|
|||||||
return Allocator.pooledDirectWithCleaner();
|
return Allocator.pooledDirectWithCleaner();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore
|
|
||||||
@Test
|
@Test
|
||||||
public void bufferMustBeClosedByCleaner() throws InterruptedException {
|
public void bufferMustBeClosedByCleaner() throws InterruptedException {
|
||||||
var allocator = createAllocator();
|
var allocator = createAllocator();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user