From 91b378005461b85f965aa296daa087bc009b9da0 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 11 Apr 2014 21:08:26 +0200 Subject: [PATCH] [#2377] Implement epoll based DatagramChannel Motivation: There is currently no epoll based DatagramChannel. We should add one to make the set of provided channels complete and also to be able to offer better performance compared to the NioDatagramChannel once SO_REUSEPORT is implemented. Modifications: Add implementation of DatagramChannel which uses epoll. This implementation does currently not support multicast yet which will me implemented later on. As most users will not use multicast anyway I think it is fair to just add the EpollDatagramChannel without the support for now. We shipped NioDatagramChannel without support earlier too ... Result: Be able to use EpollDatagramChannel for max. performance on linux --- .../main/c/io_netty_channel_epoll_Native.c | 159 +++++- .../main/c/io_netty_channel_epoll_Native.h | 10 +- .../channel/epoll/AbstractEpollChannel.java | 28 +- .../channel/epoll/EpollDatagramChannel.java | 454 ++++++++++++++++++ .../epoll/EpollDatagramChannelConfig.java | 281 +++++++++++ .../EpollDatagramChannelOutboundBuffer.java | 63 +++ .../epoll/EpollServerSocketChannel.java | 2 +- .../channel/epoll/EpollSocketChannel.java | 16 +- .../java/io/netty/channel/epoll/Native.java | 102 +++- .../epoll/EpollDatagramUnicastTest.java | 29 ++ .../epoll/EpollSocketTestPermutation.java | 34 ++ 11 files changed, 1129 insertions(+), 49 deletions(-) create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelOutboundBuffer.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index d8b3184454..d9c1362706 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -49,11 +49,14 @@ jfieldID readerIndexFieldId = NULL; jfieldID writerIndexFieldId = NULL; jfieldID memoryAddressFieldId = NULL; jmethodID inetSocketAddrMethodId = NULL; +jmethodID datagramSocketAddrMethodId = NULL; jclass runtimeExceptionClass = NULL; jclass ioExceptionClass = NULL; jclass closedChannelExceptionClass = NULL; jmethodID closedChannelExceptionMethodId = NULL; jclass inetSocketAddressClass = NULL; +jclass datagramSocketAddressClass = NULL; + static int socketType; // util methods @@ -142,6 +145,23 @@ jobject createInetSocketAddress(JNIEnv * env, struct sockaddr_storage addr) { return socketAddr; } +jobject createDatagramSocketAddress(JNIEnv * env, struct sockaddr_storage addr, int len) { + char ipstr[INET6_ADDRSTRLEN]; + int port; + if (addr.ss_family == AF_INET) { + struct sockaddr_in *s = (struct sockaddr_in *)&addr; + port = ntohs(s->sin_port); + inet_ntop(AF_INET, &s->sin_addr, ipstr, sizeof ipstr); + } else { + struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr; + port = ntohs(s->sin6_port); + inet_ntop(AF_INET6, &s->sin6_addr, ipstr, sizeof ipstr); + } + jstring ipString = (*env)->NewStringUTF(env, ipstr); + jobject socketAddr = (*env)->NewObject(env, datagramSocketAddressClass, datagramSocketAddrMethodId, ipString, port, len); + return socketAddr; +} + void init_sockaddr(JNIEnv * env, jbyteArray address, jint scopeId, jint jport, struct sockaddr_storage * addr) { uint16_t port = htons((uint16_t) jport); jbyte* addressBytes = (*env)->GetByteArrayElements(env, address, 0); @@ -176,6 +196,16 @@ static int socket_type() { return AF_INET6; } } + +void init_in_addr(JNIEnv * env, jbyteArray address, struct in_addr * addr) { + jbyte* addressBytes = (*env)->GetByteArrayElements(env, address, 0); + if (socketType == AF_INET6) { + memcpy(addr, addressBytes, 16); + } else { + memcpy(addr, addressBytes + 12, 4); + } + (*env)->ReleaseByteArrayElements(env, address, addressBytes, JNI_ABORT); +} // util methods end jint JNI_OnLoad(JavaVM* vm, void* reserved) { @@ -236,6 +266,18 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { return JNI_ERR; } + jclass localDatagramSocketAddressClass = (*env)->FindClass(env, "io/netty/channel/epoll/EpollDatagramChannel$DatagramSocketAddress"); + if (localDatagramSocketAddressClass == NULL) { + // pending exception... + return JNI_ERR; + } + datagramSocketAddressClass = (jclass) (*env)->NewGlobalRef(env, localDatagramSocketAddressClass); + if (datagramSocketAddressClass == NULL) { + // out-of-memory! + throwOutOfMemoryError(env, "Error allocating memory"); + return JNI_ERR; + } + void *mem = malloc(1); if (mem == NULL) { throwOutOfMemoryError(env, "Error allocating native buffer"); @@ -328,6 +370,12 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { } socketType = socket_type(); + datagramSocketAddrMethodId = (*env)->GetMethodID(env, datagramSocketAddressClass, "", "(Ljava/lang/String;II)V"); + if (datagramSocketAddrMethodId == NULL) { + throwRuntimeException(env, "Unable to obtain constructor of DatagramSocketAddress"); + return JNI_ERR; + } + jclass addressEntryClass = (*env)->FindClass(env, "io/netty/channel/epoll/EpollChannelOutboundBuffer$AddressEntry"); if (addressEntryClass == NULL) { // pending exception... @@ -371,6 +419,9 @@ void JNI_OnUnload(JavaVM *vm, void *reserved) { if (inetSocketAddressClass != NULL) { (*env)->DeleteGlobalRef(env, inetSocketAddressClass); } + if (datagramSocketAddressClass != NULL) { + (*env)->DeleteGlobalRef(env, datagramSocketAddressClass); + } } } @@ -501,7 +552,6 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * e } } - jint write0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) { ssize_t res; int err; @@ -545,6 +595,86 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * return write0(env, clazz, fd, (void *) address, pos, limit); } +jint sendTo0(JNIEnv * env, jint fd, void* buffer, jint pos, jint limit ,jbyteArray address, jint scopeId, jint port) { + struct sockaddr_storage addr; + init_sockaddr(env, address, scopeId, port, &addr); + + ssize_t res; + int err; + do { + res = sendto(fd, buffer + pos, (size_t) (limit - pos), 0, (struct sockaddr *)&addr, sizeof(struct sockaddr_storage)); + // keep on writing if it was interrupted + } while(res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + // network stack saturated... try again later + if (err == EAGAIN || err == EWOULDBLOCK) { + return 0; + } + if (err == EBADF) { + throwClosedChannelException(env); + return -1; + } + throwIOException(env, exceptionMessage("Error while sendto(...): ", err)); + return -1; + } + return (jint) res; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) { + void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer); + if (buffer == NULL) { + throwRuntimeException(env, "Unable to access address of buffer"); + return -1; + } + return sendTo0(env, fd, buffer, pos, limit, address, scopeId, port); +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit ,jbyteArray address, jint scopeId, jint port) { + return sendTo0(env, fd, (void*) memoryAddress, pos, limit, address, scopeId, port); +} + +jobject recvFrom0(JNIEnv * env, jint fd, void* buffer, jint pos, jint limit) { + struct sockaddr_storage addr; + socklen_t addrlen = sizeof(addr); + ssize_t res; + int err; + + do { + res = recvfrom(fd, buffer + pos, (size_t) (limit - pos), 0, (struct sockaddr *)&addr, &addrlen); + // Keep on reading if we was interrupted + } while (res == -1 && ((err = errno) == EINTR)); + + if (res < 0) { + if (err == EAGAIN || err == EWOULDBLOCK) { + // Nothing left to read + return NULL; + } + if (err == EBADF) { + throwClosedChannelException(env); + return NULL; + } + throwIOException(env, exceptionMessage("Error while recvFrom(...): ", err)); + return NULL; + } + + return createDatagramSocketAddress(env, addr, res); +} + +JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_recvFrom(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit) { + void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer); + if (buffer == NULL) { + throwRuntimeException(env, "Unable to access address of buffer"); + return NULL; + } + + return recvFrom0(env, fd, buffer, pos, limit); +} + +JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_recvFromAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit) { + return recvFrom0(env, fd, (void*) address, pos, limit); +} + void incrementPosition(JNIEnv * env, jobject bufObj, int written) { // Get the current position using the (*env)->GetIntField if possible and fallback // to slower (*env)->CallIntMethod(...) if needed @@ -715,9 +845,9 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_shutdown(JNIEnv * env, } } -JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, jclass clazz) { +jint socket0(JNIEnv * env, jclass clazz, int type) { // TODO: Maybe also respect -Djava.net.preferIPv4Stack=true - int fd = socket(socketType, SOCK_STREAM | SOCK_NONBLOCK, 0); + int fd = socket(socketType, type | SOCK_NONBLOCK, 0); if (fd == -1) { int err = errno; throwIOException(env, exceptionMessage("Error creating socket: ", err)); @@ -734,6 +864,14 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, j return fd; } +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socketDgram(JNIEnv * env, jclass clazz) { + return socket0(env, clazz, SOCK_DGRAM); +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socketStream(JNIEnv * env, jclass clazz) { + return socket0(env, clazz, SOCK_STREAM); +} + JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_bind(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port) { struct sockaddr_storage addr; init_sockaddr(env, address, scopeId, port, &addr); @@ -937,6 +1075,10 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv setOption(env, fd, SOL_SOCKET, SO_LINGER, &solinger, sizeof(solinger)); } +JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setBroadcast(JNIEnv * env, jclass clazz, jint fd, jint optval) { + setOption(env, fd, SOL_SOCKET, SO_BROADCAST, &optval, sizeof(optval)); +} + JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd) { int optval; if (getOption(env, fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) { @@ -1005,6 +1147,14 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv return optval; } +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isBroadcast(JNIEnv *env, jclass clazz, jint fd) { + int optval; + if (getOption(env, fd, SOL_SOCKET, SO_BROADCAST, &optval, sizeof(optval)) == -1) { + return -1; + } + return optval; +} + JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz) { struct utsname name; @@ -1015,5 +1165,4 @@ JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEn int err = errno; throwRuntimeException(env, exceptionMessage("Error during uname(...): ", err)); return NULL; - -} +} \ No newline at end of file diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h index a82107ee02..c9eb68a599 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h @@ -33,12 +33,18 @@ jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint f jint Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit); jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length); jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length); +jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port); +jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port); jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); jint Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit); +jobject Java_io_netty_channel_epoll_Native_recvFrom(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit); +jobject Java_io_netty_channel_epoll_Native_recvFromAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit); void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd); void Java_io_netty_channel_epoll_Native_shutdown(JNIEnv * env, jclass clazz, jint fd, jboolean read, jboolean write); -jint Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, jclass clazz); +jint Java_io_netty_channel_epoll_Native_socketStream(JNIEnv * env, jclass clazz); +jint Java_io_netty_channel_epoll_Native_socketDgram(JNIEnv * env, jclass clazz); + void Java_io_netty_channel_epoll_Native_bind(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port); void Java_io_netty_channel_epoll_Native_listen(JNIEnv * env, jclass clazz, jint fd, jint backlog); jboolean Java_io_netty_channel_epoll_Native_connect(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port); @@ -56,6 +62,7 @@ void Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv *env, jclass clazz, void Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv *env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv *env, jclass clazz, jint fd, jint optval); void Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv *env, jclass clazz, jint fd, jint optval); +void Java_io_netty_channel_epoll_Native_setBroadcast(JNIEnv *env, jclass clazz, jint fd, jint optval); jint Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_isReusePort(JNIEnv *env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd); @@ -64,4 +71,5 @@ jint Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv *env, jclass cl jint Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv *env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv *env, jclass clazz, jint fd); jint Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv *env, jclass clazz, jint fd); +jint Java_io_netty_channel_epoll_Native_isBroadcast(JNIEnv *env, jclass clazz, jint fd); jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 08925f2440..6ffae33bea 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -17,11 +17,9 @@ package io.netty.channel.epoll; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; -import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; import io.netty.channel.EventLoop; -import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.UnresolvedAddressException; @@ -33,8 +31,8 @@ abstract class AbstractEpollChannel extends AbstractChannel { volatile int fd; int id; - AbstractEpollChannel(int flag) { - this(null, socketFd(), flag, false); + AbstractEpollChannel(int fd, int flag) { + this(null, fd, flag, false); } AbstractEpollChannel(Channel parent, int fd, int flag, boolean active) { @@ -45,14 +43,6 @@ abstract class AbstractEpollChannel extends AbstractChannel { this.active = active; } - private static int socketFd() { - try { - return Native.socket(); - } catch (IOException e) { - throw new ChannelException(e); - } - } - @Override public boolean isActive() { return active; @@ -120,6 +110,20 @@ abstract class AbstractEpollChannel extends AbstractChannel { } } + protected final void setEpollOut() { + if ((flags & Native.EPOLLOUT) == 0) { + flags |= Native.EPOLLOUT; + ((EpollEventLoop) eventLoop()).modify(this); + } + } + + protected final void clearEpollOut() { + if ((flags & Native.EPOLLOUT) != 0) { + flags &= ~Native.EPOLLOUT; + ((EpollEventLoop) eventLoop()).modify(this); + } + } + @Override protected void doRegister() throws Exception { EpollEventLoop loop = (EpollEventLoop) eventLoop(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java new file mode 100644 index 0000000000..c562528de6 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -0,0 +1,454 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramChannelConfig; +import io.netty.channel.socket.DatagramPacket; +import io.netty.util.internal.StringUtil; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NetworkInterface; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.NotYetConnectedException; + +/** + * {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for + * maximal performance. + */ +public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel { + private static final ChannelMetadata METADATA = new ChannelMetadata(true); + + private volatile InetSocketAddress local; + private volatile InetSocketAddress remote; + private volatile boolean connected; + private final EpollDatagramChannelConfig config; + + public EpollDatagramChannel() { + super(Native.socketDgramFd(), Native.EPOLLIN); + config = new EpollDatagramChannelConfig(this); + } + + @Override + public ChannelMetadata metadata() { + return METADATA; + } + + @Override + public boolean isActive() { + return fd != -1 && + ((config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()) + || active); + } + + @Override + public boolean isConnected() { + return connected; + } + + @Override + public ChannelFuture joinGroup(InetAddress multicastAddress) { + return joinGroup(multicastAddress, newPromise()); + } + + @Override + public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) { + try { + return joinGroup( + multicastAddress, + NetworkInterface.getByInetAddress(localAddress().getAddress()), + null, promise); + } catch (SocketException e) { + promise.setFailure(e); + } + return promise; + } + + @Override + public ChannelFuture joinGroup( + InetSocketAddress multicastAddress, NetworkInterface networkInterface) { + return joinGroup(multicastAddress, networkInterface, newPromise()); + } + + @Override + public ChannelFuture joinGroup( + InetSocketAddress multicastAddress, NetworkInterface networkInterface, + ChannelPromise promise) { + return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise); + } + + @Override + public ChannelFuture joinGroup( + InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) { + return joinGroup(multicastAddress, networkInterface, source, newPromise()); + } + + @Override + public ChannelFuture joinGroup( + final InetAddress multicastAddress, final NetworkInterface networkInterface, + final InetAddress source, final ChannelPromise promise) { + + if (multicastAddress == null) { + throw new NullPointerException("multicastAddress"); + } + + if (networkInterface == null) { + throw new NullPointerException("networkInterface"); + } + + promise.setFailure(new UnsupportedOperationException("Multicast not supported")); + return promise; + } + + @Override + public ChannelFuture leaveGroup(InetAddress multicastAddress) { + return leaveGroup(multicastAddress, newPromise()); + } + + @Override + public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) { + try { + return leaveGroup( + multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise); + } catch (SocketException e) { + promise.setFailure(e); + } + return promise; + } + + @Override + public ChannelFuture leaveGroup( + InetSocketAddress multicastAddress, NetworkInterface networkInterface) { + return leaveGroup(multicastAddress, networkInterface, newPromise()); + } + + @Override + public ChannelFuture leaveGroup( + InetSocketAddress multicastAddress, + NetworkInterface networkInterface, ChannelPromise promise) { + return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise); + } + + @Override + public ChannelFuture leaveGroup( + InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) { + return leaveGroup(multicastAddress, networkInterface, source, newPromise()); + } + + @Override + public ChannelFuture leaveGroup( + final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source, + final ChannelPromise promise) { + if (multicastAddress == null) { + throw new NullPointerException("multicastAddress"); + } + if (networkInterface == null) { + throw new NullPointerException("networkInterface"); + } + + promise.setFailure(new UnsupportedOperationException("Multicast not supported")); + + return promise; + } + + @Override + public ChannelFuture block( + InetAddress multicastAddress, NetworkInterface networkInterface, + InetAddress sourceToBlock) { + return block(multicastAddress, networkInterface, sourceToBlock, newPromise()); + } + + @Override + public ChannelFuture block( + final InetAddress multicastAddress, final NetworkInterface networkInterface, + final InetAddress sourceToBlock, final ChannelPromise promise) { + if (multicastAddress == null) { + throw new NullPointerException("multicastAddress"); + } + if (sourceToBlock == null) { + throw new NullPointerException("sourceToBlock"); + } + + if (networkInterface == null) { + throw new NullPointerException("networkInterface"); + } + promise.setFailure(new UnsupportedOperationException("Multicast not supported")); + return promise; + } + + @Override + public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) { + return block(multicastAddress, sourceToBlock, newPromise()); + } + + @Override + public ChannelFuture block( + InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) { + try { + return block( + multicastAddress, + NetworkInterface.getByInetAddress(localAddress().getAddress()), + sourceToBlock, promise); + } catch (Throwable e) { + promise.setFailure(e); + } + return promise; + } + + @Override + protected AbstractEpollUnsafe newUnsafe() { + return new EpollDatagramChannelUnsafe(); + } + + @Override + protected InetSocketAddress localAddress0() { + return local; + } + + @Override + protected InetSocketAddress remoteAddress0() { + return remote; + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + InetSocketAddress addr = (InetSocketAddress) localAddress; + checkResolvable(addr); + Native.bind(fd, addr.getAddress(), addr.getPort()); + local = Native.localAddress(fd); + active = true; + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + for (;;) { + Object msg = in.current(); + if (msg == null) { + // Wrote all messages. + clearEpollOut(); + break; + } + + boolean done = false; + for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) { + if (doWriteMessage(msg)) { + done = true; + break; + } + } + + if (done) { + in.remove(); + } else { + // Did not write all messages. + setEpollOut(); + break; + } + } + } + + private boolean doWriteMessage(Object msg) throws IOException { + final Object m; + InetSocketAddress remoteAddress; + ByteBuf data; + if (msg instanceof DatagramPacket) { + @SuppressWarnings("unchecked") + DatagramPacket packet = (DatagramPacket) msg; + remoteAddress = packet.recipient(); + m = packet.content(); + } else { + m = msg; + remoteAddress = null; + } + + if (m instanceof ByteBufHolder) { + data = ((ByteBufHolder) m).content(); + } else if (m instanceof ByteBuf) { + data = (ByteBuf) m; + } else { + throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); + } + + int dataLen = data.readableBytes(); + if (dataLen == 0) { + return true; + } + + if (remoteAddress == null) { + remoteAddress = this.remote; + if (remoteAddress == null) { + throw new NotYetConnectedException(); + } + } + + final int writtenBytes; + if (data.hasMemoryAddress()) { + long memoryAddress = data.memoryAddress(); + writtenBytes = Native.sendToAddress(fd, memoryAddress, data.readerIndex(), data.writerIndex(), + remoteAddress.getAddress(), remoteAddress.getPort()); + } else { + ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes()); + writtenBytes = Native.sendTo(fd, nioData, nioData.position(), nioData.limit(), + remoteAddress.getAddress(), remoteAddress.getPort()); + } + return writtenBytes > 0; + } + + @Override + public EpollDatagramChannelConfig config() { + return config; + } + + @Override + protected ChannelOutboundBuffer newOutboundBuffer() { + return EpollDatagramChannelOutboundBuffer.newInstance(this); + } + + @Override + protected void doDisconnect() throws Exception { + connected = false; + } + + final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe { + private RecvByteBufAllocator.Handle allocHandle; + + @Override + public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) { + boolean success = false; + try { + try { + InetSocketAddress remoteAddress = (InetSocketAddress) remote; + if (local != null) { + InetSocketAddress localAddress = (InetSocketAddress) local; + doBind(localAddress); + } + + checkResolvable(remoteAddress); + EpollDatagramChannel.this.remote = remoteAddress; + EpollDatagramChannel.this.local = Native.localAddress(fd); + success = true; + } finally { + if (!success) { + doClose(); + } else { + channelPromise.setSuccess(); + connected = true; + } + } + } catch (Throwable cause) { + channelPromise.setFailure(cause); + } + } + + @Override + void epollInReady() { + DatagramChannelConfig config = config(); + RecvByteBufAllocator.Handle allocHandle = this.allocHandle; + if (allocHandle == null) { + this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); + } + + assert eventLoop().inEventLoop(); + final ChannelPipeline pipeline = pipeline(); + Throwable exception = null; + try { + try { + for (;;) { + boolean free = true; + ByteBuf data = allocHandle.allocate(config.getAllocator()); + int writerIndex = data.writerIndex(); + DatagramSocketAddress remoteAddress; + if (data.hasMemoryAddress()) { + // has a memory address so use optimized call + remoteAddress = Native.recvFromAddress( + fd, data.memoryAddress(), writerIndex, data.capacity()); + } else { + ByteBuffer nioData = data.internalNioBuffer(writerIndex, data.writableBytes()); + remoteAddress = Native.recvFrom( + fd, nioData, nioData.position(), nioData.limit()); + } + + if (remoteAddress == null) { + break; + } + + int readBytes = remoteAddress.receivedAmount; + data.writerIndex(data.writerIndex() + readBytes); + allocHandle.record(readBytes); + try { + readPending = false; + pipeline.fireChannelRead( + new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress)); + free = false; + } catch (Throwable t) { + // keep on reading as we use epoll ET and need to consume everything from the socket + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(t); + } finally { + if (free) { + data.release(); + } + } + } + } catch (Throwable t) { + exception = t; + } + pipeline.fireChannelReadComplete(); + + if (exception != null) { + pipeline.fireExceptionCaught(exception); + } + } finally { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (!config().isAutoRead() && !readPending) { + clearEpollIn(); + } + } + } + } + + /** + * Act as special {@link InetSocketAddress} to be able to easily pass all needed data from JNI without the need + * to create more objects then needed. + */ + static final class DatagramSocketAddress extends InetSocketAddress { + // holds the amount of received bytes + final int receivedAmount; + + DatagramSocketAddress(String addr, int port, int receivedAmount) { + super(addr, port); + this.receivedAmount = receivedAmount; + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java new file mode 100644 index 0000000000..18c754611e --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java @@ -0,0 +1,281 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.DatagramChannelConfig; + +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.util.Map; + +public final class EpollDatagramChannelConfig extends DefaultChannelConfig implements DatagramChannelConfig { + private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = new FixedRecvByteBufAllocator(2048); + private final EpollDatagramChannel datagramChannel; + private boolean activeOnOpen; + + EpollDatagramChannelConfig(EpollDatagramChannel channel) { + super(channel); + this.datagramChannel = channel; + setRecvByteBufAllocator(DEFAULT_RCVBUF_ALLOCATOR); + } + + @Override + public Map, Object> getOptions() { + return getOptions( + super.getOptions(), + ChannelOption.SO_BROADCAST, ChannelOption.SO_RCVBUF, ChannelOption.SO_SNDBUF, + ChannelOption.SO_REUSEADDR, ChannelOption.IP_MULTICAST_LOOP_DISABLED, + ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL, + ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION); + } + + @SuppressWarnings("unchecked") + @Override + public T getOption(ChannelOption option) { + if (option == ChannelOption.SO_BROADCAST) { + return (T) Boolean.valueOf(isBroadcast()); + } + if (option == ChannelOption.SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } + if (option == ChannelOption.SO_SNDBUF) { + return (T) Integer.valueOf(getSendBufferSize()); + } + if (option == ChannelOption.SO_REUSEADDR) { + return (T) Boolean.valueOf(isReuseAddress()); + } + if (option == ChannelOption.IP_MULTICAST_LOOP_DISABLED) { + return (T) Boolean.valueOf(isLoopbackModeDisabled()); + } + if (option == ChannelOption.IP_MULTICAST_ADDR) { + T i = (T) getInterface(); + return i; + } + if (option == ChannelOption.IP_MULTICAST_IF) { + T i = (T) getNetworkInterface(); + return i; + } + if (option == ChannelOption.IP_MULTICAST_TTL) { + return (T) Integer.valueOf(getTimeToLive()); + } + if (option == ChannelOption.IP_TOS) { + return (T) Integer.valueOf(getTrafficClass()); + } + if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) { + return (T) Boolean.valueOf(activeOnOpen); + } + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == ChannelOption.SO_BROADCAST) { + setBroadcast((Boolean) value); + } else if (option == ChannelOption.SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == ChannelOption.SO_SNDBUF) { + setSendBufferSize((Integer) value); + } else if (option == ChannelOption.SO_REUSEADDR) { + setReuseAddress((Boolean) value); + } else if (option == ChannelOption.IP_MULTICAST_LOOP_DISABLED) { + setLoopbackModeDisabled((Boolean) value); + } else if (option == ChannelOption.IP_MULTICAST_ADDR) { + setInterface((InetAddress) value); + } else if (option == ChannelOption.IP_MULTICAST_IF) { + setNetworkInterface((NetworkInterface) value); + } else if (option == ChannelOption.IP_MULTICAST_TTL) { + setTimeToLive((Integer) value); + } else if (option == ChannelOption.IP_TOS) { + setTrafficClass((Integer) value); + } else if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) { + setActiveOnOpen((Boolean) value); + } else { + return super.setOption(option, value); + } + + return true; + } + + private void setActiveOnOpen(boolean activeOnOpen) { + if (channel.isRegistered()) { + throw new IllegalStateException("Can only changed before channel was registered"); + } + this.activeOnOpen = activeOnOpen; + } + + @Override + public EpollDatagramChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { + super.setMessageSizeEstimator(estimator); + return this; + } + + @Override + public EpollDatagramChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); + return this; + } + + @Override + public EpollDatagramChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); + return this; + } + + @Override + public EpollDatagramChannelConfig setAutoClose(boolean autoClose) { + super.setAutoClose(autoClose); + return this; + } + + @Override + public EpollDatagramChannelConfig setAutoRead(boolean autoRead) { + super.setAutoRead(autoRead); + return this; + } + + @Override + public EpollDatagramChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + super.setRecvByteBufAllocator(allocator); + return this; + } + + @Override + public EpollDatagramChannelConfig setWriteSpinCount(int writeSpinCount) { + super.setWriteSpinCount(writeSpinCount); + return this; + } + + @Override + public EpollDatagramChannelConfig setAllocator(ByteBufAllocator allocator) { + super.setAllocator(allocator); + return this; + } + + @Override + public EpollDatagramChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { + super.setConnectTimeoutMillis(connectTimeoutMillis); + return this; + } + + @Override + public EpollDatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { + super.setMaxMessagesPerRead(maxMessagesPerRead); + return this; + } + + @Override + public int getSendBufferSize() { + return Native.getSendBufferSize(datagramChannel.fd); + } + + @Override + public EpollDatagramChannelConfig setSendBufferSize(int sendBufferSize) { + Native.setSendBufferSize(datagramChannel.fd, sendBufferSize); + return this; + } + + @Override + public int getReceiveBufferSize() { + return Native.getReceiveBufferSize(datagramChannel.fd); + } + + @Override + public EpollDatagramChannelConfig setReceiveBufferSize(int receiveBufferSize) { + Native.setReceiveBufferSize(datagramChannel.fd, receiveBufferSize); + return this; + } + + @Override + public int getTrafficClass() { + return Native.getTrafficClass(datagramChannel.fd); + } + + @Override + public EpollDatagramChannelConfig setTrafficClass(int trafficClass) { + Native.setTrafficClass(datagramChannel.fd, trafficClass); + return this; + } + + @Override + public boolean isReuseAddress() { + return Native.isReuseAddress(datagramChannel.fd) == 1; + } + + @Override + public EpollDatagramChannelConfig setReuseAddress(boolean reuseAddress) { + Native.setReuseAddress(datagramChannel.fd, reuseAddress ? 1 : 0); + return this; + } + + @Override + public boolean isBroadcast() { + return Native.isBroadcast(datagramChannel.fd) == 1; + } + + @Override + public EpollDatagramChannelConfig setBroadcast(boolean broadcast) { + Native.setBroadcast(datagramChannel.fd, broadcast ? 1 : 0); + return this; + } + + @Override + public boolean isLoopbackModeDisabled() { + return false; + } + + @Override + public DatagramChannelConfig setLoopbackModeDisabled(boolean loopbackModeDisabled) { + throw new UnsupportedOperationException("Multicast not supported"); + } + + @Override + public int getTimeToLive() { + return -1; + } + + @Override + public EpollDatagramChannelConfig setTimeToLive(int ttl) { + throw new UnsupportedOperationException("Multicast not supported"); + } + + @Override + public InetAddress getInterface() { + return null; + } + + @Override + public EpollDatagramChannelConfig setInterface(InetAddress interfaceAddress) { + throw new UnsupportedOperationException("Multicast not supported"); + } + + @Override + public NetworkInterface getNetworkInterface() { + return null; + } + + @Override + public EpollDatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface) { + throw new UnsupportedOperationException("Multicast not supported"); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelOutboundBuffer.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelOutboundBuffer.java new file mode 100644 index 0000000000..494ef41274 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelOutboundBuffer.java @@ -0,0 +1,63 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.socket.DatagramPacket; +import io.netty.util.Recycler; + +final class EpollDatagramChannelOutboundBuffer extends ChannelOutboundBuffer { + private static final Recycler RECYCLER = + new Recycler() { + @Override + protected EpollDatagramChannelOutboundBuffer newObject(Handle handle) { + return new EpollDatagramChannelOutboundBuffer(handle); + } + }; + + static EpollDatagramChannelOutboundBuffer newInstance(EpollDatagramChannel channel) { + EpollDatagramChannelOutboundBuffer buffer = RECYCLER.get(); + buffer.channel = channel; + return buffer; + } + + private EpollDatagramChannelOutboundBuffer(Recycler.Handle handle) { + super(handle); + } + + @Override + protected Object beforeAdd(Object msg) { + if (msg instanceof DatagramPacket) { + DatagramPacket packet = (DatagramPacket) msg; + ByteBuf content = packet.content(); + if (isCopyNeeded(content)) { + ByteBuf direct = copyToDirectByteBuf(content); + return new DatagramPacket(direct, packet.recipient(), packet.sender()); + } + } else if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (isCopyNeeded(buf)) { + msg = copyToDirectByteBuf((ByteBuf) msg); + } + } + return msg; + } + + private static boolean isCopyNeeded(ByteBuf content) { + return !content.hasMemoryAddress() || content.nioBufferCount() != 1; + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java index ae5d545b87..d9b89d73ff 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java @@ -34,7 +34,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme private volatile InetSocketAddress local; public EpollServerSocketChannel() { - super(Native.EPOLLACCEPT); + super(Native.socketStreamFd(), Native.EPOLLACCEPT); config = new EpollServerSocketChannelConfig(this); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index e89b3a0f84..12d5d1c365 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -76,7 +76,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } public EpollSocketChannel() { - super(Native.EPOLLIN); + super(Native.socketStreamFd(), Native.EPOLLIN); config = new EpollSocketChannelConfig(this); } @@ -102,20 +102,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So this.local = Native.localAddress(fd); } - private void setEpollOut() { - if ((flags & Native.EPOLLOUT) == 0) { - flags |= Native.EPOLLOUT; - ((EpollEventLoop) eventLoop()).modify(this); - } - } - - private void clearEpollOut() { - if ((flags & Native.EPOLLOUT) != 0) { - flags &= ~Native.EPOLLOUT; - ((EpollEventLoop) eventLoop()).modify(this); - } - } - /** * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. * @param buf the {@link ByteBuf} from which the bytes should be written diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index 0a9532ede4..3baae8212b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -16,6 +16,7 @@ package io.netty.channel.epoll; +import io.netty.channel.ChannelException; import io.netty.channel.DefaultFileRegion; import io.netty.channel.epoll.EpollChannelOutboundBuffer.AddressEntry; import io.netty.util.internal.NativeLibraryLoader; @@ -76,9 +77,10 @@ final class Native { public static native long sendfile(int dest, DefaultFileRegion src, long offset, long length) throws IOException; - // socket operations - public static native int socket() throws IOException; - public static void bind(int fd, InetAddress addr, int port) throws IOException { + public static int sendTo( + int fd, ByteBuffer buf, int pos, int limit, InetAddress addr, int port) throws IOException { + // just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected + // to be called frequently byte[] address; int scopeId; if (addr instanceof Inet6Address) { @@ -89,7 +91,60 @@ final class Native { scopeId = 0; address = ipv4MappedIpv6Address(addr.getAddress()); } - bind(fd, address, scopeId, port); + return sendTo(fd, buf, pos, limit, address, scopeId, port); + } + + private static native int sendTo( + int fd, ByteBuffer buf, int pos, int limit, byte[] address, int scopeId, int port) throws IOException; + + public static int sendToAddress( + int fd, long memoryAddress, int pos, int limit, InetAddress addr, int port) throws IOException { + // just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected + // to be called frequently + byte[] address; + int scopeId; + if (addr instanceof Inet6Address) { + address = addr.getAddress(); + scopeId = ((Inet6Address) addr).getScopeId(); + } else { + // convert to ipv4 mapped ipv6 address; + scopeId = 0; + address = ipv4MappedIpv6Address(addr.getAddress()); + } + return sendToAddress(fd, memoryAddress, pos, limit, address, scopeId, port); + } + + private static native int sendToAddress( + int fd, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port) throws IOException; + + public static native EpollDatagramChannel.DatagramSocketAddress recvFrom( + int fd, ByteBuffer buf, int pos, int limit) throws IOException; + + public static native EpollDatagramChannel.DatagramSocketAddress recvFromAddress( + int fd, long memoryAddress, int pos, int limit) throws IOException; + + // socket operations + public static int socketStreamFd() { + try { + return socketStream(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + public static int socketDgramFd() { + try { + return socketDgram(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + private static native int socketStream() throws IOException; + private static native int socketDgram() throws IOException; + + public static void bind(int fd, InetAddress addr, int port) throws IOException { + NativeInetAddress address = toNativeInetAddress(addr); + bind(fd, address.address, address.scopeId, port); } private static byte[] ipv4MappedIpv6Address(byte[] ipv4) { @@ -102,17 +157,8 @@ final class Native { public static native void bind(int fd, byte[] address, int scopeId, int port) throws IOException; public static native void listen(int fd, int backlog) throws IOException; public static boolean connect(int fd, InetAddress addr, int port) throws IOException { - byte[] address; - int scopeId; - if (addr instanceof Inet6Address) { - address = addr.getAddress(); - scopeId = ((Inet6Address) addr).getScopeId(); - } else { - // convert to ipv4 mapped ipv6 address; - scopeId = 0; - address = ipv4MappedIpv6Address(addr.getAddress()); - } - return connect(fd, address, scopeId, port); + NativeInetAddress address = toNativeInetAddress(addr); + return connect(fd, address.address, address.scopeId, port); } public static native boolean connect(int fd, byte[] address, int scopeId, int port) throws IOException; public static native boolean finishConnect(int fd) throws IOException; @@ -132,6 +178,7 @@ final class Native { public static native int isTcpCork(int fd); public static native int getSoLinger(int fd); public static native int getTrafficClass(int fd); + public static native int isBroadcast(int fd); public static native void setKeepAlive(int fd, int keepAlive); public static native void setReceiveBufferSize(int fd, int receiveBufferSize); @@ -142,6 +189,31 @@ final class Native { public static native void setTcpCork(int fd, int tcpCork); public static native void setSoLinger(int fd, int soLinger); public static native void setTrafficClass(int fd, int tcpNoDelay); + public static native void setBroadcast(int fd, int broadcast); + + private static NativeInetAddress toNativeInetAddress(InetAddress addr) { + byte[] bytes = addr.getAddress(); + if (addr instanceof Inet6Address) { + return new NativeInetAddress(bytes, ((Inet6Address) addr).getScopeId()); + } else { + // convert to ipv4 mapped ipv6 address; + return new NativeInetAddress(ipv4MappedIpv6Address(bytes)); + } + } + + private static class NativeInetAddress { + final byte[] address; + final int scopeId; + + NativeInetAddress(byte[] address, int scopeId) { + this.address = address; + this.scopeId = scopeId; + } + + NativeInetAddress(byte[] address) { + this(address, 0); + } + } public static native String kernelVersion(); private Native() { diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java new file mode 100644 index 0000000000..610cfb5d85 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDatagramUnicastTest.java @@ -0,0 +1,29 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.DatagramUnicastTest; + +import java.util.List; + +public class EpollDatagramUnicastTest extends DatagramUnicastTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.datagram(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java index e08562e5e6..6d56c6b891 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java @@ -16,8 +16,12 @@ package io.netty.channel.epoll; import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ChannelFactory; import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.InternetProtocolFamily; +import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.testsuite.transport.TestsuitePermutation; @@ -85,4 +89,34 @@ class EpollSocketTestPermutation extends SocketTestPermutation { } ); } + + @Override + public List> datagram() { + // Make the list of Bootstrap factories. + List> bfs = Arrays.asList( + new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory() { + @Override + public Channel newChannel() { + return new NioDatagramChannel(InternetProtocolFamily.IPv4); + } + + @Override + public String toString() { + return NioDatagramChannel.class.getSimpleName() + ".class"; + } + }); + } + }, + new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollDatagramChannel.class); + } + } + ); + return combo(bfs, bfs); + } }