Optimize native transport for gathering writes
Motivation: While benchmarking the native transport with gathering writes I noticed that it is quite slow. This is due the fact that we need to do a lot of array copies to get the buffers into the iov array. Modification: Introduce a new class calles IovArray which allows to fill buffers directly in a iov array that can be passed over to JNI without any array copies. This gives a nice optimization in terms of speed when doing gathering writes. Result: Big performance improvement when doing gathering writes. See the included benchmark... Before: [nmaurer@xxx]~% wrk/wrk -H 'Host: localhost' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' -H 'Connection: keep-alive' -d 120 -c 256 -t 16 --pipeline 256 http://xxx:8080/plaintext Running 2m test @ http://xxx:8080/plaintext 16 threads and 256 connections Thread Stats Avg Stdev Max +/- Stdev Latency 23.44ms 16.37ms 259.57ms 91.77% Req/Sec 181.99k 31.69k 304.60k 78.12% 346544071 requests in 2.00m, 46.48GB read Requests/sec: 2887885.09 Transfer/sec: 396.59MB With this change: [nmaurer@xxx]~% wrk/wrk -H 'Host: localhost' -H 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' -H 'Connection: keep-alive' -d 120 -c 256 -t 16 --pipeline 256 http://xxx:8080/plaintext Running 2m test @ http://xxx:8080/plaintext 16 threads and 256 connections Thread Stats Avg Stdev Max +/- Stdev Latency 21.93ms 16.33ms 305.73ms 92.34% Req/Sec 194.56k 33.75k 309.33k 77.04% 369617503 requests in 2.00m, 49.57GB read Requests/sec: 3080169.65 Transfer/sec: 423.00MB
This commit is contained in:
parent
5b2bdd844d
commit
88bd6e7a93
@ -80,6 +80,8 @@ public final class PlatformDependent {
|
||||
|
||||
private static final int BIT_MODE = bitMode0();
|
||||
|
||||
private static final int ADDRESS_SIZE = addressSize0();
|
||||
|
||||
static {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("-Dio.netty.noPreferDirect: {}", !DIRECT_BUFFER_PREFERRED);
|
||||
@ -173,6 +175,22 @@ public final class PlatformDependent {
|
||||
return BIT_MODE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the address size of the OS.
|
||||
* 4 (for 32 bits systems ) and 8 (for 64 bits systems).
|
||||
*/
|
||||
public static int addressSize() {
|
||||
return ADDRESS_SIZE;
|
||||
}
|
||||
|
||||
public static long allocateMemory(long size) {
|
||||
return PlatformDependent0.allocateMemory(size);
|
||||
}
|
||||
|
||||
public static void freeMemory(long address) {
|
||||
PlatformDependent0.freeMemory(address);
|
||||
}
|
||||
|
||||
/**
|
||||
* Raises an exception bypassing compiler checks for checked exceptions.
|
||||
*/
|
||||
@ -815,6 +833,13 @@ public final class PlatformDependent {
|
||||
}
|
||||
}
|
||||
|
||||
private static int addressSize0() {
|
||||
if (!hasUnsafe()) {
|
||||
return -1;
|
||||
}
|
||||
return PlatformDependent0.addressSize();
|
||||
}
|
||||
|
||||
private PlatformDependent() {
|
||||
// only static method supported
|
||||
}
|
||||
|
@ -365,6 +365,18 @@ final class PlatformDependent0 {
|
||||
}
|
||||
}
|
||||
|
||||
static int addressSize() {
|
||||
return UNSAFE.addressSize();
|
||||
}
|
||||
|
||||
static long allocateMemory(long size) {
|
||||
return UNSAFE.allocateMemory(size);
|
||||
}
|
||||
|
||||
static void freeMemory(long address) {
|
||||
UNSAFE.freeMemory(address);
|
||||
}
|
||||
|
||||
private PlatformDependent0() {
|
||||
}
|
||||
|
||||
|
@ -44,10 +44,7 @@ jfieldID limitFieldId = NULL;
|
||||
jfieldID fileChannelFieldId = NULL;
|
||||
jfieldID transferedFieldId = NULL;
|
||||
jfieldID fdFieldId = NULL;
|
||||
jfieldID fileDescriptorFieldId = NULL;
|
||||
jfieldID readerIndexFieldId = NULL;
|
||||
jfieldID writerIndexFieldId = NULL;
|
||||
jfieldID memoryAddressFieldId = NULL;
|
||||
jfieldID fileDescriptorFieldId = NULL;;
|
||||
jmethodID inetSocketAddrMethodId = NULL;
|
||||
jmethodID datagramSocketAddrMethodId = NULL;
|
||||
jclass runtimeExceptionClass = NULL;
|
||||
@ -393,27 +390,6 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
|
||||
throwRuntimeException(env, "Unable to obtain constructor of DatagramSocketAddress");
|
||||
return JNI_ERR;
|
||||
}
|
||||
|
||||
jclass addressEntryClass = (*env)->FindClass(env, "io/netty/channel/epoll/EpollChannelOutboundBuffer$AddressEntry");
|
||||
if (addressEntryClass == NULL) {
|
||||
// pending exception...
|
||||
return JNI_ERR;
|
||||
}
|
||||
readerIndexFieldId = (*env)->GetFieldID(env, addressEntryClass, "readerIndex", "I");
|
||||
if (readerIndexFieldId == NULL) {
|
||||
// pending exception...
|
||||
return JNI_ERR;
|
||||
}
|
||||
writerIndexFieldId = (*env)->GetFieldID(env, addressEntryClass, "writerIndex", "I");
|
||||
if (writerIndexFieldId == NULL) {
|
||||
// pending exception...
|
||||
return JNI_ERR;
|
||||
}
|
||||
memoryAddressFieldId = (*env)->GetFieldID(env, addressEntryClass, "memoryAddress", "J");
|
||||
if (memoryAddressFieldId == NULL) {
|
||||
// pending exception...
|
||||
return JNI_ERR;
|
||||
}
|
||||
return JNI_VERSION_1_6;
|
||||
}
|
||||
}
|
||||
@ -691,7 +667,7 @@ JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_recvFromAddress(JNI
|
||||
return recvFrom0(env, fd, (void*) address, pos, limit);
|
||||
}
|
||||
|
||||
jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec iov[], jint length) {
|
||||
jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec * iov, jint length) {
|
||||
ssize_t res;
|
||||
int err;
|
||||
do {
|
||||
@ -755,28 +731,8 @@ JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env,
|
||||
return writev0(env, clazz, fd, iov, length);
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length) {
|
||||
struct iovec iov[length];
|
||||
int iovidx = 0;
|
||||
int i;
|
||||
int num = offset + length;
|
||||
for (i = offset; i < num; i++) {
|
||||
jobject addressEntry = (*env)->GetObjectArrayElement(env, addresses, i);
|
||||
jint readerIndex = (*env)->GetIntField(env, addressEntry, readerIndexFieldId);
|
||||
jint writerIndex = (*env)->GetIntField(env, addressEntry, writerIndexFieldId);
|
||||
void* memoryAddress = (void*) (*env)->GetLongField(env, addressEntry, memoryAddressFieldId);
|
||||
|
||||
iov[iovidx].iov_base = memoryAddress + readerIndex;
|
||||
iov[iovidx].iov_len = (size_t) (writerIndex - readerIndex);
|
||||
iovidx++;
|
||||
|
||||
// Explicit delete local reference as otherwise the local references will only be released once the native method returns.
|
||||
// Also there may be a lot of these and JNI specification only specify that 16 must be able to be created.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2623
|
||||
(*env)->DeleteLocalRef(env, addressEntry);
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length) {
|
||||
struct iovec * iov = (struct iovec *) memoryAddress;
|
||||
return writev0(env, clazz, fd, iov, length);
|
||||
}
|
||||
|
||||
|
@ -44,7 +44,7 @@ void Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz,
|
||||
jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
|
||||
jint Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
|
||||
jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length);
|
||||
jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length);
|
||||
jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint length);
|
||||
jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
|
||||
jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
|
||||
|
||||
|
@ -20,17 +20,13 @@ import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.util.Recycler;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* Special {@link ChannelOutboundBuffer} implementation which allows to obtain an array of {@link AddressEntry}
|
||||
* Special {@link ChannelOutboundBuffer} implementation which allows to obtain a {@link IovArray}
|
||||
* and so doing gathering writes without the need to create a {@link ByteBuffer} internally. This reduce
|
||||
* GC pressure a lot.
|
||||
*/
|
||||
final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer {
|
||||
private AddressEntry[] addresses;
|
||||
private int addressCount;
|
||||
private long addressSize;
|
||||
private static final Recycler<EpollChannelOutboundBuffer> RECYCLER = new Recycler<EpollChannelOutboundBuffer>() {
|
||||
@Override
|
||||
protected EpollChannelOutboundBuffer newObject(Handle<EpollChannelOutboundBuffer> handle) {
|
||||
@ -49,7 +45,6 @@ final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer {
|
||||
|
||||
private EpollChannelOutboundBuffer(Recycler.Handle<? extends ChannelOutboundBuffer> handle) {
|
||||
super(handle);
|
||||
addresses = new AddressEntry[INITIAL_CAPACITY];
|
||||
}
|
||||
|
||||
/**
|
||||
@ -68,25 +63,19 @@ final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer {
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an array of {@link AddressEntry}'s if the currently pending messages are made of {@link ByteBuf} only.
|
||||
* {@code null} is returned otherwise. If this method returns a non-null array, {@link #addressCount()} and
|
||||
* {@link #addressSize()} ()} will return the number of {@link AddressEntry}'s in the returned array and the total
|
||||
* number of readable bytes of the NIO buffers respectively.
|
||||
* Returns a {@link IovArray} if the currently pending messages.
|
||||
* <p>
|
||||
* Note that the returned array is reused and thus should not escape
|
||||
* Note that the returned {@link IovArray} is reused and thus should not escape
|
||||
* {@link io.netty.channel.AbstractChannel#doWrite(ChannelOutboundBuffer)}.
|
||||
* Refer to {@link EpollSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
|
||||
* </p>
|
||||
*/
|
||||
AddressEntry[] memoryAddresses() {
|
||||
long addressSize = 0;
|
||||
int addressCount = 0;
|
||||
IovArray iovArray() {
|
||||
IovArray array = IovArray.get();
|
||||
final Entry[] buffer = entries();
|
||||
final int mask = buffer.length - 1;
|
||||
AddressEntry[] addresses = this.addresses;
|
||||
Object m;
|
||||
final int mask = entryMask();
|
||||
int unflushed = unflushed();
|
||||
int flushed = flushed();
|
||||
Object m;
|
||||
|
||||
while (flushed != unflushed && (m = buffer[flushed].msg()) != null) {
|
||||
if (!(m instanceof ByteBuf)) {
|
||||
// Just break out of the loop as we can still use gathering writes for the buffers that we
|
||||
@ -94,108 +83,20 @@ final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer {
|
||||
break;
|
||||
}
|
||||
|
||||
AddressEntry entry = (AddressEntry) buffer[flushed];
|
||||
Entry entry = buffer[flushed];
|
||||
|
||||
// Check if the entry was cancelled. if so we just skip it.
|
||||
if (!entry.isCancelled()) {
|
||||
ByteBuf buf = (ByteBuf) m;
|
||||
final int readerIndex = buf.readerIndex();
|
||||
final int readableBytes = buf.writerIndex() - readerIndex;
|
||||
|
||||
if (readableBytes > 0) {
|
||||
addressSize += readableBytes;
|
||||
// See if there is enough space to at least store one more entry.
|
||||
int neededSpace = addressCount + 1;
|
||||
if (neededSpace > addresses.length) {
|
||||
this.addresses = addresses =
|
||||
expandAddressesArray(addresses, neededSpace, addressCount);
|
||||
}
|
||||
entry.memoryAddress = buf.memoryAddress();
|
||||
entry.readerIndex = buf.readerIndex();
|
||||
entry.writerIndex = buf.writerIndex();
|
||||
|
||||
addresses[addressCount ++] = entry;
|
||||
if (!array.add(buf)) {
|
||||
// Can not hold more data so break here.
|
||||
// We will handle this on the next write loop.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
flushed = flushed + 1 & mask;
|
||||
}
|
||||
this.addressCount = addressCount;
|
||||
this.addressSize = addressSize;
|
||||
|
||||
return addresses;
|
||||
}
|
||||
|
||||
private static AddressEntry[] expandAddressesArray(AddressEntry[] array, int neededSpace, int size) {
|
||||
int newCapacity = array.length;
|
||||
do {
|
||||
// double capacity until it is big enough
|
||||
// See https://github.com/netty/netty/issues/1890
|
||||
newCapacity <<= 1;
|
||||
|
||||
if (newCapacity < 0) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
} while (neededSpace > newCapacity);
|
||||
|
||||
AddressEntry[] newArray = new AddressEntry[newCapacity];
|
||||
System.arraycopy(array, 0, newArray, 0, size);
|
||||
|
||||
return newArray;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of {@link AddressEntry}'s which can be written.
|
||||
*/
|
||||
int addressCount() {
|
||||
return addressCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of bytes that can be written via gathering writes.
|
||||
*/
|
||||
long addressSize() {
|
||||
return addressSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recycle() {
|
||||
if (addresses.length > INITIAL_CAPACITY) {
|
||||
addresses = new AddressEntry[INITIAL_CAPACITY];
|
||||
} else {
|
||||
// null out the nio buffers array so the can be GC'ed
|
||||
// https://github.com/netty/netty/issues/1763
|
||||
Arrays.fill(addresses, null);
|
||||
}
|
||||
super.recycle();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AddressEntry newEntry() {
|
||||
return new AddressEntry();
|
||||
}
|
||||
|
||||
static final class AddressEntry extends Entry {
|
||||
// These fields will be accessed via JNI directly so be carefully when touch them!
|
||||
long memoryAddress;
|
||||
int readerIndex;
|
||||
int writerIndex;
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
memoryAddress = -1;
|
||||
readerIndex = 0;
|
||||
writerIndex = 0;
|
||||
super.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int cancel() {
|
||||
memoryAddress = -1;
|
||||
readerIndex = 0;
|
||||
writerIndex = 0;
|
||||
return super.cancel();
|
||||
}
|
||||
return array;
|
||||
}
|
||||
}
|
||||
|
@ -29,7 +29,6 @@ import io.netty.channel.ConnectTimeoutException;
|
||||
import io.netty.channel.DefaultFileRegion;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.epoll.EpollChannelOutboundBuffer.AddressEntry;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
import io.netty.channel.socket.ServerSocketChannel;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
@ -114,7 +113,28 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
}
|
||||
boolean done = false;
|
||||
long writtenBytes = 0;
|
||||
if (buf.nioBufferCount() == 1) {
|
||||
if (buf.hasMemoryAddress()) {
|
||||
long memoryAddress = buf.memoryAddress();
|
||||
int readerIndex = buf.readerIndex();
|
||||
for (;;) {
|
||||
int localFlushedAmount = Native.writeAddress(fd, memoryAddress, readerIndex, readableBytes);
|
||||
if (localFlushedAmount > 0) {
|
||||
writtenBytes += localFlushedAmount;
|
||||
if (writtenBytes == readableBytes) {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
readerIndex += localFlushedAmount;
|
||||
readableBytes -= localFlushedAmount;
|
||||
} else {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setEpollOut();
|
||||
break;
|
||||
}
|
||||
}
|
||||
updateOutboundBuffer(in, writtenBytes, 1, done);
|
||||
return done;
|
||||
} else if (buf.nioBufferCount() == 1) {
|
||||
int readerIndex = buf.readerIndex();
|
||||
ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, buf.readableBytes());
|
||||
for (;;) {
|
||||
@ -143,49 +163,44 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
}
|
||||
|
||||
private boolean writeBytesMultiple(
|
||||
EpollChannelOutboundBuffer in, int msgCount, AddressEntry[] addresses,
|
||||
int addressCnt, long expectedWrittenBytes) throws IOException {
|
||||
EpollChannelOutboundBuffer in, IovArray array) throws IOException {
|
||||
boolean done = false;
|
||||
long expectedWrittenBytes = array.size();
|
||||
int cnt = array.count();
|
||||
long writtenBytes = 0;
|
||||
int offset = 0;
|
||||
int end = offset + addressCnt;
|
||||
loop: while (addressCnt > 0) {
|
||||
for (;;) {
|
||||
int cnt = addressCnt > Native.IOV_MAX? Native.IOV_MAX : addressCnt;
|
||||
|
||||
long localWrittenBytes = Native.writevAddresses(fd, addresses, offset, cnt);
|
||||
if (localWrittenBytes == 0) {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setEpollOut();
|
||||
break loop;
|
||||
}
|
||||
expectedWrittenBytes -= localWrittenBytes;
|
||||
writtenBytes += localWrittenBytes;
|
||||
|
||||
if (expectedWrittenBytes == 0) {
|
||||
// Written everything, just break out here (fast-path)
|
||||
done = true;
|
||||
break loop;
|
||||
}
|
||||
|
||||
do {
|
||||
AddressEntry address = addresses[offset];
|
||||
int readerIndex = address.readerIndex;
|
||||
int bytes = address.writerIndex - readerIndex;
|
||||
if (bytes > localWrittenBytes) {
|
||||
address.readerIndex += (int) localWrittenBytes;
|
||||
// incomplete write
|
||||
break;
|
||||
} else {
|
||||
offset++;
|
||||
addressCnt--;
|
||||
localWrittenBytes -= bytes;
|
||||
}
|
||||
} while (offset < end && localWrittenBytes > 0);
|
||||
int end = offset + cnt;
|
||||
int messages = cnt;
|
||||
for (;;) {
|
||||
long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt);
|
||||
if (localWrittenBytes == 0) {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setEpollOut();
|
||||
break;
|
||||
}
|
||||
expectedWrittenBytes -= localWrittenBytes;
|
||||
writtenBytes += localWrittenBytes;
|
||||
|
||||
if (expectedWrittenBytes == 0) {
|
||||
// Written everything, just break out here (fast-path)
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
|
||||
do {
|
||||
long bytes = array.processWritten(offset, localWrittenBytes);
|
||||
if (bytes == -1) {
|
||||
// incomplete write
|
||||
break;
|
||||
} else {
|
||||
offset++;
|
||||
cnt--;
|
||||
localWrittenBytes -= bytes;
|
||||
}
|
||||
} while (offset < end && localWrittenBytes > 0);
|
||||
}
|
||||
|
||||
updateOutboundBuffer(in, writtenBytes, msgCount, done);
|
||||
updateOutboundBuffer(in, writtenBytes, messages, done);
|
||||
return done;
|
||||
}
|
||||
|
||||
@ -315,15 +330,14 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
// Do gathering write if:
|
||||
// * the outbound buffer contains more than one messages and
|
||||
// * they are all buffers rather than a file region.
|
||||
if (msgCount > 1) {
|
||||
if (msgCount >= 1) {
|
||||
if (PlatformDependent.hasUnsafe()) {
|
||||
// this means we can cast to EpollChannelOutboundBuffer and write the AdressEntry directly.
|
||||
// this means we can cast to EpollChannelOutboundBuffer and write the IovArray directly.
|
||||
EpollChannelOutboundBuffer epollIn = (EpollChannelOutboundBuffer) in;
|
||||
// Ensure the pending writes are made of memoryaddresses only.
|
||||
AddressEntry[] addresses = epollIn.memoryAddresses();
|
||||
int addressesCnt = epollIn.addressCount();
|
||||
if (addressesCnt > 1) {
|
||||
if (!writeBytesMultiple(epollIn, msgCount, addresses, addressesCnt, epollIn.addressSize())) {
|
||||
IovArray array = epollIn.iovArray();
|
||||
int cnt = array.count();
|
||||
if (cnt > 1) {
|
||||
if (!writeBytesMultiple(epollIn, array)) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
break;
|
||||
|
@ -0,0 +1,159 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.util.concurrent.FastThreadLocal;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
/**
|
||||
* Represent an array of struct array and so can be passed directly over via JNI without the need to do any more
|
||||
* array copies.
|
||||
*
|
||||
* The buffers are written out directly into direct memory to match the struct iov. See also <code>man writev</code>.
|
||||
*
|
||||
* <pre>
|
||||
* struct iovec {
|
||||
* void *iov_base;
|
||||
* size_t iov_len;
|
||||
* };
|
||||
* </pre>
|
||||
*
|
||||
* See also
|
||||
* <a href="http://rkennke.wordpress.com/2007/07/30/efficient-jni-programming-iv-wrapping-native-data-objects/">
|
||||
* Efficient JNI programming IV: Wrapping native data objects</a>.
|
||||
*/
|
||||
final class IovArray {
|
||||
// Maximal number of struct iov entries that can be passed to writev(...)
|
||||
private static final int IOV_MAX = Native.IOV_MAX;
|
||||
// The size of an address which should be 8 for 64 bits and 4 for 32 bits.
|
||||
private static final int ADDRESS_SIZE = PlatformDependent.addressSize();
|
||||
// The size of an struct iov entry in bytes. This is calculated as we have 2 entries each of the size of the
|
||||
// address.
|
||||
private static final int IOV_SIZE = 2 * ADDRESS_SIZE;
|
||||
// The needed memory to hold up to IOV_MAX iov entries.
|
||||
private static final int CAPACITY = IOV_MAX * IOV_SIZE;
|
||||
|
||||
private static final FastThreadLocal<IovArray> ARRAY = new FastThreadLocal<IovArray>() {
|
||||
@Override
|
||||
protected IovArray initialValue() throws Exception {
|
||||
return new IovArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onRemoval(IovArray value) throws Exception {
|
||||
// free the direct memory now
|
||||
PlatformDependent.freeMemory(value.memoryAddress);
|
||||
}
|
||||
};
|
||||
|
||||
private final long memoryAddress;
|
||||
private int count;
|
||||
private long size;
|
||||
|
||||
private IovArray() {
|
||||
memoryAddress = PlatformDependent.allocateMemory(CAPACITY);
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to add the given {@link ByteBuf}. Returns {@code true} on success,
|
||||
* {@code false} otherwise.
|
||||
*/
|
||||
boolean add(ByteBuf buf) {
|
||||
if (count == IOV_MAX) {
|
||||
// No more room!
|
||||
return false;
|
||||
}
|
||||
int len = buf.readableBytes();
|
||||
long addr = buf.memoryAddress();
|
||||
int offset = buf.readerIndex();
|
||||
|
||||
long baseOffset = memoryAddress(count++);
|
||||
long lengthOffset = baseOffset + ADDRESS_SIZE;
|
||||
if (ADDRESS_SIZE == 8) {
|
||||
// 64bit
|
||||
PlatformDependent.putLong(baseOffset, addr + offset);
|
||||
PlatformDependent.putLong(lengthOffset, len);
|
||||
} else {
|
||||
assert ADDRESS_SIZE == 4;
|
||||
PlatformDependent.putInt(baseOffset, (int) addr + offset);
|
||||
PlatformDependent.putInt(lengthOffset, len);
|
||||
}
|
||||
size += len;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the written iov entries. This will return the length of the iov entry on the given index if it is
|
||||
* smaller then the given {@code written} value. Otherwise it returns {@code -1}.
|
||||
*/
|
||||
long processWritten(int index, long written) {
|
||||
long baseOffset = memoryAddress(index);
|
||||
long lengthOffset = baseOffset + ADDRESS_SIZE;
|
||||
if (ADDRESS_SIZE == 8) {
|
||||
// 64bit
|
||||
long len = PlatformDependent.getLong(lengthOffset);
|
||||
if (len > written) {
|
||||
long offset = PlatformDependent.getLong(baseOffset);
|
||||
PlatformDependent.putLong(baseOffset, offset + written);
|
||||
PlatformDependent.putLong(lengthOffset, len - written);
|
||||
return -1;
|
||||
}
|
||||
return len;
|
||||
} else {
|
||||
assert ADDRESS_SIZE == 4;
|
||||
long len = PlatformDependent.getInt(lengthOffset);
|
||||
if (len > written) {
|
||||
int offset = PlatformDependent.getInt(baseOffset);
|
||||
PlatformDependent.putInt(baseOffset, (int) (offset + written));
|
||||
PlatformDependent.putInt(lengthOffset, (int) (len - written));
|
||||
return -1;
|
||||
}
|
||||
return len;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number if iov entries.
|
||||
*/
|
||||
int count() {
|
||||
return count;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the size in bytes
|
||||
*/
|
||||
long size() {
|
||||
return size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@code memoryAddress} for the given {@code offset}.
|
||||
*/
|
||||
long memoryAddress(int offset) {
|
||||
return memoryAddress + IOV_SIZE * offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link IovArray} which can be filled.
|
||||
*/
|
||||
static IovArray get() {
|
||||
IovArray array = ARRAY.get();
|
||||
array.size = 0;
|
||||
array.count = 0;
|
||||
return array;
|
||||
}
|
||||
}
|
@ -18,7 +18,6 @@ package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.DefaultFileRegion;
|
||||
import io.netty.channel.epoll.EpollChannelOutboundBuffer.AddressEntry;
|
||||
import io.netty.util.internal.NativeLibraryLoader;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.SystemPropertyUtil;
|
||||
@ -70,7 +69,7 @@ final class Native {
|
||||
public static native int writeAddress(int fd, long address, int pos, int limit) throws IOException;
|
||||
|
||||
public static native long writev(int fd, ByteBuffer[] buffers, int offset, int length) throws IOException;
|
||||
public static native long writevAddresses(int fd, AddressEntry[] addresses, int offset, int length)
|
||||
public static native long writevAddresses(int fd, long memoryAddress, int length)
|
||||
throws IOException;
|
||||
|
||||
public static native int read(int fd, ByteBuffer buf, int pos, int limit) throws IOException;
|
||||
|
@ -531,6 +531,10 @@ public class ChannelOutboundBuffer {
|
||||
return unflushed;
|
||||
}
|
||||
|
||||
protected final int entryMask() {
|
||||
return buffer.length - 1;
|
||||
}
|
||||
|
||||
protected ByteBuf copyToDirectByteBuf(ByteBuf buf) {
|
||||
int readableBytes = buf.readableBytes();
|
||||
ByteBufAllocator alloc = channel.alloc();
|
||||
|
@ -89,7 +89,7 @@ public final class NioSocketChannelOutboundBuffer extends ChannelOutboundBuffer
|
||||
long nioBufferSize = 0;
|
||||
int nioBufferCount = 0;
|
||||
final Entry[] buffer = entries();
|
||||
final int mask = buffer.length - 1;
|
||||
final int mask = entryMask();
|
||||
ByteBuffer[] nioBuffers = this.nioBuffers;
|
||||
Object m;
|
||||
int unflushed = unflushed();
|
||||
|
Loading…
Reference in New Issue
Block a user