Overall cleanup
- ChannelOutboundBuffer.Entry.buffers -> bufs for consistency - Make Native.IOV_MAX final because it's a constant - Naming changes - FlushedMessageProcessor -> MessageProcessor just in case we can reuse it for unflushed messages in the future - Add ChannelOutboundBuffer.Entry.recycle() that does not return the next entry, and use it wherever possible - Javadoc clean-up
This commit is contained in:
parent
e282e504f1
commit
5e5d1a58fd
@ -17,6 +17,7 @@ package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
|
||||
import io.netty.util.concurrent.FastThreadLocal;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
@ -24,7 +25,7 @@ import io.netty.util.internal.PlatformDependent;
|
||||
* Represent an array of struct array and so can be passed directly over via JNI without the need to do any more
|
||||
* array copies.
|
||||
*
|
||||
* The buffers are written out directly into direct memory to match the struct iov. See also <code>man writev</code>.
|
||||
* The buffers are written out directly into direct memory to match the struct iov. See also {@code man writev}.
|
||||
*
|
||||
* <pre>
|
||||
* struct iovec {
|
||||
@ -34,19 +35,24 @@ import io.netty.util.internal.PlatformDependent;
|
||||
* </pre>
|
||||
*
|
||||
* See also
|
||||
* <a href="http://rkennke.wordpress.com/2007/07/30/efficient-jni-programming-iv-wrapping-native-data-objects/">
|
||||
* Efficient JNI programming IV: Wrapping native data objects</a>.
|
||||
* <a href="http://rkennke.wordpress.com/2007/07/30/efficient-jni-programming-iv-wrapping-native-data-objects/"
|
||||
* >Efficient JNI programming IV: Wrapping native data objects</a>.
|
||||
*/
|
||||
final class IovArray implements ChannelOutboundBuffer.FlushedMessageProcessor {
|
||||
// Maximal number of struct iov entries that can be passed to writev(...)
|
||||
private static final int IOV_MAX = Native.IOV_MAX;
|
||||
// The size of an address which should be 8 for 64 bits and 4 for 32 bits.
|
||||
final class IovArray implements MessageProcessor {
|
||||
|
||||
/** The size of an address which should be 8 for 64 bits and 4 for 32 bits. */
|
||||
private static final int ADDRESS_SIZE = PlatformDependent.addressSize();
|
||||
// The size of an struct iov entry in bytes. This is calculated as we have 2 entries each of the size of the
|
||||
// address.
|
||||
|
||||
/**
|
||||
* The size of an {@code iovec} struct in bytes. This is calculated as we have 2 entries each of the size of the
|
||||
* address.
|
||||
*/
|
||||
private static final int IOV_SIZE = 2 * ADDRESS_SIZE;
|
||||
// The needed memory to hold up to IOV_MAX iov entries.
|
||||
private static final int CAPACITY = IOV_MAX * IOV_SIZE;
|
||||
|
||||
/** The needed memory to hold up to {@link Native#IOV_MAX} iov entries, where {@link Native#IOV_MAX} signified
|
||||
* the maximum number of {@code iovec} structs that can be passed to {@code writev(...)}.
|
||||
*/
|
||||
private static final int CAPACITY = Native.IOV_MAX * IOV_SIZE;
|
||||
|
||||
private static final FastThreadLocal<IovArray> ARRAY = new FastThreadLocal<IovArray>() {
|
||||
@Override
|
||||
@ -74,7 +80,7 @@ final class IovArray implements ChannelOutboundBuffer.FlushedMessageProcessor {
|
||||
* {@code false} otherwise.
|
||||
*/
|
||||
private boolean add(ByteBuf buf) {
|
||||
if (count == IOV_MAX) {
|
||||
if (count == Native.IOV_MAX) {
|
||||
// No more room!
|
||||
return false;
|
||||
}
|
||||
@ -149,7 +155,7 @@ final class IovArray implements ChannelOutboundBuffer.FlushedMessageProcessor {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(Object msg) throws Exception {
|
||||
public boolean processMessage(Object msg) throws Exception {
|
||||
return msg instanceof ByteBuf && add((ByteBuf) msg);
|
||||
}
|
||||
|
||||
|
@ -51,7 +51,7 @@ final class Native {
|
||||
public static final int EPOLLOUT = 0x02;
|
||||
public static final int EPOLLACCEPT = 0x04;
|
||||
public static final int EPOLLRDHUP = 0x08;
|
||||
public static int IOV_MAX = iovMax();
|
||||
public static final int IOV_MAX = iovMax();
|
||||
|
||||
public static native int eventFd();
|
||||
public static native void eventFdWrite(int fd, long value);
|
||||
|
@ -294,7 +294,7 @@ public final class ChannelOutboundBuffer {
|
||||
}
|
||||
|
||||
// recycle the entry
|
||||
e.recycleAndGetNext();
|
||||
e.recycle();
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -320,7 +320,7 @@ public final class ChannelOutboundBuffer {
|
||||
}
|
||||
|
||||
// recycle the entry
|
||||
e.recycleAndGetNext();
|
||||
e.recycle();
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -384,11 +384,11 @@ public final class ChannelOutboundBuffer {
|
||||
}
|
||||
nioBuffers[nioBufferCount ++] = nioBuf;
|
||||
} else {
|
||||
ByteBuffer[] nioBufs = entry.buffers;
|
||||
ByteBuffer[] nioBufs = entry.bufs;
|
||||
if (nioBufs == null) {
|
||||
// cached ByteBuffers as they may be expensive to create in terms
|
||||
// of Object allocation
|
||||
entry.buffers = nioBufs = buf.nioBuffers();
|
||||
entry.bufs = nioBufs = buf.nioBuffers();
|
||||
}
|
||||
nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
|
||||
}
|
||||
@ -565,18 +565,18 @@ public final class ChannelOutboundBuffer {
|
||||
}
|
||||
|
||||
/**
|
||||
* Call {@link FlushedMessageProcessor#process(Object)} foreach flushed message
|
||||
* in this {@link ChannelOutboundBuffer} until {@link FlushedMessageProcessor#process(Object)}
|
||||
* returns {@code false} or ther are no more flushed messages to process.
|
||||
* Call {@link MessageProcessor#processMessage(Object)} for each flushed message
|
||||
* in this {@link ChannelOutboundBuffer} until {@link MessageProcessor#processMessage(Object)}
|
||||
* returns {@code false} or there are no more flushed messages to process.
|
||||
*/
|
||||
public void forEachFlushedMessage(FlushedMessageProcessor processor) throws Exception {
|
||||
public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
|
||||
if (processor == null) {
|
||||
throw new NullPointerException("processor");
|
||||
}
|
||||
Entry entry = flushedEntry;
|
||||
while (entry != null) {
|
||||
if (!entry.cancelled) {
|
||||
if (!processor.process(entry.msg)) {
|
||||
if (!processor.processMessage(entry.msg)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -584,12 +584,12 @@ public final class ChannelOutboundBuffer {
|
||||
}
|
||||
}
|
||||
|
||||
public interface FlushedMessageProcessor {
|
||||
public interface MessageProcessor {
|
||||
/**
|
||||
* Will be called for each flushed message until it either there are no more flushed messages or this
|
||||
* method returns {@code false}.
|
||||
*/
|
||||
boolean process(Object msg) throws Exception;
|
||||
boolean processMessage(Object msg) throws Exception;
|
||||
}
|
||||
|
||||
static final class Entry {
|
||||
@ -603,7 +603,7 @@ public final class ChannelOutboundBuffer {
|
||||
private final Handle handle;
|
||||
Entry next;
|
||||
Object msg;
|
||||
ByteBuffer[] buffers;
|
||||
ByteBuffer[] bufs;
|
||||
ByteBuffer buf;
|
||||
ChannelPromise promise;
|
||||
long progress;
|
||||
@ -637,17 +637,16 @@ public final class ChannelOutboundBuffer {
|
||||
pendingSize = 0;
|
||||
total = 0;
|
||||
progress = 0;
|
||||
buffers = null;
|
||||
bufs = null;
|
||||
buf = null;
|
||||
return pSize;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
Entry recycleAndGetNext() {
|
||||
Entry e = next;
|
||||
void recycle() {
|
||||
next = null;
|
||||
buffers = null;
|
||||
bufs = null;
|
||||
buf = null;
|
||||
msg = null;
|
||||
promise = null;
|
||||
@ -657,7 +656,12 @@ public final class ChannelOutboundBuffer {
|
||||
count = -1;
|
||||
cancelled = false;
|
||||
RECYCLER.recycle(this, handle);
|
||||
return e;
|
||||
}
|
||||
|
||||
Entry recycleAndGetNext() {
|
||||
Entry next = this.next;
|
||||
recycle();
|
||||
return next;
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user