[#2377] Implement epoll based DatagramChannel

Motivation:
There is currently no epoll based DatagramChannel. We should add one to make the set of provided channels complete and also to be able to offer better performance compared to the NioDatagramChannel once SO_REUSEPORT is implemented.

Modifications:
Add implementation of DatagramChannel which uses epoll. This implementation does currently not support multicast yet which will me implemented later on. As most users will not use multicast anyway I think it is fair to just add the EpollDatagramChannel without the support for now. We shipped NioDatagramChannel without support earlier too ...

Result:
Be able to use EpollDatagramChannel for max. performance on linux
This commit is contained in:
Norman Maurer 2014-04-11 21:08:26 +02:00
parent d1d8a6b6cd
commit 8683e1ef3e
11 changed files with 1125 additions and 49 deletions

View File

@ -49,11 +49,14 @@ jfieldID readerIndexFieldId = NULL;
jfieldID writerIndexFieldId = NULL;
jfieldID memoryAddressFieldId = NULL;
jmethodID inetSocketAddrMethodId = NULL;
jmethodID datagramSocketAddrMethodId = NULL;
jclass runtimeExceptionClass = NULL;
jclass ioExceptionClass = NULL;
jclass closedChannelExceptionClass = NULL;
jmethodID closedChannelExceptionMethodId = NULL;
jclass inetSocketAddressClass = NULL;
jclass datagramSocketAddressClass = NULL;
static int socketType;
// util methods
@ -142,6 +145,23 @@ jobject createInetSocketAddress(JNIEnv * env, struct sockaddr_storage addr) {
return socketAddr;
}
jobject createDatagramSocketAddress(JNIEnv * env, struct sockaddr_storage addr, int len) {
char ipstr[INET6_ADDRSTRLEN];
int port;
if (addr.ss_family == AF_INET) {
struct sockaddr_in *s = (struct sockaddr_in *)&addr;
port = ntohs(s->sin_port);
inet_ntop(AF_INET, &s->sin_addr, ipstr, sizeof ipstr);
} else {
struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
port = ntohs(s->sin6_port);
inet_ntop(AF_INET6, &s->sin6_addr, ipstr, sizeof ipstr);
}
jstring ipString = (*env)->NewStringUTF(env, ipstr);
jobject socketAddr = (*env)->NewObject(env, datagramSocketAddressClass, datagramSocketAddrMethodId, ipString, port, len);
return socketAddr;
}
void init_sockaddr(JNIEnv * env, jbyteArray address, jint scopeId, jint jport, struct sockaddr_storage * addr) {
uint16_t port = htons((uint16_t) jport);
jbyte* addressBytes = (*env)->GetByteArrayElements(env, address, 0);
@ -176,6 +196,16 @@ static int socket_type() {
return AF_INET6;
}
}
void init_in_addr(JNIEnv * env, jbyteArray address, struct in_addr * addr) {
jbyte* addressBytes = (*env)->GetByteArrayElements(env, address, 0);
if (socketType == AF_INET6) {
memcpy(addr, addressBytes, 16);
} else {
memcpy(addr, addressBytes + 12, 4);
}
(*env)->ReleaseByteArrayElements(env, address, addressBytes, JNI_ABORT);
}
// util methods end
jint JNI_OnLoad(JavaVM* vm, void* reserved) {
@ -236,6 +266,18 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
return JNI_ERR;
}
jclass localDatagramSocketAddressClass = (*env)->FindClass(env, "io/netty/channel/epoll/EpollDatagramChannel$DatagramSocketAddress");
if (localDatagramSocketAddressClass == NULL) {
// pending exception...
return JNI_ERR;
}
datagramSocketAddressClass = (jclass) (*env)->NewGlobalRef(env, localDatagramSocketAddressClass);
if (datagramSocketAddressClass == NULL) {
// out-of-memory!
throwOutOfMemoryError(env, "Error allocating memory");
return JNI_ERR;
}
void *mem = malloc(1);
if (mem == NULL) {
throwOutOfMemoryError(env, "Error allocating native buffer");
@ -328,6 +370,12 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
}
socketType = socket_type();
datagramSocketAddrMethodId = (*env)->GetMethodID(env, datagramSocketAddressClass, "<init>", "(Ljava/lang/String;II)V");
if (datagramSocketAddrMethodId == NULL) {
throwRuntimeException(env, "Unable to obtain constructor of DatagramSocketAddress");
return JNI_ERR;
}
jclass addressEntryClass = (*env)->FindClass(env, "io/netty/channel/epoll/EpollChannelOutboundBuffer$AddressEntry");
if (addressEntryClass == NULL) {
// pending exception...
@ -371,6 +419,9 @@ void JNI_OnUnload(JavaVM *vm, void *reserved) {
if (inetSocketAddressClass != NULL) {
(*env)->DeleteGlobalRef(env, inetSocketAddressClass);
}
if (datagramSocketAddressClass != NULL) {
(*env)->DeleteGlobalRef(env, datagramSocketAddressClass);
}
}
}
@ -501,7 +552,6 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * e
}
}
jint write0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) {
ssize_t res;
int err;
@ -545,6 +595,86 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv *
return write0(env, clazz, fd, (void *) address, pos, limit);
}
jint sendTo0(JNIEnv * env, jint fd, void* buffer, jint pos, jint limit ,jbyteArray address, jint scopeId, jint port) {
struct sockaddr_storage addr;
init_sockaddr(env, address, scopeId, port, &addr);
ssize_t res;
int err;
do {
res = sendto(fd, buffer + pos, (size_t) (limit - pos), 0, (struct sockaddr *)&addr, sizeof(struct sockaddr_storage));
// keep on writing if it was interrupted
} while(res == -1 && ((err = errno) == EINTR));
if (res < 0) {
// network stack saturated... try again later
if (err == EAGAIN || err == EWOULDBLOCK) {
return 0;
}
if (err == EBADF) {
throwClosedChannelException(env);
return -1;
}
throwIOException(env, exceptionMessage("Error while sendto(...): ", err));
return -1;
}
return (jint) res;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port) {
void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer);
if (buffer == NULL) {
throwRuntimeException(env, "Unable to access address of buffer");
return -1;
}
return sendTo0(env, fd, buffer, pos, limit, address, scopeId, port);
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit ,jbyteArray address, jint scopeId, jint port) {
return sendTo0(env, fd, (void*) memoryAddress, pos, limit, address, scopeId, port);
}
jobject recvFrom0(JNIEnv * env, jint fd, void* buffer, jint pos, jint limit) {
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
ssize_t res;
int err;
do {
res = recvfrom(fd, buffer + pos, (size_t) (limit - pos), 0, (struct sockaddr *)&addr, &addrlen);
// Keep on reading if we was interrupted
} while (res == -1 && ((err = errno) == EINTR));
if (res < 0) {
if (err == EAGAIN || err == EWOULDBLOCK) {
// Nothing left to read
return NULL;
}
if (err == EBADF) {
throwClosedChannelException(env);
return NULL;
}
throwIOException(env, exceptionMessage("Error while recvFrom(...): ", err));
return NULL;
}
return createDatagramSocketAddress(env, addr, res);
}
JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_recvFrom(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit) {
void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer);
if (buffer == NULL) {
throwRuntimeException(env, "Unable to access address of buffer");
return NULL;
}
return recvFrom0(env, fd, buffer, pos, limit);
}
JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_recvFromAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit) {
return recvFrom0(env, fd, (void*) address, pos, limit);
}
void incrementPosition(JNIEnv * env, jobject bufObj, int written) {
// Get the current position using the (*env)->GetIntField if possible and fallback
// to slower (*env)->CallIntMethod(...) if needed
@ -715,9 +845,9 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_shutdown(JNIEnv * env,
}
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, jclass clazz) {
jint socket0(JNIEnv * env, jclass clazz, int type) {
// TODO: Maybe also respect -Djava.net.preferIPv4Stack=true
int fd = socket(socketType, SOCK_STREAM | SOCK_NONBLOCK, 0);
int fd = socket(socketType, type | SOCK_NONBLOCK, 0);
if (fd == -1) {
int err = errno;
throwIOException(env, exceptionMessage("Error creating socket: ", err));
@ -734,6 +864,14 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, j
return fd;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socketDgram(JNIEnv * env, jclass clazz) {
return socket0(env, clazz, SOCK_DGRAM);
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socketStream(JNIEnv * env, jclass clazz) {
return socket0(env, clazz, SOCK_STREAM);
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_bind(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port) {
struct sockaddr_storage addr;
init_sockaddr(env, address, scopeId, port, &addr);
@ -937,6 +1075,10 @@ JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv
setOption(env, fd, SOL_SOCKET, SO_LINGER, &solinger, sizeof(solinger));
}
JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setBroadcast(JNIEnv * env, jclass clazz, jint fd, jint optval) {
setOption(env, fd, SOL_SOCKET, SO_BROADCAST, &optval, sizeof(optval));
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) {
@ -1005,6 +1147,14 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv
return optval;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isBroadcast(JNIEnv *env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, SOL_SOCKET, SO_BROADCAST, &optval, sizeof(optval)) == -1) {
return -1;
}
return optval;
}
JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz) {
struct utsname name;
@ -1015,5 +1165,4 @@ JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_kernelVersion(JNIEn
int err = errno;
throwRuntimeException(env, exceptionMessage("Error during uname(...): ", err));
return NULL;
}

View File

@ -33,12 +33,18 @@ jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint f
jint Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length);
jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length);
jint Java_io_netty_channel_epoll_Native_sendTo(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
jint Java_io_netty_channel_epoll_Native_sendToAddress(JNIEnv * env, jclass clazz, jint fd, jlong memoryAddress, jint pos, jint limit, jbyteArray address, jint scopeId, jint port);
jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
jint Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
jobject Java_io_netty_channel_epoll_Native_recvFrom(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
jobject Java_io_netty_channel_epoll_Native_recvFromAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd);
void Java_io_netty_channel_epoll_Native_shutdown(JNIEnv * env, jclass clazz, jint fd, jboolean read, jboolean write);
jint Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_socketStream(JNIEnv * env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_socketDgram(JNIEnv * env, jclass clazz);
void Java_io_netty_channel_epoll_Native_bind(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port);
void Java_io_netty_channel_epoll_Native_listen(JNIEnv * env, jclass clazz, jint fd, jint backlog);
jboolean Java_io_netty_channel_epoll_Native_connect(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port);
@ -56,6 +62,7 @@ void Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv *env, jclass clazz,
void Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv *env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv *env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv *env, jclass clazz, jint fd, jint optval);
void Java_io_netty_channel_epoll_Native_setBroadcast(JNIEnv *env, jclass clazz, jint fd, jint optval);
jint Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_isReusePort(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd);
@ -64,4 +71,5 @@ jint Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv *env, jclass cl
jint Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv *env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_isBroadcast(JNIEnv *env, jclass clazz, jint fd);
jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv *env, jclass clazz);

View File

@ -17,11 +17,9 @@ package io.netty.channel.epoll;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.EventLoop;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.UnresolvedAddressException;
@ -33,8 +31,8 @@ abstract class AbstractEpollChannel extends AbstractChannel {
volatile int fd;
int id;
AbstractEpollChannel(EventLoop eventLoop, int flag) {
this(null, eventLoop, socketFd(), flag, false);
AbstractEpollChannel(EventLoop eventLoop, int fd, int flag) {
this(null, eventLoop, fd, flag, false);
}
AbstractEpollChannel(Channel parent, EventLoop eventLoop, int fd, int flag, boolean active) {
@ -45,14 +43,6 @@ abstract class AbstractEpollChannel extends AbstractChannel {
this.active = active;
}
private static int socketFd() {
try {
return Native.socket();
} catch (IOException e) {
throw new ChannelException(e);
}
}
@Override
public boolean isActive() {
return active;
@ -120,6 +110,20 @@ abstract class AbstractEpollChannel extends AbstractChannel {
}
}
protected final void setEpollOut() {
if ((flags & Native.EPOLLOUT) == 0) {
flags |= Native.EPOLLOUT;
((EpollEventLoop) eventLoop()).modify(this);
}
}
protected final void clearEpollOut() {
if ((flags & Native.EPOLLOUT) != 0) {
flags &= ~Native.EPOLLOUT;
((EpollEventLoop) eventLoop()).modify(this);
}
}
@Override
protected void doRegister() throws Exception {
EpollEventLoop loop = (EpollEventLoop) eventLoop();

View File

@ -0,0 +1,455 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.NotYetConnectedException;
/**
* {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
* maximal performance.
*/
public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(true);
private volatile InetSocketAddress local;
private volatile InetSocketAddress remote;
private volatile boolean connected;
private final EpollDatagramChannelConfig config;
public EpollDatagramChannel(EventLoop loop) {
super(loop, Native.socketDgramFd(), Native.EPOLLIN);
config = new EpollDatagramChannelConfig(this);
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public boolean isActive() {
return fd != -1 &&
((config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered())
|| active);
}
@Override
public boolean isConnected() {
return connected;
}
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress) {
return joinGroup(multicastAddress, newPromise());
}
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
try {
return joinGroup(
multicastAddress,
NetworkInterface.getByInetAddress(localAddress().getAddress()),
null, promise);
} catch (SocketException e) {
promise.setFailure(e);
}
return promise;
}
@Override
public ChannelFuture joinGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
return joinGroup(multicastAddress, networkInterface, newPromise());
}
@Override
public ChannelFuture joinGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface,
ChannelPromise promise) {
return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
}
@Override
public ChannelFuture joinGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
return joinGroup(multicastAddress, networkInterface, source, newPromise());
}
@Override
public ChannelFuture joinGroup(
final InetAddress multicastAddress, final NetworkInterface networkInterface,
final InetAddress source, final ChannelPromise promise) {
if (multicastAddress == null) {
throw new NullPointerException("multicastAddress");
}
if (networkInterface == null) {
throw new NullPointerException("networkInterface");
}
promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
return promise;
}
@Override
public ChannelFuture leaveGroup(InetAddress multicastAddress) {
return leaveGroup(multicastAddress, newPromise());
}
@Override
public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
try {
return leaveGroup(
multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
} catch (SocketException e) {
promise.setFailure(e);
}
return promise;
}
@Override
public ChannelFuture leaveGroup(
InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
return leaveGroup(multicastAddress, networkInterface, newPromise());
}
@Override
public ChannelFuture leaveGroup(
InetSocketAddress multicastAddress,
NetworkInterface networkInterface, ChannelPromise promise) {
return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
}
@Override
public ChannelFuture leaveGroup(
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
return leaveGroup(multicastAddress, networkInterface, source, newPromise());
}
@Override
public ChannelFuture leaveGroup(
final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
final ChannelPromise promise) {
if (multicastAddress == null) {
throw new NullPointerException("multicastAddress");
}
if (networkInterface == null) {
throw new NullPointerException("networkInterface");
}
promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
return promise;
}
@Override
public ChannelFuture block(
InetAddress multicastAddress, NetworkInterface networkInterface,
InetAddress sourceToBlock) {
return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
}
@Override
public ChannelFuture block(
final InetAddress multicastAddress, final NetworkInterface networkInterface,
final InetAddress sourceToBlock, final ChannelPromise promise) {
if (multicastAddress == null) {
throw new NullPointerException("multicastAddress");
}
if (sourceToBlock == null) {
throw new NullPointerException("sourceToBlock");
}
if (networkInterface == null) {
throw new NullPointerException("networkInterface");
}
promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
return promise;
}
@Override
public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
return block(multicastAddress, sourceToBlock, newPromise());
}
@Override
public ChannelFuture block(
InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
try {
return block(
multicastAddress,
NetworkInterface.getByInetAddress(localAddress().getAddress()),
sourceToBlock, promise);
} catch (Throwable e) {
promise.setFailure(e);
}
return promise;
}
@Override
protected AbstractEpollUnsafe newUnsafe() {
return new EpollDatagramChannelUnsafe();
}
@Override
protected InetSocketAddress localAddress0() {
return local;
}
@Override
protected InetSocketAddress remoteAddress0() {
return remote;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
InetSocketAddress addr = (InetSocketAddress) localAddress;
checkResolvable(addr);
Native.bind(fd, addr.getAddress(), addr.getPort());
local = Native.localAddress(fd);
active = true;
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearEpollOut();
break;
}
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
if (doWriteMessage(msg)) {
done = true;
break;
}
}
if (done) {
in.remove();
} else {
// Did not write all messages.
setEpollOut();
break;
}
}
}
private boolean doWriteMessage(Object msg) throws IOException {
final Object m;
InetSocketAddress remoteAddress;
ByteBuf data;
if (msg instanceof DatagramPacket) {
@SuppressWarnings("unchecked")
DatagramPacket packet = (DatagramPacket) msg;
remoteAddress = packet.recipient();
m = packet.content();
} else {
m = msg;
remoteAddress = null;
}
if (m instanceof ByteBufHolder) {
data = ((ByteBufHolder) m).content();
} else if (m instanceof ByteBuf) {
data = (ByteBuf) m;
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
}
int dataLen = data.readableBytes();
if (dataLen == 0) {
return true;
}
if (remoteAddress == null) {
remoteAddress = this.remote;
if (remoteAddress == null) {
throw new NotYetConnectedException();
}
}
final int writtenBytes;
if (data.hasMemoryAddress()) {
long memoryAddress = data.memoryAddress();
writtenBytes = Native.sendToAddress(fd, memoryAddress, data.readerIndex(), data.writerIndex(),
remoteAddress.getAddress(), remoteAddress.getPort());
} else {
ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
writtenBytes = Native.sendTo(fd, nioData, nioData.position(), nioData.limit(),
remoteAddress.getAddress(), remoteAddress.getPort());
}
return writtenBytes > 0;
}
@Override
public EpollDatagramChannelConfig config() {
return config;
}
@Override
protected ChannelOutboundBuffer newOutboundBuffer() {
return EpollDatagramChannelOutboundBuffer.newInstance(this);
}
@Override
protected void doDisconnect() throws Exception {
connected = false;
}
final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
private RecvByteBufAllocator.Handle allocHandle;
@Override
public void connect(SocketAddress remote, SocketAddress local, ChannelPromise channelPromise) {
boolean success = false;
try {
try {
InetSocketAddress remoteAddress = (InetSocketAddress) remote;
if (local != null) {
InetSocketAddress localAddress = (InetSocketAddress) local;
doBind(localAddress);
}
checkResolvable(remoteAddress);
EpollDatagramChannel.this.remote = remoteAddress;
EpollDatagramChannel.this.local = Native.localAddress(fd);
success = true;
} finally {
if (!success) {
doClose();
} else {
channelPromise.setSuccess();
connected = true;
}
}
} catch (Throwable cause) {
channelPromise.setFailure(cause);
}
}
@Override
void epollInReady() {
DatagramChannelConfig config = config();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
Throwable exception = null;
try {
try {
for (;;) {
boolean free = true;
ByteBuf data = allocHandle.allocate(config.getAllocator());
int writerIndex = data.writerIndex();
DatagramSocketAddress remoteAddress;
if (data.hasMemoryAddress()) {
// has a memory address so use optimized call
remoteAddress = Native.recvFromAddress(
fd, data.memoryAddress(), writerIndex, data.capacity());
} else {
ByteBuffer nioData = data.internalNioBuffer(writerIndex, data.writableBytes());
remoteAddress = Native.recvFrom(
fd, nioData, nioData.position(), nioData.limit());
}
if (remoteAddress == null) {
break;
}
int readBytes = remoteAddress.receivedAmount;
data.writerIndex(data.writerIndex() + readBytes);
allocHandle.record(readBytes);
try {
readPending = false;
pipeline.fireChannelRead(
new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
free = false;
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
} finally {
if (free) {
data.release();
}
}
}
} catch (Throwable t) {
exception = t;
}
pipeline.fireChannelReadComplete();
if (exception != null) {
pipeline.fireExceptionCaught(exception);
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!config().isAutoRead() && !readPending) {
clearEpollIn();
}
}
}
}
/**
* Act as special {@link InetSocketAddress} to be able to easily pass all needed data from JNI without the need
* to create more objects then needed.
*/
static final class DatagramSocketAddress extends InetSocketAddress {
// holds the amount of received bytes
final int receivedAmount;
DatagramSocketAddress(String addr, int port, int receivedAmount) {
super(addr, port);
this.receivedAmount = receivedAmount;
}
}
}

View File

@ -0,0 +1,275 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DatagramChannelConfig;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Map;
public final class EpollDatagramChannelConfig extends DefaultChannelConfig implements DatagramChannelConfig {
private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = new FixedRecvByteBufAllocator(2048);
private final EpollDatagramChannel datagramChannel;
private boolean activeOnOpen;
EpollDatagramChannelConfig(EpollDatagramChannel channel) {
super(channel);
this.datagramChannel = channel;
setRecvByteBufAllocator(DEFAULT_RCVBUF_ALLOCATOR);
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(
super.getOptions(),
ChannelOption.SO_BROADCAST, ChannelOption.SO_RCVBUF, ChannelOption.SO_SNDBUF,
ChannelOption.SO_REUSEADDR, ChannelOption.IP_MULTICAST_LOOP_DISABLED,
ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL,
ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == ChannelOption.SO_BROADCAST) {
return (T) Boolean.valueOf(isBroadcast());
}
if (option == ChannelOption.SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
if (option == ChannelOption.SO_SNDBUF) {
return (T) Integer.valueOf(getSendBufferSize());
}
if (option == ChannelOption.SO_REUSEADDR) {
return (T) Boolean.valueOf(isReuseAddress());
}
if (option == ChannelOption.IP_MULTICAST_LOOP_DISABLED) {
return (T) Boolean.valueOf(isLoopbackModeDisabled());
}
if (option == ChannelOption.IP_MULTICAST_ADDR) {
T i = (T) getInterface();
return i;
}
if (option == ChannelOption.IP_MULTICAST_IF) {
T i = (T) getNetworkInterface();
return i;
}
if (option == ChannelOption.IP_MULTICAST_TTL) {
return (T) Integer.valueOf(getTimeToLive());
}
if (option == ChannelOption.IP_TOS) {
return (T) Integer.valueOf(getTrafficClass());
}
if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
return (T) Boolean.valueOf(activeOnOpen);
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == ChannelOption.SO_BROADCAST) {
setBroadcast((Boolean) value);
} else if (option == ChannelOption.SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else if (option == ChannelOption.SO_SNDBUF) {
setSendBufferSize((Integer) value);
} else if (option == ChannelOption.SO_REUSEADDR) {
setReuseAddress((Boolean) value);
} else if (option == ChannelOption.IP_MULTICAST_LOOP_DISABLED) {
setLoopbackModeDisabled((Boolean) value);
} else if (option == ChannelOption.IP_MULTICAST_ADDR) {
setInterface((InetAddress) value);
} else if (option == ChannelOption.IP_MULTICAST_IF) {
setNetworkInterface((NetworkInterface) value);
} else if (option == ChannelOption.IP_MULTICAST_TTL) {
setTimeToLive((Integer) value);
} else if (option == ChannelOption.IP_TOS) {
setTrafficClass((Integer) value);
} else if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
setActiveOnOpen((Boolean) value);
} else {
return super.setOption(option, value);
}
return true;
}
private void setActiveOnOpen(boolean activeOnOpen) {
if (channel.isRegistered()) {
throw new IllegalStateException("Can only changed before channel was registered");
}
this.activeOnOpen = activeOnOpen;
}
@Override
public EpollDatagramChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
@Override
public EpollDatagramChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public EpollDatagramChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public EpollDatagramChannelConfig setAutoRead(boolean autoRead) {
super.setAutoRead(autoRead);
return this;
}
@Override
public EpollDatagramChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public EpollDatagramChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public EpollDatagramChannelConfig setAllocator(ByteBufAllocator allocator) {
super.setAllocator(allocator);
return this;
}
@Override
public EpollDatagramChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
public EpollDatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public int getSendBufferSize() {
return Native.getSendBufferSize(datagramChannel.fd);
}
@Override
public EpollDatagramChannelConfig setSendBufferSize(int sendBufferSize) {
Native.setSendBufferSize(datagramChannel.fd, sendBufferSize);
return this;
}
@Override
public int getReceiveBufferSize() {
return Native.getReceiveBufferSize(datagramChannel.fd);
}
@Override
public EpollDatagramChannelConfig setReceiveBufferSize(int receiveBufferSize) {
Native.setReceiveBufferSize(datagramChannel.fd, receiveBufferSize);
return this;
}
@Override
public int getTrafficClass() {
return Native.getTrafficClass(datagramChannel.fd);
}
@Override
public EpollDatagramChannelConfig setTrafficClass(int trafficClass) {
Native.setTrafficClass(datagramChannel.fd, trafficClass);
return this;
}
@Override
public boolean isReuseAddress() {
return Native.isReuseAddress(datagramChannel.fd) == 1;
}
@Override
public EpollDatagramChannelConfig setReuseAddress(boolean reuseAddress) {
Native.setReuseAddress(datagramChannel.fd, reuseAddress ? 1 : 0);
return this;
}
@Override
public boolean isBroadcast() {
return Native.isBroadcast(datagramChannel.fd) == 1;
}
@Override
public EpollDatagramChannelConfig setBroadcast(boolean broadcast) {
Native.setBroadcast(datagramChannel.fd, broadcast ? 1 : 0);
return this;
}
@Override
public boolean isLoopbackModeDisabled() {
return false;
}
@Override
public DatagramChannelConfig setLoopbackModeDisabled(boolean loopbackModeDisabled) {
throw new UnsupportedOperationException("Multicast not supported");
}
@Override
public int getTimeToLive() {
return -1;
}
@Override
public EpollDatagramChannelConfig setTimeToLive(int ttl) {
throw new UnsupportedOperationException("Multicast not supported");
}
@Override
public InetAddress getInterface() {
return null;
}
@Override
public EpollDatagramChannelConfig setInterface(InetAddress interfaceAddress) {
throw new UnsupportedOperationException("Multicast not supported");
}
@Override
public NetworkInterface getNetworkInterface() {
return null;
}
@Override
public EpollDatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface) {
throw new UnsupportedOperationException("Multicast not supported");
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.Recycler;
final class EpollDatagramChannelOutboundBuffer extends ChannelOutboundBuffer {
private static final Recycler<EpollDatagramChannelOutboundBuffer> RECYCLER =
new Recycler<EpollDatagramChannelOutboundBuffer>() {
@Override
protected EpollDatagramChannelOutboundBuffer newObject(Handle<EpollDatagramChannelOutboundBuffer> handle) {
return new EpollDatagramChannelOutboundBuffer(handle);
}
};
static EpollDatagramChannelOutboundBuffer newInstance(EpollDatagramChannel channel) {
EpollDatagramChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}
private EpollDatagramChannelOutboundBuffer(Recycler.Handle<EpollDatagramChannelOutboundBuffer> handle) {
super(handle);
}
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (isCopyNeeded(content)) {
ByteBuf direct = copyToDirectByteBuf(content);
return new DatagramPacket(direct, packet.recipient(), packet.sender());
}
} else if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (isCopyNeeded(buf)) {
msg = copyToDirectByteBuf((ByteBuf) msg);
}
}
return msg;
}
private static boolean isCopyNeeded(ByteBuf content) {
return !content.hasMemoryAddress() || content.nioBufferCount() != 1;
}
}

View File

@ -36,7 +36,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
private volatile InetSocketAddress local;
public EpollServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
super(eventLoop, Native.EPOLLACCEPT);
super(eventLoop, Native.socketStreamFd(), Native.EPOLLACCEPT);
config = new EpollServerSocketChannelConfig(this);
this.childGroup = childGroup;
}

View File

@ -76,7 +76,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
}
public EpollSocketChannel(EventLoop eventLoop) {
super(eventLoop, Native.EPOLLIN);
super(eventLoop, Native.socketStreamFd(), Native.EPOLLIN);
config = new EpollSocketChannelConfig(this);
}
@ -102,20 +102,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
this.local = Native.localAddress(fd);
}
private void setEpollOut() {
if ((flags & Native.EPOLLOUT) == 0) {
flags |= Native.EPOLLOUT;
((EpollEventLoop) eventLoop()).modify(this);
}
}
private void clearEpollOut() {
if ((flags & Native.EPOLLOUT) != 0) {
flags &= ~Native.EPOLLOUT;
((EpollEventLoop) eventLoop()).modify(this);
}
}
/**
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param buf the {@link ByteBuf} from which the bytes should be written

View File

@ -16,6 +16,7 @@
package io.netty.channel.epoll;
import io.netty.channel.ChannelException;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.epoll.EpollChannelOutboundBuffer.AddressEntry;
import io.netty.util.internal.NativeLibraryLoader;
@ -76,9 +77,10 @@ final class Native {
public static native long sendfile(int dest, DefaultFileRegion src, long offset, long length) throws IOException;
// socket operations
public static native int socket() throws IOException;
public static void bind(int fd, InetAddress addr, int port) throws IOException {
public static int sendTo(
int fd, ByteBuffer buf, int pos, int limit, InetAddress addr, int port) throws IOException {
// just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected
// to be called frequently
byte[] address;
int scopeId;
if (addr instanceof Inet6Address) {
@ -89,7 +91,60 @@ final class Native {
scopeId = 0;
address = ipv4MappedIpv6Address(addr.getAddress());
}
bind(fd, address, scopeId, port);
return sendTo(fd, buf, pos, limit, address, scopeId, port);
}
private static native int sendTo(
int fd, ByteBuffer buf, int pos, int limit, byte[] address, int scopeId, int port) throws IOException;
public static int sendToAddress(
int fd, long memoryAddress, int pos, int limit, InetAddress addr, int port) throws IOException {
// just duplicate the toNativeInetAddress code here to minimize object creation as this method is expected
// to be called frequently
byte[] address;
int scopeId;
if (addr instanceof Inet6Address) {
address = addr.getAddress();
scopeId = ((Inet6Address) addr).getScopeId();
} else {
// convert to ipv4 mapped ipv6 address;
scopeId = 0;
address = ipv4MappedIpv6Address(addr.getAddress());
}
return sendToAddress(fd, memoryAddress, pos, limit, address, scopeId, port);
}
private static native int sendToAddress(
int fd, long memoryAddress, int pos, int limit, byte[] address, int scopeId, int port) throws IOException;
public static native EpollDatagramChannel.DatagramSocketAddress recvFrom(
int fd, ByteBuffer buf, int pos, int limit) throws IOException;
public static native EpollDatagramChannel.DatagramSocketAddress recvFromAddress(
int fd, long memoryAddress, int pos, int limit) throws IOException;
// socket operations
public static int socketStreamFd() {
try {
return socketStream();
} catch (IOException e) {
throw new ChannelException(e);
}
}
public static int socketDgramFd() {
try {
return socketDgram();
} catch (IOException e) {
throw new ChannelException(e);
}
}
private static native int socketStream() throws IOException;
private static native int socketDgram() throws IOException;
public static void bind(int fd, InetAddress addr, int port) throws IOException {
NativeInetAddress address = toNativeInetAddress(addr);
bind(fd, address.address, address.scopeId, port);
}
private static byte[] ipv4MappedIpv6Address(byte[] ipv4) {
@ -102,17 +157,8 @@ final class Native {
public static native void bind(int fd, byte[] address, int scopeId, int port) throws IOException;
public static native void listen(int fd, int backlog) throws IOException;
public static boolean connect(int fd, InetAddress addr, int port) throws IOException {
byte[] address;
int scopeId;
if (addr instanceof Inet6Address) {
address = addr.getAddress();
scopeId = ((Inet6Address) addr).getScopeId();
} else {
// convert to ipv4 mapped ipv6 address;
scopeId = 0;
address = ipv4MappedIpv6Address(addr.getAddress());
}
return connect(fd, address, scopeId, port);
NativeInetAddress address = toNativeInetAddress(addr);
return connect(fd, address.address, address.scopeId, port);
}
public static native boolean connect(int fd, byte[] address, int scopeId, int port) throws IOException;
public static native boolean finishConnect(int fd) throws IOException;
@ -132,6 +178,7 @@ final class Native {
public static native int isTcpCork(int fd);
public static native int getSoLinger(int fd);
public static native int getTrafficClass(int fd);
public static native int isBroadcast(int fd);
public static native void setKeepAlive(int fd, int keepAlive);
public static native void setReceiveBufferSize(int fd, int receiveBufferSize);
@ -142,6 +189,31 @@ final class Native {
public static native void setTcpCork(int fd, int tcpCork);
public static native void setSoLinger(int fd, int soLinger);
public static native void setTrafficClass(int fd, int tcpNoDelay);
public static native void setBroadcast(int fd, int broadcast);
private static NativeInetAddress toNativeInetAddress(InetAddress addr) {
byte[] bytes = addr.getAddress();
if (addr instanceof Inet6Address) {
return new NativeInetAddress(bytes, ((Inet6Address) addr).getScopeId());
} else {
// convert to ipv4 mapped ipv6 address;
return new NativeInetAddress(ipv4MappedIpv6Address(bytes));
}
}
private static class NativeInetAddress {
final byte[] address;
final int scopeId;
NativeInetAddress(byte[] address, int scopeId) {
this.address = address;
this.scopeId = scopeId;
}
NativeInetAddress(byte[] address) {
this(address, 0);
}
}
public static native String kernelVersion();
private Native() {

View File

@ -0,0 +1,29 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.DatagramUnicastTest;
import java.util.List;
public class EpollDatagramUnicastTest extends DatagramUnicastTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.datagram();
}
}

View File

@ -16,8 +16,13 @@
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ChannelFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.testsuite.transport.TestsuitePermutation;
@ -85,4 +90,34 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
}
);
}
@Override
public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram() {
// Make the list of Bootstrap factories.
List<BootstrapFactory<Bootstrap>> bfs = Arrays.asList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel(EventLoop loop) {
return new NioDatagramChannel(loop, InternetProtocolFamily.IPv4);
}
@Override
public String toString() {
return NioDatagramChannel.class.getSimpleName() + ".class";
}
});
}
},
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollDatagramChannel.class);
}
}
);
return combo(bfs, bfs);
}
}