Allow to use native transports when sun.misc.Unsafe is not present on… (#8231)

* Allow to use native transports when sun.misc.Unsafe is not present on the system

Motivation:

We should be able to use the native transports (epoll / kqueue) even when sun.misc.Unsafe is not present on the system. This is especially important as Java11 will be released soon and does not allow access to it by default.

Modifications:

- Correctly disable usage of sun.misc.Unsafe when -PnoUnsafe is used while running the build
- Correctly increment metric when UnpooledDirectByteBuf is allocated. This was uncovered once -PnoUnsafe usage was fixed.
- Implement fallbacks in all our native transport code for when sun.misc.Unsafe is not present.

Result:

Fixes https://github.com/netty/netty/issues/8229.
This commit is contained in:
Norman Maurer 2018-08-29 19:36:33 +02:00 committed by GitHub
parent 5aaa16b24c
commit 54f565ac67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 481 additions and 127 deletions

View File

@ -64,7 +64,7 @@ public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
}
this.alloc = alloc;
setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
setByteBuffer(allocateDirect(initialCapacity));
}
/**

View File

@ -401,7 +401,7 @@ public class FixedCompositeByteBufTest {
buf.release();
}
@Test(expected = UnsupportedOperationException.class)
@Test
public void testHasNoMemoryAddressWhenMultipleBuffers() {
ByteBuf buf1 = directBuffer(10);
if (!buf1.hasMemoryAddress()) {
@ -415,6 +415,8 @@ public class FixedCompositeByteBufTest {
try {
buf.memoryAddress();
fail();
} catch (UnsupportedOperationException expected) {
// expected
} finally {
buf.release();
}

View File

@ -136,7 +136,7 @@
<profile>
<id>noUnsafe</id>
<properties>
<argLine.noUnsafe>-Dio.netty.noUnsafe</argLine.noUnsafe>
<argLine.noUnsafe>-Dio.netty.noUnsafe=true</argLine.noUnsafe>
</properties>
</profile>
<profile>

View File

@ -38,6 +38,7 @@
#include <time.h>
#include "netty_epoll_linuxsocket.h"
#include "netty_unix_buffer.h"
#include "netty_unix_errors.h"
#include "netty_unix_filedescriptor.h"
#include "netty_unix_jni.h"
@ -452,6 +453,9 @@ static jint netty_epoll_native_JNI_OnLoad(JNIEnv* env, const char* packagePrefix
if (netty_unix_socket_JNI_OnLoad(env, packagePrefix) == JNI_ERR) {
return JNI_ERR;
}
if (netty_unix_buffer_JNI_OnLoad(env, packagePrefix) == JNI_ERR) {
return JNI_ERR;
}
if (netty_epoll_linuxsocket_JNI_OnLoad(env, packagePrefix) == JNI_ERR) {
return JNI_ERR;
}
@ -501,6 +505,7 @@ static void netty_epoll_native_JNI_OnUnLoad(JNIEnv* env) {
netty_unix_errors_JNI_OnUnLoad(env);
netty_unix_filedescriptor_JNI_OnUnLoad(env);
netty_unix_socket_JNI_OnUnLoad(env);
netty_unix_buffer_JNI_OnUnLoad(env);
netty_epoll_linuxsocket_JNI_OnUnLoad(env);
}

View File

@ -513,22 +513,13 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
*/
private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
if (PlatformDependent.hasUnsafe()) {
IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
array.maxBytes(maxBytesPerGatheringWrite);
in.forEachFlushedMessage(array);
IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
array.maxBytes(maxBytesPerGatheringWrite);
in.forEachFlushedMessage(array);
if (array.count() >= 1) {
// TODO: Handle the case where cnt == 1 specially.
return writeBytesMultiple(in, array);
}
} else {
ByteBuffer[] buffers = in.nioBuffers();
int cnt = in.nioBufferCount();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.
return writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), maxBytesPerGatheringWrite);
}
if (array.count() >= 1) {
// TODO: Handle the case where cnt == 1 specially.
return writeBytesMultiple(in, array);
}
// cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);

View File

@ -16,7 +16,6 @@
package io.netty.channel.epoll;
import io.netty.channel.unix.FileDescriptor;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
/**
@ -58,15 +57,7 @@ public final class Epoll {
}
}
if (cause != null) {
UNAVAILABILITY_CAUSE = cause;
} else {
UNAVAILABILITY_CAUSE = PlatformDependent.hasUnsafe()
? null
: new IllegalStateException(
"sun.misc.Unsafe not available",
PlatformDependent.getUnsafeUnavailabilityCause());
}
UNAVAILABILITY_CAUSE = cause;
}
/**

View File

@ -15,8 +15,11 @@
*/
package io.netty.channel.epoll;
import io.netty.channel.unix.Buffer;
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
/**
* This is an internal datastructure which can be directly passed to epoll_wait to reduce the overhead.
*
@ -41,6 +44,7 @@ final class EpollEventArray {
// The offsiet of the data union in the epoll_event struct
private static final int EPOLL_DATA_OFFSET = Native.offsetofEpollData();
private ByteBuffer memory;
private long memoryAddress;
private int length;
@ -49,11 +53,8 @@ final class EpollEventArray {
throw new IllegalArgumentException("length must be >= 1 but was " + length);
}
this.length = length;
memoryAddress = allocate(length);
}
private static long allocate(int length) {
return PlatformDependent.allocateMemory(length * EPOLL_EVENT_SIZE);
memory = Buffer.allocateDirectWithNativeOrder(calculateBufferCapacity(length));
memoryAddress = Buffer.memoryAddress(memory);
}
/**
@ -77,28 +78,43 @@ final class EpollEventArray {
void increase() {
// double the size
length <<= 1;
free();
memoryAddress = allocate(length);
// There is no need to preserve what was in the memory before.
ByteBuffer buffer = Buffer.allocateDirectWithNativeOrder(calculateBufferCapacity(length));
Buffer.free(memory);
memory = buffer;
memoryAddress = Buffer.memoryAddress(buffer);
}
/**
* Free this {@link EpollEventArray}. Any usage after calling this method may segfault the JVM!
*/
void free() {
PlatformDependent.freeMemory(memoryAddress);
Buffer.free(memory);
memoryAddress = 0;
}
/**
* Return the events for the {@code epoll_event} on this index.
*/
int events(int index) {
return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE);
return getInt(index, 0);
}
/**
* Return the file descriptor for the {@code epoll_event} on this index.
*/
int fd(int index) {
return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE + EPOLL_DATA_OFFSET);
return getInt(index, EPOLL_DATA_OFFSET);
}
private int getInt(int index, int offset) {
if (PlatformDependent.hasUnsafe()) {
return PlatformDependent.getInt(memoryAddress + index * EPOLL_EVENT_SIZE + offset);
}
return memory.getInt(index * EPOLL_EVENT_SIZE + offset);
}
private static int calculateBufferCapacity(int capacity) {
return capacity * EPOLL_EVENT_SIZE;
}
}

View File

@ -19,10 +19,13 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.unix.PreferredDirectByteBufAllocator;
import io.netty.util.UncheckedBooleanSupplier;
import io.netty.util.internal.ObjectUtil;
class EpollRecvByteAllocatorHandle implements RecvByteBufAllocator.ExtendedHandle {
private final PreferredDirectByteBufAllocator preferredDirectByteBufAllocator =
new PreferredDirectByteBufAllocator();
private final RecvByteBufAllocator.ExtendedHandle delegate;
private final UncheckedBooleanSupplier defaultMaybeMoreDataSupplier = new UncheckedBooleanSupplier() {
@Override
@ -34,7 +37,7 @@ class EpollRecvByteAllocatorHandle implements RecvByteBufAllocator.ExtendedHandl
private boolean receivedRdHup;
EpollRecvByteAllocatorHandle(RecvByteBufAllocator.ExtendedHandle handle) {
this.delegate = ObjectUtil.checkNotNull(handle, "handle");
delegate = ObjectUtil.checkNotNull(handle, "handle");
}
final void receivedRdHup() {
@ -69,7 +72,9 @@ class EpollRecvByteAllocatorHandle implements RecvByteBufAllocator.ExtendedHandl
@Override
public final ByteBuf allocate(ByteBufAllocator alloc) {
return delegate.allocate(alloc);
// We need to ensure we always allocate a direct ByteBuf as we can only use a direct buffer to read via JNI.
preferredDirectByteBufAllocator.updateAllocator(alloc);
return delegate.allocate(preferredDirectByteBufAllocator);
}
@Override

View File

@ -28,6 +28,7 @@
#include "netty_kqueue_bsdsocket.h"
#include "netty_kqueue_eventarray.h"
#include "netty_unix_buffer.h"
#include "netty_unix_errors.h"
#include "netty_unix_filedescriptor.h"
#include "netty_unix_jni.h"
@ -293,6 +294,9 @@ static jint netty_kqueue_native_JNI_OnLoad(JNIEnv* env, const char* packagePrefi
if (netty_unix_socket_JNI_OnLoad(env, packagePrefix) == JNI_ERR) {
return JNI_ERR;
}
if (netty_unix_buffer_JNI_OnLoad(env, packagePrefix) == JNI_ERR) {
return JNI_ERR;
}
if (netty_kqueue_bsdsocket_JNI_OnLoad(env, packagePrefix) == JNI_ERR) {
return JNI_ERR;
}
@ -314,6 +318,7 @@ static void netty_kqueue_native_JNI_OnUnLoad(JNIEnv* env) {
netty_unix_errors_JNI_OnUnLoad(env);
netty_unix_filedescriptor_JNI_OnUnLoad(env);
netty_unix_socket_JNI_OnUnLoad(env);
netty_unix_buffer_JNI_OnUnLoad(env);
netty_kqueue_bsdsocket_JNI_OnUnLoad(env);
netty_kqueue_eventarray_JNI_OnUnLoad(env);
}

View File

@ -33,7 +33,6 @@ import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.SocketWritableByteChannel;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
@ -345,22 +344,13 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
*/
private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
if (PlatformDependent.hasUnsafe()) {
IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
array.maxBytes(maxBytesPerGatheringWrite);
in.forEachFlushedMessage(array);
IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
array.maxBytes(maxBytesPerGatheringWrite);
in.forEachFlushedMessage(array);
if (array.count() >= 1) {
// TODO: Handle the case where cnt == 1 specially.
return writeBytesMultiple(in, array);
}
} else {
ByteBuffer[] buffers = in.nioBuffers();
int cnt = in.nioBufferCount();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.
return writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), maxBytesPerGatheringWrite);
}
if (array.count() >= 1) {
// TODO: Handle the case where cnt == 1 specially.
return writeBytesMultiple(in, array);
}
// cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);

View File

@ -16,7 +16,6 @@
package io.netty.channel.kqueue;
import io.netty.channel.unix.FileDescriptor;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.UnstableApi;
@ -48,15 +47,7 @@ public final class KQueue {
}
}
if (cause != null) {
UNAVAILABILITY_CAUSE = cause;
} else {
UNAVAILABILITY_CAUSE = PlatformDependent.hasUnsafe()
? null
: new IllegalStateException(
"sun.misc.Unsafe not available",
PlatformDependent.getUnsafeUnavailabilityCause());
}
UNAVAILABILITY_CAUSE = cause;
}
/**

View File

@ -15,8 +15,11 @@
*/
package io.netty.channel.kqueue;
import io.netty.channel.unix.Buffer;
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
/**
* Represents an array of kevent structures, backed by offheap memory.
*
@ -37,6 +40,7 @@ final class KQueueEventArray {
private static final int KQUEUE_FLAGS_OFFSET = Native.offsetofKEventFlags();
private static final int KQUEUE_DATA_OFFSET = Native.offsetofKeventData();
private ByteBuffer memory;
private long memoryAddress;
private int size;
private int capacity;
@ -45,7 +49,8 @@ final class KQueueEventArray {
if (capacity < 1) {
throw new IllegalArgumentException("capacity must be >= 1 but was " + capacity);
}
memoryAddress = PlatformDependent.allocateMemory(capacity * KQUEUE_EVENT_SIZE);
memory = Buffer.allocateDirectWithNativeOrder(calculateBufferCapacity(capacity));
memoryAddress = Buffer.memoryAddress(memory);
this.capacity = capacity;
}
@ -73,11 +78,11 @@ final class KQueueEventArray {
}
void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
checkSize();
evSet(getKEventOffset(size++), ch, ch.socket.intValue(), filter, flags, fflags);
reallocIfNeeded();
evSet(getKEventOffset(size++) + memoryAddress, ch, ch.socket.intValue(), filter, flags, fflags);
}
private void checkSize() {
private void reallocIfNeeded() {
if (size == capacity) {
realloc(true);
}
@ -89,15 +94,25 @@ final class KQueueEventArray {
void realloc(boolean throwIfFail) {
// Double the capacity while it is "sufficiently small", and otherwise increase by 50%.
int newLength = capacity <= 65536 ? capacity << 1 : capacity + capacity >> 1;
long newMemoryAddress = PlatformDependent.reallocateMemory(memoryAddress, newLength * KQUEUE_EVENT_SIZE);
if (newMemoryAddress != 0) {
memoryAddress = newMemoryAddress;
capacity = newLength;
return;
}
if (throwIfFail) {
throw new OutOfMemoryError("unable to allocate " + newLength + " new bytes! Existing capacity is: "
+ capacity);
try {
ByteBuffer buffer = Buffer.allocateDirectWithNativeOrder(calculateBufferCapacity(newLength));
// Copy over the old content of the memory and reset the position as we always act on the buffer as if
// the position was never increased.
memory.position(0).limit(size);
buffer.put(memory);
buffer.position(0);
Buffer.free(memory);
memory = buffer;
memoryAddress = Buffer.memoryAddress(buffer);
} catch (OutOfMemoryError e) {
if (throwIfFail) {
OutOfMemoryError error = new OutOfMemoryError(
"unable to allocate " + newLength + " new bytes! Existing capacity is: " + capacity);
error.initCause(e);
throw error;
}
}
}
@ -105,36 +120,57 @@ final class KQueueEventArray {
* Free this {@link KQueueEventArray}. Any usage after calling this method may segfault the JVM!
*/
void free() {
PlatformDependent.freeMemory(memoryAddress);
Buffer.free(memory);
memoryAddress = size = capacity = 0;
}
long getKEventOffset(int index) {
return memoryAddress + index * KQUEUE_EVENT_SIZE;
private static int getKEventOffset(int index) {
return index * KQUEUE_EVENT_SIZE;
}
private long getKEventOffsetAddress(int index) {
return getKEventOffset(index) + memoryAddress;
}
private short getShort(int index, int offset) {
if (PlatformDependent.hasUnsafe()) {
return PlatformDependent.getShort(getKEventOffsetAddress(index) + offset);
}
return memory.getShort(getKEventOffset(index) + offset);
}
short flags(int index) {
return PlatformDependent.getShort(getKEventOffset(index) + KQUEUE_FLAGS_OFFSET);
return getShort(index, KQUEUE_FLAGS_OFFSET);
}
short filter(int index) {
return PlatformDependent.getShort(getKEventOffset(index) + KQUEUE_FILTER_OFFSET);
return getShort(index, KQUEUE_FILTER_OFFSET);
}
short fflags(int index) {
return PlatformDependent.getShort(getKEventOffset(index) + KQUEUE_FFLAGS_OFFSET);
return getShort(index, KQUEUE_FFLAGS_OFFSET);
}
int fd(int index) {
return PlatformDependent.getInt(getKEventOffset(index) + KQUEUE_IDENT_OFFSET);
if (PlatformDependent.hasUnsafe()) {
return PlatformDependent.getInt(getKEventOffsetAddress(index) + KQUEUE_IDENT_OFFSET);
}
return memory.getInt(getKEventOffset(index) + KQUEUE_IDENT_OFFSET);
}
long data(int index) {
return PlatformDependent.getLong(getKEventOffset(index) + KQUEUE_DATA_OFFSET);
if (PlatformDependent.hasUnsafe()) {
return PlatformDependent.getLong(getKEventOffsetAddress(index) + KQUEUE_DATA_OFFSET);
}
return memory.getLong(getKEventOffset(index) + KQUEUE_DATA_OFFSET);
}
AbstractKQueueChannel channel(int index) {
return getChannel(getKEventOffset(index));
return getChannel(getKEventOffsetAddress(index));
}
private static int calculateBufferCapacity(int capacity) {
return capacity * KQUEUE_EVENT_SIZE;
}
private static native void evSet(long keventAddress, AbstractKQueueChannel ch,

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.unix.PreferredDirectByteBufAllocator;
import io.netty.util.UncheckedBooleanSupplier;
import io.netty.util.internal.ObjectUtil;
@ -26,7 +27,10 @@ import static java.lang.Math.max;
import static java.lang.Math.min;
final class KQueueRecvByteAllocatorHandle implements RecvByteBufAllocator.ExtendedHandle {
private final PreferredDirectByteBufAllocator preferredDirectByteBufAllocator =
new PreferredDirectByteBufAllocator();
private final RecvByteBufAllocator.ExtendedHandle delegate;
private final UncheckedBooleanSupplier defaultMaybeMoreDataSupplier = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
@ -38,7 +42,7 @@ final class KQueueRecvByteAllocatorHandle implements RecvByteBufAllocator.Extend
private long numberBytesPending;
KQueueRecvByteAllocatorHandle(RecvByteBufAllocator.ExtendedHandle handle) {
this.delegate = ObjectUtil.checkNotNull(handle, "handle");
delegate = ObjectUtil.checkNotNull(handle, "handle");
}
@Override
@ -59,7 +63,10 @@ final class KQueueRecvByteAllocatorHandle implements RecvByteBufAllocator.Extend
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return overrideGuess ? alloc.ioBuffer(guess0()) : delegate.allocate(alloc);
// We need to ensure we always allocate a direct ByteBuf as we can only use a direct buffer to read via JNI.
preferredDirectByteBufAllocator.updateAllocator(alloc);
return overrideGuess ? preferredDirectByteBufAllocator.ioBuffer(guess0()) :
delegate.allocate(preferredDirectByteBufAllocator);
}
@Override

View File

@ -15,11 +15,15 @@
*/
package io.netty.channel.kqueue;
import io.netty.channel.unix.Buffer;
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
import static io.netty.channel.unix.Limits.SIZEOF_JLONG;
final class NativeLongArray {
private ByteBuffer memory;
private long memoryAddress;
private int capacity;
private int size;
@ -28,13 +32,27 @@ final class NativeLongArray {
if (capacity < 1) {
throw new IllegalArgumentException("capacity must be >= 1 but was " + capacity);
}
memoryAddress = PlatformDependent.allocateMemory(capacity * SIZEOF_JLONG);
memory = Buffer.allocateDirectWithNativeOrder(calculateBufferCapacity(capacity));
memoryAddress = Buffer.memoryAddress(memory);
this.capacity = capacity;
}
private static int idx(int index) {
return index * SIZEOF_JLONG;
}
private static int calculateBufferCapacity(int capacity) {
return capacity * SIZEOF_JLONG;
}
void add(long value) {
checkSize();
PlatformDependent.putLong(memoryOffset(size++), value);
reallocIfNeeded();
if (PlatformDependent.hasUnsafe()) {
PlatformDependent.putLong(memoryOffset(size), value);
} else {
memory.putLong(idx(size), value);
}
++size;
}
void clear() {
@ -46,7 +64,7 @@ final class NativeLongArray {
}
void free() {
PlatformDependent.freeMemory(memoryAddress);
Buffer.free(memory);
memoryAddress = 0;
}
@ -59,25 +77,25 @@ final class NativeLongArray {
}
private long memoryOffset(int index) {
return memoryAddress + index * SIZEOF_JLONG;
return memoryAddress + idx(index);
}
private void checkSize() {
private void reallocIfNeeded() {
if (size == capacity) {
realloc();
}
}
// Double the capacity while it is "sufficiently small", and otherwise increase by 50%.
int newLength = capacity <= 65536 ? capacity << 1 : capacity + capacity >> 1;
ByteBuffer buffer = Buffer.allocateDirectWithNativeOrder(calculateBufferCapacity(newLength));
// Copy over the old content of the memory and reset the position as we always act on the buffer as if
// the position was never increased.
memory.position(0).limit(size);
buffer.put(memory);
buffer.position(0);
private void realloc() {
// Double the capacity while it is "sufficiently small", and otherwise increase by 50%.
int newLength = capacity <= 65536 ? capacity << 1 : capacity + capacity >> 1;
long newMemoryAddress = PlatformDependent.reallocateMemory(memoryAddress, newLength * SIZEOF_JLONG);
if (newMemoryAddress == 0) {
throw new OutOfMemoryError("unable to allocate " + newLength + " new bytes! Existing capacity is: "
+ capacity);
Buffer.free(memory);
memory = buffer;
memoryAddress = Buffer.memoryAddress(buffer);
capacity = newLength;
}
memoryAddress = newMemoryAddress;
capacity = newLength;
}
@Override

View File

@ -0,0 +1,52 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
#include "netty_unix_jni.h"
#include "netty_unix_util.h"
#include "netty_unix_buffer.h"
// JNI Registered Methods Begin
static jlong netty_unix_buffer_memoryAddress0(JNIEnv* env, jclass clazz, jobject buffer) {
return (jlong) (*env)->GetDirectBufferAddress(env, buffer);
}
static jint netty_unix_buffer_addressSize0(JNIEnv* env, jclass clazz) {
return (jint) sizeof(int*);
}
// JNI Registered Methods End
// JNI Method Registration Table Begin
static const JNINativeMethod statically_referenced_fixed_method_table[] = {
{ "memoryAddress0", "(Ljava/nio/ByteBuffer;)J", (void *) netty_unix_buffer_memoryAddress0 },
{ "addressSize0", "()I", (void *) netty_unix_buffer_addressSize0 }
};
static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]);
// JNI Method Registration Table End
jint netty_unix_buffer_JNI_OnLoad(JNIEnv* env, const char* packagePrefix) {
// We must register the statically referenced methods first!
if (netty_unix_util_register_natives(env,
packagePrefix,
"io/netty/channel/unix/Buffer",
statically_referenced_fixed_method_table,
statically_referenced_fixed_method_table_size) != 0) {
return JNI_ERR;
}
return NETTY_JNI_VERSION;
}
void netty_unix_buffer_JNI_OnUnLoad(JNIEnv* env) { }

View File

@ -0,0 +1,25 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
#ifndef NETTY_UNIX_BUFFER_H_
#define NETTY_UNIX_BUFFER_H_
#include <jni.h>
// JNI initialization hooks. Users of this file are responsible for calling these in the JNI_OnLoad and JNI_OnUnload methods.
jint netty_unix_buffer_JNI_OnLoad(JNIEnv* env, const char* packagePrefix);
void netty_unix_buffer_JNI_OnUnLoad(JNIEnv* env);
#endif /* NETTY_UNIX_BUFFER_H_ */

View File

@ -0,0 +1,68 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.unix;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.UnstableApi;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@UnstableApi
public final class Buffer {
private Buffer() { }
/**
* Free the direct {@link ByteBuffer}.
*/
public static void free(ByteBuffer buffer) {
PlatformDependent.freeDirectBuffer(buffer);
}
/**
* Returns a new {@link ByteBuffer} which has the same {@link ByteOrder} as the native order of the machine.
*/
public static ByteBuffer allocateDirectWithNativeOrder(int capacity) {
return ByteBuffer.allocateDirect(capacity).order(
PlatformDependent.BIG_ENDIAN_NATIVE_ORDER ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN);
}
/**
* Returns the memory address of the given direct {@link ByteBuffer}.
*/
public static long memoryAddress(ByteBuffer buffer) {
assert buffer.isDirect();
if (PlatformDependent.hasUnsafe()) {
return PlatformDependent.directBufferAddress(buffer);
}
return memoryAddress0(buffer);
}
/**
* Returns the size of a pointer.
*/
public static int addressSize() {
if (PlatformDependent.hasUnsafe()) {
return PlatformDependent.addressSize();
}
return addressSize0();
}
// If Unsafe can not be used we will need to do JNI calls.
private static native int addressSize0();
private static native long memoryAddress0(ByteBuffer buffer);
}

View File

@ -25,11 +25,6 @@ import java.nio.ByteBuffer;
import static io.netty.channel.unix.Limits.IOV_MAX;
import static io.netty.channel.unix.Limits.SSIZE_MAX;
import static io.netty.util.internal.ObjectUtil.checkPositive;
import static io.netty.util.internal.PlatformDependent.allocateMemory;
import static io.netty.util.internal.PlatformDependent.directBufferAddress;
import static io.netty.util.internal.PlatformDependent.freeMemory;
import static io.netty.util.internal.PlatformDependent.putInt;
import static io.netty.util.internal.PlatformDependent.putLong;
import static java.lang.Math.min;
/**
@ -52,7 +47,7 @@ import static java.lang.Math.min;
public final class IovArray implements MessageProcessor {
/** The size of an address which should be 8 for 64 bits and 4 for 32 bits. */
private static final int ADDRESS_SIZE = PlatformDependent.addressSize();
private static final int ADDRESS_SIZE = Buffer.addressSize();
/**
* The size of an {@code iovec} struct in bytes. This is calculated as we have 2 entries each of the size of the
@ -66,13 +61,15 @@ public final class IovArray implements MessageProcessor {
*/
private static final int CAPACITY = IOV_MAX * IOV_SIZE;
private final ByteBuffer memory;
private final long memoryAddress;
private int count;
private long size;
private long maxBytes = SSIZE_MAX;
public IovArray() {
memoryAddress = allocateMemory(CAPACITY);
memory = Buffer.allocateDirectWithNativeOrder(CAPACITY);
memoryAddress = Buffer.memoryAddress(memory);
}
public void clear() {
@ -91,14 +88,23 @@ public final class IovArray implements MessageProcessor {
if (count == IOV_MAX) {
// No more room!
return false;
} else if (buf.hasMemoryAddress() && buf.nioBufferCount() == 1) {
} else if (buf.nioBufferCount() == 1) {
final int len = buf.readableBytes();
return len == 0 || add(buf.memoryAddress(), buf.readerIndex(), len);
if (len == 0) {
return true;
}
if (buf.hasMemoryAddress()) {
return add(buf.memoryAddress(), buf.readerIndex(), len);
} else {
ByteBuffer nioBuffer = buf.internalNioBuffer(buf.readerIndex(), len);
return add(Buffer.memoryAddress(nioBuffer), nioBuffer.position(), len);
}
} else {
ByteBuffer[] buffers = buf.nioBuffers();
for (ByteBuffer nioBuffer : buffers) {
final int len = nioBuffer.remaining();
if (len != 0 && (!add(directBufferAddress(nioBuffer), nioBuffer.position(), len) || count == IOV_MAX)) {
if (len != 0 &&
(!add(Buffer.memoryAddress(nioBuffer), nioBuffer.position(), len) || count == IOV_MAX)) {
return false;
}
}
@ -107,8 +113,7 @@ public final class IovArray implements MessageProcessor {
}
private boolean add(long addr, int offset, int len) {
final long baseOffset = memoryAddress(count);
final long lengthOffset = baseOffset + ADDRESS_SIZE;
assert addr != 0;
// If there is at least 1 entry then we enforce the maximum bytes. We want to accept at least one entry so we
// will attempt to write some data and make progress.
@ -121,17 +126,30 @@ public final class IovArray implements MessageProcessor {
// - http://linux.die.net/man/2/writev
return false;
}
final int baseOffset = idx(count);
final int lengthOffset = baseOffset + ADDRESS_SIZE;
size += len;
++count;
if (ADDRESS_SIZE == 8) {
// 64bit
putLong(baseOffset, addr + offset);
putLong(lengthOffset, len);
if (PlatformDependent.hasUnsafe()) {
PlatformDependent.putLong(baseOffset + memoryAddress, addr + offset);
PlatformDependent.putLong(lengthOffset + memoryAddress, len);
} else {
memory.putLong(baseOffset, addr + offset);
memory.putLong(lengthOffset, len);
}
} else {
assert ADDRESS_SIZE == 4;
putInt(baseOffset, (int) addr + offset);
putInt(lengthOffset, len);
if (PlatformDependent.hasUnsafe()) {
PlatformDependent.putInt(baseOffset + memoryAddress, (int) addr + offset);
PlatformDependent.putInt(lengthOffset + memoryAddress, len);
} else {
memory.putInt(baseOffset, (int) addr + offset);
memory.putInt(lengthOffset, len);
}
}
return true;
}
@ -176,18 +194,22 @@ public final class IovArray implements MessageProcessor {
* Returns the {@code memoryAddress} for the given {@code offset}.
*/
public long memoryAddress(int offset) {
return memoryAddress + IOV_SIZE * offset;
return memoryAddress + idx(offset);
}
/**
* Release the {@link IovArray}. Once release further using of it may crash the JVM!
*/
public void release() {
freeMemory(memoryAddress);
Buffer.free(memory);
}
@Override
public boolean processMessage(Object msg) throws Exception {
return (msg instanceof ByteBuf) && add((ByteBuf) msg);
return msg instanceof ByteBuf && add((ByteBuf) msg);
}
private static int idx(int index) {
return IOV_SIZE * index;
}
}

View File

@ -0,0 +1,130 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.unix;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.util.internal.UnstableApi;
@UnstableApi
public final class PreferredDirectByteBufAllocator implements ByteBufAllocator {
private ByteBufAllocator allocator;
public void updateAllocator(ByteBufAllocator allocator) {
this.allocator = allocator;
}
@Override
public ByteBuf buffer() {
return allocator.directBuffer();
}
@Override
public ByteBuf buffer(int initialCapacity) {
return allocator.directBuffer(initialCapacity);
}
@Override
public ByteBuf buffer(int initialCapacity, int maxCapacity) {
return allocator.directBuffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf ioBuffer() {
return allocator.directBuffer();
}
@Override
public ByteBuf ioBuffer(int initialCapacity) {
return allocator.directBuffer(initialCapacity);
}
@Override
public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
return allocator.directBuffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf heapBuffer() {
return allocator.heapBuffer();
}
@Override
public ByteBuf heapBuffer(int initialCapacity) {
return allocator.heapBuffer(initialCapacity);
}
@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
return allocator.heapBuffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf directBuffer() {
return allocator.directBuffer();
}
@Override
public ByteBuf directBuffer(int initialCapacity) {
return allocator.directBuffer(initialCapacity);
}
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
return allocator.directBuffer(initialCapacity, maxCapacity);
}
@Override
public CompositeByteBuf compositeBuffer() {
return allocator.compositeDirectBuffer();
}
@Override
public CompositeByteBuf compositeBuffer(int maxNumComponents) {
return allocator.compositeDirectBuffer(maxNumComponents);
}
@Override
public CompositeByteBuf compositeHeapBuffer() {
return allocator.compositeHeapBuffer();
}
@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
return allocator.compositeHeapBuffer(maxNumComponents);
}
@Override
public CompositeByteBuf compositeDirectBuffer() {
return allocator.compositeDirectBuffer();
}
@Override
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
return allocator.compositeDirectBuffer(maxNumComponents);
}
@Override
public boolean isDirectBufferPooled() {
return allocator.isDirectBufferPooled();
}
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
return allocator.calculateNewCapacity(minNewCapacity, maxCapacity);
}
}