Add support for Unix Domain Sockets when using native epoll transport

Motivation:

Using Unix Domain Sockets can be very useful when communication should take place on the same host and has less overhead then using loopback. We should support this with the native epoll transport.

Modifications:

- Add support for Unix Domain Sockets.
- Adjust testsuite to be able to reuse tests.

Result:

Unix Domain Sockets are now support when using native epoll transport.
This commit is contained in:
Norman Maurer 2015-01-14 16:38:46 +01:00
parent 108a95caca
commit b898bdda84
33 changed files with 1884 additions and 869 deletions

View File

@ -24,11 +24,12 @@ import io.netty.testsuite.util.TestUtils;
import io.netty.util.NetUtil; import io.netty.util.NetUtil;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List; import java.util.List;
public abstract class AbstractClientSocketTest extends AbstractTestsuiteTest<Bootstrap> { public abstract class AbstractClientSocketTest extends AbstractTestsuiteTest<Bootstrap> {
protected volatile InetSocketAddress addr; protected volatile SocketAddress addr;
protected AbstractClientSocketTest() { protected AbstractClientSocketTest() {
super(Bootstrap.class); super(Bootstrap.class);
@ -41,8 +42,13 @@ public abstract class AbstractClientSocketTest extends AbstractTestsuiteTest<Boo
@Override @Override
protected void configure(Bootstrap bootstrap, ByteBufAllocator allocator) { protected void configure(Bootstrap bootstrap, ByteBufAllocator allocator) {
addr = new InetSocketAddress(NetUtil.LOCALHOST, TestUtils.getFreePort()); addr = newSocketAddress();
bootstrap.remoteAddress(addr); bootstrap.remoteAddress(addr);
bootstrap.option(ChannelOption.ALLOCATOR, allocator); bootstrap.option(ChannelOption.ALLOCATOR, allocator);
} }
protected SocketAddress newSocketAddress() {
return new InetSocketAddress(
NetUtil.LOCALHOST, TestUtils.getFreePort());
}
} }

View File

@ -24,11 +24,12 @@ import io.netty.testsuite.util.TestUtils;
import io.netty.util.NetUtil; import io.netty.util.NetUtil;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List; import java.util.List;
public abstract class AbstractServerSocketTest extends AbstractTestsuiteTest<ServerBootstrap> { public abstract class AbstractServerSocketTest extends AbstractTestsuiteTest<ServerBootstrap> {
protected volatile InetSocketAddress addr; protected volatile SocketAddress addr;
protected AbstractServerSocketTest() { protected AbstractServerSocketTest() {
super(ServerBootstrap.class); super(ServerBootstrap.class);
@ -41,10 +42,14 @@ public abstract class AbstractServerSocketTest extends AbstractTestsuiteTest<Ser
@Override @Override
protected void configure(ServerBootstrap bootstrap, ByteBufAllocator allocator) { protected void configure(ServerBootstrap bootstrap, ByteBufAllocator allocator) {
addr = new InetSocketAddress( addr = newSocketAddress();
NetUtil.LOCALHOST, TestUtils.getFreePort());
bootstrap.localAddress(addr); bootstrap.localAddress(addr);
bootstrap.option(ChannelOption.ALLOCATOR, allocator); bootstrap.option(ChannelOption.ALLOCATOR, allocator);
bootstrap.childOption(ChannelOption.ALLOCATOR, allocator); bootstrap.childOption(ChannelOption.ALLOCATOR, allocator);
} }
protected SocketAddress newSocketAddress() {
return new InetSocketAddress(
NetUtil.LOCALHOST, TestUtils.getFreePort());
}
} }

View File

@ -25,11 +25,12 @@ import io.netty.testsuite.util.TestUtils;
import io.netty.util.NetUtil; import io.netty.util.NetUtil;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List; import java.util.List;
public abstract class AbstractSocketTest extends AbstractComboTestsuiteTest<ServerBootstrap, Bootstrap> { public abstract class AbstractSocketTest extends AbstractComboTestsuiteTest<ServerBootstrap, Bootstrap> {
protected volatile InetSocketAddress addr; protected volatile SocketAddress addr;
protected AbstractSocketTest() { protected AbstractSocketTest() {
super(ServerBootstrap.class, Bootstrap.class); super(ServerBootstrap.class, Bootstrap.class);
@ -42,12 +43,16 @@ public abstract class AbstractSocketTest extends AbstractComboTestsuiteTest<Serv
@Override @Override
protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) {
addr = new InetSocketAddress( addr = newSocketAddress();
NetUtil.LOCALHOST, TestUtils.getFreePort());
bootstrap.localAddress(addr); bootstrap.localAddress(addr);
bootstrap.option(ChannelOption.ALLOCATOR, allocator); bootstrap.option(ChannelOption.ALLOCATOR, allocator);
bootstrap.childOption(ChannelOption.ALLOCATOR, allocator); bootstrap.childOption(ChannelOption.ALLOCATOR, allocator);
bootstrap2.remoteAddress(addr); bootstrap2.remoteAddress(addr);
bootstrap2.option(ChannelOption.ALLOCATOR, allocator); bootstrap2.option(ChannelOption.ALLOCATOR, allocator);
} }
protected SocketAddress newSocketAddress() {
return new InetSocketAddress(
NetUtil.LOCALHOST, TestUtils.getFreePort());
}
} }

View File

@ -24,7 +24,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -129,15 +128,15 @@ public class SocketEchoTest extends AbstractSocketTest {
final EchoHandler ch = new EchoHandler(autoRead); final EchoHandler ch = new EchoHandler(autoRead);
if (additionalExecutor) { if (additionalExecutor) {
sb.childHandler(new ChannelInitializer<SocketChannel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(SocketChannel c) throws Exception { protected void initChannel(Channel c) throws Exception {
c.pipeline().addLast(group, sh); c.pipeline().addLast(group, sh);
} }
}); });
cb.handler(new ChannelInitializer<SocketChannel>() { cb.handler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(SocketChannel c) throws Exception { protected void initChannel(Channel c) throws Exception {
c.pipeline().addLast(group, ch); c.pipeline().addLast(group, ch);
} }
}); });

View File

@ -23,7 +23,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.FixedLengthFrameDecoder;
import org.junit.Test; import org.junit.Test;
@ -64,17 +63,17 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest {
final EchoHandler sh = new EchoHandler(autoRead); final EchoHandler sh = new EchoHandler(autoRead);
final EchoHandler ch = new EchoHandler(autoRead); final EchoHandler ch = new EchoHandler(autoRead);
sb.childHandler(new ChannelInitializer<SocketChannel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
public void initChannel(SocketChannel sch) throws Exception { public void initChannel(Channel sch) throws Exception {
sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024)); sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024));
sch.pipeline().addAfter("decoder", "handler", sh); sch.pipeline().addAfter("decoder", "handler", sh);
} }
}); });
cb.handler(new ChannelInitializer<SocketChannel>() { cb.handler(new ChannelInitializer<Channel>() {
@Override @Override
public void initChannel(SocketChannel sch) throws Exception { public void initChannel(Channel sch) throws Exception {
sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024)); sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024));
sch.pipeline().addAfter("decoder", "handler", ch); sch.pipeline().addAfter("decoder", "handler", ch);
} }

View File

@ -21,7 +21,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.codec.serialization.ObjectEncoder;
@ -72,9 +71,9 @@ public class SocketObjectEchoTest extends AbstractSocketTest {
final EchoHandler sh = new EchoHandler(autoRead); final EchoHandler sh = new EchoHandler(autoRead);
final EchoHandler ch = new EchoHandler(autoRead); final EchoHandler ch = new EchoHandler(autoRead);
sb.childHandler(new ChannelInitializer<SocketChannel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
public void initChannel(SocketChannel sch) throws Exception { public void initChannel(Channel sch) throws Exception {
sch.pipeline().addLast( sch.pipeline().addLast(
new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())), new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
new ObjectEncoder(), new ObjectEncoder(),
@ -82,9 +81,9 @@ public class SocketObjectEchoTest extends AbstractSocketTest {
} }
}); });
cb.handler(new ChannelInitializer<SocketChannel>() { cb.handler(new ChannelInitializer<Channel>() {
@Override @Override
public void initChannel(SocketChannel sch) throws Exception { public void initChannel(Channel sch) throws Exception {
sch.pipeline().addLast( sch.pipeline().addLast(
new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())), new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
new ObjectEncoder(), new ObjectEncoder(),

View File

@ -25,7 +25,6 @@ import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.JdkSslClientContext; import io.netty.handler.ssl.JdkSslClientContext;
import io.netty.handler.ssl.JdkSslServerContext; import io.netty.handler.ssl.JdkSslServerContext;
import io.netty.handler.ssl.OpenSsl; import io.netty.handler.ssl.OpenSsl;
@ -182,8 +181,8 @@ public class SocketSslEchoTest extends AbstractSocketTest {
private final AtomicInteger clientNegoCounter = new AtomicInteger(); private final AtomicInteger clientNegoCounter = new AtomicInteger();
private final AtomicInteger serverNegoCounter = new AtomicInteger(); private final AtomicInteger serverNegoCounter = new AtomicInteger();
private volatile SocketChannel clientChannel; private volatile Channel clientChannel;
private volatile SocketChannel serverChannel; private volatile Channel serverChannel;
private volatile SslHandler clientSslHandler; private volatile SslHandler clientSslHandler;
private volatile SslHandler serverSslHandler; private volatile SslHandler serverSslHandler;
@ -222,10 +221,10 @@ public class SocketSslEchoTest extends AbstractSocketTest {
final ExecutorService delegatedTaskExecutor = Executors.newCachedThreadPool(); final ExecutorService delegatedTaskExecutor = Executors.newCachedThreadPool();
reset(); reset();
sb.childHandler(new ChannelInitializer<SocketChannel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public void initChannel(SocketChannel sch) throws Exception { public void initChannel(Channel sch) throws Exception {
serverChannel = sch; serverChannel = sch;
if (serverUsesDelegatedTaskExecutor) { if (serverUsesDelegatedTaskExecutor) {
@ -243,10 +242,10 @@ public class SocketSslEchoTest extends AbstractSocketTest {
} }
}); });
cb.handler(new ChannelInitializer<SocketChannel>() { cb.handler(new ChannelInitializer<Channel>() {
@Override @Override
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public void initChannel(SocketChannel sch) throws Exception { public void initChannel(Channel sch) throws Exception {
clientChannel = sch; clientChannel = sch;
if (clientUsesDelegatedTaskExecutor) { if (clientUsesDelegatedTaskExecutor) {

View File

@ -24,7 +24,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler; import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.JdkSslClientContext; import io.netty.handler.ssl.JdkSslClientContext;
@ -117,9 +116,9 @@ public class SocketSslGreetingTest extends AbstractSocketTest {
final ServerHandler sh = new ServerHandler(); final ServerHandler sh = new ServerHandler();
final ClientHandler ch = new ClientHandler(); final ClientHandler ch = new ClientHandler();
sb.childHandler(new ChannelInitializer<SocketChannel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
public void initChannel(SocketChannel sch) throws Exception { public void initChannel(Channel sch) throws Exception {
ChannelPipeline p = sch.pipeline(); ChannelPipeline p = sch.pipeline();
p.addLast(serverCtx.newHandler(sch.alloc())); p.addLast(serverCtx.newHandler(sch.alloc()));
p.addLast(new LoggingHandler(LOG_LEVEL)); p.addLast(new LoggingHandler(LOG_LEVEL));
@ -127,9 +126,9 @@ public class SocketSslGreetingTest extends AbstractSocketTest {
} }
}); });
cb.handler(new ChannelInitializer<SocketChannel>() { cb.handler(new ChannelInitializer<Channel>() {
@Override @Override
public void initChannel(SocketChannel sch) throws Exception { public void initChannel(Channel sch) throws Exception {
ChannelPipeline p = sch.pipeline(); ChannelPipeline p = sch.pipeline();
p.addLast(clientCtx.newHandler(sch.alloc())); p.addLast(clientCtx.newHandler(sch.alloc()));
p.addLast(new LoggingHandler(LOG_LEVEL)); p.addLast(new LoggingHandler(LOG_LEVEL));

View File

@ -23,7 +23,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.codec.string.StringEncoder;
@ -150,9 +149,9 @@ public class SocketStartTlsTest extends AbstractSocketTest {
final StartTlsServerHandler sh = new StartTlsServerHandler(sse, autoRead); final StartTlsServerHandler sh = new StartTlsServerHandler(sse, autoRead);
final StartTlsClientHandler ch = new StartTlsClientHandler(cse, autoRead); final StartTlsClientHandler ch = new StartTlsClientHandler(cse, autoRead);
sb.childHandler(new ChannelInitializer<SocketChannel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
public void initChannel(SocketChannel sch) throws Exception { public void initChannel(Channel sch) throws Exception {
ChannelPipeline p = sch.pipeline(); ChannelPipeline p = sch.pipeline();
p.addLast("logger", new LoggingHandler(LOG_LEVEL)); p.addLast("logger", new LoggingHandler(LOG_LEVEL));
p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder()); p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder());
@ -160,9 +159,9 @@ public class SocketStartTlsTest extends AbstractSocketTest {
} }
}); });
cb.handler(new ChannelInitializer<SocketChannel>() { cb.handler(new ChannelInitializer<Channel>() {
@Override @Override
public void initChannel(SocketChannel sch) throws Exception { public void initChannel(Channel sch) throws Exception {
ChannelPipeline p = sch.pipeline(); ChannelPipeline p = sch.pipeline();
p.addLast("logger", new LoggingHandler(LOG_LEVEL)); p.addLast("logger", new LoggingHandler(LOG_LEVEL));
p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder()); p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder());

View File

@ -21,7 +21,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringDecoder;
@ -74,9 +73,9 @@ public class SocketStringEchoTest extends AbstractSocketTest {
final StringEchoHandler sh = new StringEchoHandler(autoRead); final StringEchoHandler sh = new StringEchoHandler(autoRead);
final StringEchoHandler ch = new StringEchoHandler(autoRead); final StringEchoHandler ch = new StringEchoHandler(autoRead);
sb.childHandler(new ChannelInitializer<SocketChannel>() { sb.childHandler(new ChannelInitializer<Channel>() {
@Override @Override
public void initChannel(SocketChannel sch) throws Exception { public void initChannel(Channel sch) throws Exception {
sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter())); sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1)); sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1)); sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
@ -84,9 +83,9 @@ public class SocketStringEchoTest extends AbstractSocketTest {
} }
}); });
cb.handler(new ChannelInitializer<SocketChannel>() { cb.handler(new ChannelInitializer<Channel>() {
@Override @Override
public void initChannel(SocketChannel sch) throws Exception { public void initChannel(Channel sch) throws Exception {
sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter())); sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1)); sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1)); sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));

View File

@ -21,6 +21,7 @@
#include <sys/epoll.h> #include <sys/epoll.h>
#include <sys/eventfd.h> #include <sys/eventfd.h>
#include <sys/sendfile.h> #include <sys/sendfile.h>
#include <sys/un.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/types.h> #include <sys/types.h>
@ -1390,3 +1391,101 @@ JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_strError(JNIEnv* en
char* err = strerror(error); char* err = strerror(error);
return (*env)->NewStringUTF(env, err); return (*env)->NewStringUTF(env, err);
} }
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socketDomain(JNIEnv* env, jclass clazz) {
int fd = socket(PF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (fd == -1) {
return -errno;
}
return fd;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_bindDomainSocket(JNIEnv* env, jclass clazz, jint fd, jstring socketPath) {
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
const char* socket_path = (*env)->GetStringUTFChars(env, socketPath, 0);
memcpy(addr.sun_path, socket_path, strlen(socket_path));
if (unlink(socket_path) == -1 && errno != ENOENT) {
return -errno;
}
int res = bind(fd, (struct sockaddr*) &addr, sizeof(addr));
(*env)->ReleaseStringUTFChars(env, socketPath, socket_path);
if (res == -1) {
return -errno;
}
return res;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_connectDomainSocket(JNIEnv* env, jclass clazz, jint fd, jstring socketPath) {
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
const char* socket_path = (*env)->GetStringUTFChars(env, socketPath, 0);
strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1);
int res;
int err;
do {
res = connect(fd, (struct sockaddr*) &addr, sizeof(addr));
} while (res == -1 && ((err = errno) == EINTR));
(*env)->ReleaseStringUTFChars(env, socketPath, socket_path);
if (res < 0) {
return -err;
}
return 0;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_recvFd(JNIEnv* env, jclass clazz, jint fd) {
jint socketFd;
struct msghdr msg;
struct iovec iov[1];
struct cmsghdr* ctrl_msg = NULL;
char msg_buffer[1];
char elem_buffer[CMSG_SPACE(sizeof(int))];
/* Fill all with 0 */
memset(&msg, 0, sizeof(struct msghdr));
memset(elem_buffer, 0, CMSG_SPACE(sizeof(int)));
iov[0].iov_base = msg_buffer;
iov[0].iov_len = 1;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_control = elem_buffer;
msg.msg_controllen = CMSG_SPACE(sizeof(int));
if(recvmsg(fd, &msg, MSG_CMSG_CLOEXEC) < 0) {
// All read, return -1
return -1;
}
if((msg.msg_flags & MSG_CTRUNC) == MSG_CTRUNC) {
// Not enough space ?!?!
return -1;
}
// skip empty entries
for(ctrl_msg = CMSG_FIRSTHDR(&msg); ctrl_msg != NULL; ctrl_msg = CMSG_NXTHDR(&msg, ctrl_msg)) {
if((ctrl_msg->cmsg_level == SOL_SOCKET) && (ctrl_msg->cmsg_type == SCM_RIGHTS)) {
socketFd = *((int *) CMSG_DATA(ctrl_msg));
// set as non blocking as we want to use it with epoll
if (fcntl(socketFd, F_SETFL, O_NONBLOCK) == -1) {
return -errno;
}
return socketFd;
}
}
return -1;
}

View File

@ -27,6 +27,7 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.OneTimeTask;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException; import java.nio.channels.UnresolvedAddressException;
abstract class AbstractEpollChannel extends AbstractChannel { abstract class AbstractEpollChannel extends AbstractChannel {
@ -49,6 +50,10 @@ abstract class AbstractEpollChannel extends AbstractChannel {
this.active = active; this.active = active;
} }
protected final int fd() {
return fd;
}
@Override @Override
public boolean isActive() { public boolean isActive() {
return active; return active;
@ -71,16 +76,6 @@ abstract class AbstractEpollChannel extends AbstractChannel {
Native.close(fd); Native.close(fd);
} }
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override @Override
protected void doDisconnect() throws Exception { protected void doDisconnect() throws Exception {
doClose(); doClose();
@ -214,6 +209,72 @@ abstract class AbstractEpollChannel extends AbstractChannel {
} }
} }
/**
* Read bytes into the given {@link ByteBuf} and return the amount.
*/
protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
int writerIndex = byteBuf.writerIndex();
int localReadAmount;
if (byteBuf.hasMemoryAddress()) {
localReadAmount = Native.readAddress(fd, byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
} else {
ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
localReadAmount = Native.read(fd, buf, buf.position(), buf.limit());
}
if (localReadAmount > 0) {
byteBuf.writerIndex(writerIndex + localReadAmount);
}
return localReadAmount;
}
protected final int doWriteBytes(ByteBuf buf) throws Exception {
int readableBytes = buf.readableBytes();
int writtenBytes = 0;
if (buf.hasMemoryAddress()) {
long memoryAddress = buf.memoryAddress();
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
for (;;) {
int localFlushedAmount = Native.writeAddress(fd, memoryAddress, readerIndex, writerIndex);
if (localFlushedAmount > 0) {
writtenBytes += localFlushedAmount;
if (writtenBytes == readableBytes) {
return writtenBytes;
}
readerIndex += localFlushedAmount;
} else {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
return writtenBytes;
}
}
} else {
ByteBuffer nioBuf;
if (buf.nioBufferCount() == 1) {
nioBuf = buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes());
} else {
nioBuf = buf.nioBuffer();
}
for (;;) {
int pos = nioBuf.position();
int limit = nioBuf.limit();
int localFlushedAmount = Native.write(fd, nioBuf, pos, limit);
if (localFlushedAmount > 0) {
nioBuf.position(pos + localFlushedAmount);
writtenBytes += localFlushedAmount;
if (writtenBytes == readableBytes) {
return writtenBytes;
}
} else {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
}
return writtenBytes;
}
}
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
protected boolean readPending; protected boolean readPending;

View File

@ -0,0 +1,114 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.ServerChannel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel {
protected AbstractEpollServerChannel(int fd) {
super(fd, Native.EPOLLACCEPT);
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EpollEventLoop;
}
@Override
protected InetSocketAddress remoteAddress0() {
return null;
}
@Override
protected AbstractEpollUnsafe newUnsafe() {
return new EpollServerSocketUnsafe();
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
protected abstract Channel newChildChannel(int fd) throws Exception;
final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {
@Override
public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) {
// Connect not supported by ServerChannel implementations
channelPromise.setFailure(new UnsupportedOperationException());
}
@Override
void epollInReady() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
Throwable exception = null;
try {
try {
for (;;) {
int socketFd = Native.accept(fd);
if (socketFd == -1) {
// this means everything was handled for now
break;
}
readPending = false;
try {
pipeline.fireChannelRead(newChildChannel(socketFd));
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
}
}
} catch (Throwable t) {
exception = t;
}
pipeline.fireChannelReadComplete();
if (exception != null) {
pipeline.fireExceptionCaught(exception);
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!config().isAutoRead() && !readPending) {
clearEpollIn0();
}
}
}
}
}

View File

@ -0,0 +1,653 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
private volatile boolean inputShutdown;
private volatile boolean outputShutdown;
protected AbstractEpollStreamChannel(Channel parent, int fd) {
super(parent, fd, Native.EPOLLIN, true);
}
protected AbstractEpollStreamChannel(int fd) {
super(fd, Native.EPOLLIN);
}
@Override
protected AbstractEpollUnsafe newUnsafe() {
return new EpollStreamUnsafe();
}
/**
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param buf the {@link ByteBuf} from which the bytes should be written
*/
private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
return true;
}
if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
int writtenBytes = doWriteBytes(buf);
in.removeBytes(writtenBytes);
return writtenBytes == readableBytes;
} else {
ByteBuffer[] nioBuffers = buf.nioBuffers();
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes);
}
}
private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
long expectedWrittenBytes = array.size();
final long initialExpectedWrittenBytes = expectedWrittenBytes;
int cnt = array.count();
assert expectedWrittenBytes != 0;
assert cnt != 0;
boolean done = false;
int offset = 0;
int end = offset + cnt;
for (;;) {
long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt);
if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
expectedWrittenBytes -= localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break;
}
do {
long bytes = array.processWritten(offset, localWrittenBytes);
if (bytes == -1) {
// incomplete write
break;
} else {
offset++;
cnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
}
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
return done;
}
private boolean writeBytesMultiple(
ChannelOutboundBuffer in, ByteBuffer[] nioBuffers,
int nioBufferCnt, long expectedWrittenBytes) throws IOException {
assert expectedWrittenBytes != 0;
final long initialExpectedWrittenBytes = expectedWrittenBytes;
boolean done = false;
int offset = 0;
int end = offset + nioBufferCnt;
for (;;) {
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, nioBufferCnt);
if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
expectedWrittenBytes -= localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break;
}
do {
ByteBuffer buffer = nioBuffers[offset];
int pos = buffer.position();
int bytes = buffer.limit() - pos;
if (bytes > localWrittenBytes) {
buffer.position(pos + (int) localWrittenBytes);
// incomplete write
break;
} else {
offset++;
nioBufferCnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
}
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
return done;
}
/**
* Write a {@link DefaultFileRegion}
*
* @param region the {@link DefaultFileRegion} from which the bytes should be written
* @return amount the amount of written bytes
*/
private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
final long regionCount = region.count();
if (region.transfered() >= regionCount) {
in.remove();
return true;
}
final long baseOffset = region.position();
boolean done = false;
long flushedAmount = 0;
for (;;) {
final long offset = region.transfered();
final long localFlushedAmount = Native.sendfile(fd, region, baseOffset, offset, regionCount - offset);
if (localFlushedAmount == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
flushedAmount += localFlushedAmount;
if (region.transfered() >= regionCount) {
done = true;
break;
}
}
if (flushedAmount > 0) {
in.progress(flushedAmount);
}
if (done) {
in.remove();
}
return done;
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
final int msgCount = in.size();
if (msgCount == 0) {
// Wrote all messages.
clearEpollOut();
break;
}
// Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
if (!doWriteMultiple(in)) {
break;
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
} else { // msgCount == 1
if (!doWriteSingle(in)) {
break;
}
}
}
}
private boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception {
// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!writeBytes(in, buf)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else if (msg instanceof DefaultFileRegion) {
DefaultFileRegion region = (DefaultFileRegion) msg;
if (!writeFileRegion(in, region)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else {
// Should never reach here.
throw new Error();
}
return true;
}
private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
if (PlatformDependent.hasUnsafe()) {
// this means we can cast to IovArray and write the IovArray directly.
IovArray array = IovArrayThreadLocal.get(in);
int cnt = array.count();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, array)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);
}
} else {
ByteBuffer[] buffers = in.nioBuffers();
int cnt = in.nioBufferCount();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);
}
}
return true;
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
if (buf instanceof CompositeByteBuf) {
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
// in the CompositeByteBuf are backed by a memoryAddress.
CompositeByteBuf comp = (CompositeByteBuf) buf;
if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) {
// more then 1024 buffers for gathering writes so just do a memory copy.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
} else {
// We can only handle buffers with memory address so we need to copy if a non direct is
// passed to write.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
}
return buf;
}
if (msg instanceof DefaultFileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
protected boolean isInputShutdown0() {
return inputShutdown;
}
protected boolean isOutputShutdown0() {
return outputShutdown || !isActive();
}
protected ChannelFuture shutdownOutput0(final ChannelPromise promise) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
try {
Native.shutdown(fd, false, true);
outputShutdown = true;
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
}
return promise;
}
/**
* Connect to the remote peer
*/
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
Native.bind(fd, localAddress);
}
boolean success = false;
try {
boolean connected = Native.connect(fd, remoteAddress);
if (!connected) {
setEpollOut();
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
final class EpollStreamUnsafe extends AbstractEpollUnsafe {
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
private RecvByteBufAllocator.Handle allocHandle;
private void closeOnRead(ChannelPipeline pipeline) {
inputShutdown = true;
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
clearEpollIn0();
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
}
}
private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
closeOnRead(pipeline);
return true;
}
return false;
}
@Override
public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = EpollStreamUnsafe.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
closeIfClosed();
promise.tryFailure(annotateConnectException(t, remoteAddress));
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
active = true;
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(cause);
closeIfClosed();
}
private void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
boolean connectStillInProgress = false;
try {
boolean wasActive = isActive();
if (!doFinishConnect()) {
connectStillInProgress = true;
return;
}
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
if (!connectStillInProgress) {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
}
@Override
void epollOutReady() {
if (connectPromise != null) {
// pending connect which is now complete so handle it.
finishConnect();
} else {
super.epollOutReady();
}
}
/**
* Finish the connect
*/
private boolean doFinishConnect() throws Exception {
if (Native.finishConnect(fd)) {
clearEpollOut();
return true;
} else {
setEpollOut();
return false;
}
}
@Override
void epollRdHupReady() {
if (isActive()) {
epollInReady();
} else {
closeOnRead(pipeline());
}
}
@Override
void epollInReady() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf byteBuf = null;
boolean close = false;
try {
int totalReadAmount = 0;
for (;;) {
// we use a direct buffer here as the native implementations only be able
// to handle direct buffers.
byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
close = localReadAmount < 0;
break;
}
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
allocHandle.record(totalReadAmount);
// Avoid overflow.
totalReadAmount = localReadAmount;
} else {
totalReadAmount += localReadAmount;
}
if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
}
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
boolean closed = handleReadException(pipeline, byteBuf, t, close);
if (!closed) {
// trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket
eventLoop().execute(new Runnable() {
@Override
public void run() {
epollInReady();
}
});
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead() && !readPending) {
clearEpollIn0();
}
}
}
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import java.io.File;
import java.net.SocketAddress;
public final class DomainSocketAddress extends SocketAddress {
private final String socketPath;
public DomainSocketAddress(String socketPath) {
if (socketPath == null) {
throw new NullPointerException("socketPath");
}
this.socketPath = socketPath;
}
public DomainSocketAddress(File file) {
this(file.getPath());
}
/**
* The path to the domain socket.
*/
public String path() {
return socketPath;
}
@Override
public String toString() {
return path();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DomainSocketAddress)) {
return false;
}
return ((DomainSocketAddress) o).socketPath.equals(socketPath);
}
@Override
public int hashCode() {
return socketPath.hashCode();
}
}

View File

@ -66,6 +66,16 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
config = new EpollDatagramChannelConfig(this); config = new EpollDatagramChannelConfig(this);
} }
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override @Override
public ChannelMetadata metadata() { public ChannelMetadata metadata() {
return METADATA; return METADATA;
@ -252,7 +262,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
protected void doBind(SocketAddress localAddress) throws Exception { protected void doBind(SocketAddress localAddress) throws Exception {
InetSocketAddress addr = (InetSocketAddress) localAddress; InetSocketAddress addr = (InetSocketAddress) localAddress;
checkResolvable(addr); checkResolvable(addr);
Native.bind(fd, addr.getAddress(), addr.getPort()); Native.bind(fd, addr);
local = Native.localAddress(fd); local = Native.localAddress(fd);
active = true; active = true;
} }

View File

@ -0,0 +1,78 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.DefaultChannelConfig;
import java.net.SocketAddress;
public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
private final ChannelConfig config = new DefaultChannelConfig(this);
private volatile DomainSocketAddress local;
private volatile DomainSocketAddress remote;
EpollDomainSocketChannel(Channel parent, int fd) {
super(parent, fd);
}
public EpollDomainSocketChannel() {
super(Native.socketDomainFd());
}
@Override
protected DomainSocketAddress localAddress0() {
return local;
}
@Override
protected DomainSocketAddress remoteAddress0() {
return remote;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
Native.bind(fd, localAddress);
local = (DomainSocketAddress) localAddress;
}
@Override
public ChannelConfig config() {
return config;
}
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (super.doConnect(remoteAddress, localAddress)) {
local = (DomainSocketAddress) localAddress;
remote = (DomainSocketAddress) remoteAddress;
return true;
}
return false;
}
@Override
public DomainSocketAddress remoteAddress() {
return (DomainSocketAddress) super.remoteAddress();
}
@Override
public DomainSocketAddress localAddress() {
return (DomainSocketAddress) super.localAddress();
}
}

View File

@ -0,0 +1,165 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.NetUtil;
import java.util.Map;
import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.ChannelOption.SO_REUSEADDR;
public class EpollServerChannelConfig extends DefaultChannelConfig {
protected final AbstractEpollChannel channel;
private volatile int backlog = NetUtil.SOMAXCONN;
EpollServerChannelConfig(AbstractEpollChannel channel) {
super(channel);
this.channel = channel;
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
if (option == SO_REUSEADDR) {
return (T) Boolean.valueOf(isReuseAddress());
}
if (option == SO_BACKLOG) {
return (T) Integer.valueOf(getBacklog());
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else if (option == SO_REUSEADDR) {
setReuseAddress((Boolean) value);
} else if (option == SO_BACKLOG) {
setBacklog((Integer) value);
} else {
return super.setOption(option, value);
}
return true;
}
public boolean isReuseAddress() {
return Native.isReuseAddress(channel.fd) == 1;
}
public EpollServerChannelConfig setReuseAddress(boolean reuseAddress) {
Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0);
return this;
}
public int getReceiveBufferSize() {
return Native.getReceiveBufferSize(channel.fd);
}
public EpollServerChannelConfig setReceiveBufferSize(int receiveBufferSize) {
Native.setReceiveBufferSize(channel.fd, receiveBufferSize);
return this;
}
public int getBacklog() {
return backlog;
}
public EpollServerChannelConfig setBacklog(int backlog) {
if (backlog < 0) {
throw new IllegalArgumentException("backlog: " + backlog);
}
this.backlog = backlog;
return this;
}
@Override
public EpollServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
public EpollServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public EpollServerChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public EpollServerChannelConfig setAllocator(ByteBufAllocator allocator) {
super.setAllocator(allocator);
return this;
}
@Override
public EpollServerChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public EpollServerChannelConfig setAutoRead(boolean autoRead) {
super.setAutoRead(autoRead);
return this;
}
@Override
public EpollServerChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public EpollServerChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public EpollServerChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
@Override
protected final void autoReadCleared() {
channel.clearEpollIn();
}
}

View File

@ -0,0 +1,85 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.Channel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.File;
import java.net.SocketAddress;
public final class EpollServerDomainSocketChannel extends AbstractEpollServerChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(
EpollServerDomainSocketChannel.class);
private final EpollServerChannelConfig config = new EpollServerChannelConfig(this);
private volatile DomainSocketAddress local;
public EpollServerDomainSocketChannel() {
super(Native.socketDomainFd());
}
@Override
protected Channel newChildChannel(int fd) throws Exception {
return new EpollDomainSocketChannel(this, fd);
}
@Override
protected DomainSocketAddress localAddress0() {
return local;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
Native.bind(fd, localAddress);
Native.listen(fd, config.getBacklog());
local = (DomainSocketAddress) localAddress;
}
@Override
protected void doClose() throws Exception {
try {
super.doClose();
} finally {
DomainSocketAddress local = this.local;
if (local != null) {
// Delete the socket file if possible.
File socketFile = new File(local.path());
boolean success = socketFile.delete();
if (!success && logger.isDebugEnabled()) {
logger.debug("Failed to delete a domain socket file: {}", local.path());
}
}
}
}
@Override
public EpollServerChannelConfig config() {
return config;
}
@Override
public DomainSocketAddress remoteAddress() {
return (DomainSocketAddress) super.remoteAddress();
}
@Override
public DomainSocketAddress localAddress() {
return (DomainSocketAddress) super.localAddress();
}
}

View File

@ -15,9 +15,7 @@
*/ */
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
@ -28,13 +26,13 @@ import java.net.SocketAddress;
* {@link ServerSocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for * {@link ServerSocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
* maximal performance. * maximal performance.
*/ */
public final class EpollServerSocketChannel extends AbstractEpollChannel implements ServerSocketChannel { public final class EpollServerSocketChannel extends AbstractEpollServerChannel implements ServerSocketChannel {
private final EpollServerSocketChannelConfig config; private final EpollServerSocketChannelConfig config;
private volatile InetSocketAddress local; private volatile InetSocketAddress local;
public EpollServerSocketChannel() { public EpollServerSocketChannel() {
super(Native.socketStreamFd(), Native.EPOLLACCEPT); super(Native.socketStreamFd());
config = new EpollServerSocketChannelConfig(this); config = new EpollServerSocketChannelConfig(this);
} }
@ -47,12 +45,22 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
protected void doBind(SocketAddress localAddress) throws Exception { protected void doBind(SocketAddress localAddress) throws Exception {
InetSocketAddress addr = (InetSocketAddress) localAddress; InetSocketAddress addr = (InetSocketAddress) localAddress;
checkResolvable(addr); checkResolvable(addr);
Native.bind(fd, addr.getAddress(), addr.getPort()); Native.bind(fd, addr);
local = Native.localAddress(fd); local = Native.localAddress(fd);
Native.listen(fd, config.getBacklog()); Native.listen(fd, config.getBacklog());
active = true; active = true;
} }
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
@Override @Override
public EpollServerSocketChannelConfig config() { public EpollServerSocketChannelConfig config() {
return config; return config;
@ -64,74 +72,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
} }
@Override @Override
protected InetSocketAddress remoteAddress0() { protected Channel newChildChannel(int fd) throws Exception {
return null; return new EpollSocketChannel(this, fd);
}
@Override
protected AbstractEpollUnsafe newUnsafe() {
return new EpollServerSocketUnsafe();
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {
@Override
public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) {
// Connect not supported by ServerChannel implementations
channelPromise.setFailure(new UnsupportedOperationException());
}
@Override
void epollInReady() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
Throwable exception = null;
try {
try {
for (;;) {
int socketFd = Native.accept(fd);
if (socketFd == -1) {
// this means everything was handled for now
break;
}
try {
readPending = false;
pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd));
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
}
}
} catch (Throwable t) {
exception = t;
}
pipeline.fireChannelReadComplete();
if (exception != null) {
pipeline.fireExceptionCaught(exception);
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead() && !readPending) {
clearEpollIn0();
}
}
}
} }
} }

View File

@ -17,27 +17,17 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator; import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ServerSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.util.NetUtil;
import java.util.Map; import java.util.Map;
import static io.netty.channel.ChannelOption.SO_BACKLOG; public final class EpollServerSocketChannelConfig extends EpollServerChannelConfig
import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.ChannelOption.SO_REUSEADDR;
public final class EpollServerSocketChannelConfig extends DefaultChannelConfig
implements ServerSocketChannelConfig { implements ServerSocketChannelConfig {
private final EpollServerSocketChannel channel;
private volatile int backlog = NetUtil.SOMAXCONN;
EpollServerSocketChannelConfig(EpollServerSocketChannel channel) { EpollServerSocketChannelConfig(EpollServerSocketChannel channel) {
super(channel); super(channel);
this.channel = channel;
// Use SO_REUSEADDR by default as java.nio does the same. // Use SO_REUSEADDR by default as java.nio does the same.
// //
@ -47,21 +37,12 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public Map<ChannelOption<?>, Object> getOptions() { public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG, EpollChannelOption.SO_REUSEPORT); return getOptions(super.getOptions(), EpollChannelOption.SO_REUSEPORT);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public <T> T getOption(ChannelOption<T> option) { public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
if (option == SO_REUSEADDR) {
return (T) Boolean.valueOf(isReuseAddress());
}
if (option == SO_BACKLOG) {
return (T) Integer.valueOf(getBacklog());
}
if (option == EpollChannelOption.SO_REUSEPORT) { if (option == EpollChannelOption.SO_REUSEPORT) {
return (T) Boolean.valueOf(isReusePort()); return (T) Boolean.valueOf(isReusePort());
} }
@ -72,13 +53,7 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig
public <T> boolean setOption(ChannelOption<T> option, T value) { public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value); validate(option, value);
if (option == SO_RCVBUF) { if (option == EpollChannelOption.SO_REUSEPORT) {
setReceiveBufferSize((Integer) value);
} else if (option == SO_REUSEADDR) {
setReuseAddress((Boolean) value);
} else if (option == SO_BACKLOG) {
setBacklog((Integer) value);
} else if (option == EpollChannelOption.SO_REUSEPORT) {
setReusePort((Boolean) value); setReusePort((Boolean) value);
} else { } else {
return super.setOption(option, value); return super.setOption(option, value);
@ -87,26 +62,15 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig
return true; return true;
} }
@Override
public boolean isReuseAddress() {
return Native.isReuseAddress(channel.fd) == 1;
}
@Override @Override
public EpollServerSocketChannelConfig setReuseAddress(boolean reuseAddress) { public EpollServerSocketChannelConfig setReuseAddress(boolean reuseAddress) {
Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0); super.setReuseAddress(reuseAddress);
return this; return this;
} }
@Override
public int getReceiveBufferSize() {
return Native.getReceiveBufferSize(channel.fd);
}
@Override @Override
public EpollServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { public EpollServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
Native.setReceiveBufferSize(channel.fd, receiveBufferSize); super.setReceiveBufferSize(receiveBufferSize);
return this; return this;
} }
@ -115,17 +79,9 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig
return this; return this;
} }
@Override
public int getBacklog() {
return backlog;
}
@Override @Override
public EpollServerSocketChannelConfig setBacklog(int backlog) { public EpollServerSocketChannelConfig setBacklog(int backlog) {
if (backlog < 0) { super.setBacklog(backlog);
throw new IllegalArgumentException("backlog: " + backlog);
}
this.backlog = backlog;
return this; return this;
} }
@ -201,9 +157,4 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig
Native.setReusePort(channel.fd, reusePort ? 1 : 0); Native.setReusePort(channel.fd, reusePort ? 1 : 0);
return this; return this;
} }
@Override
protected void autoReadCleared() {
channel.clearEpollIn();
}
} }

View File

@ -15,61 +15,28 @@
*/ */
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/** /**
* {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for * {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
* maximal performance. * maximal performance.
*/ */
public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel { public final class EpollSocketChannel extends AbstractEpollStreamChannel implements SocketChannel {
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
private final EpollSocketChannelConfig config; private final EpollSocketChannelConfig config;
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
private volatile InetSocketAddress local; private volatile InetSocketAddress local;
private volatile InetSocketAddress remote; private volatile InetSocketAddress remote;
private volatile boolean inputShutdown;
private volatile boolean outputShutdown;
EpollSocketChannel(Channel parent, int fd) { EpollSocketChannel(Channel parent, int fd) {
super(parent, fd, Native.EPOLLIN, true); super(parent, fd);
config = new EpollSocketChannelConfig(this); config = new EpollSocketChannelConfig(this);
// Directly cache the remote and local addresses // Directly cache the remote and local addresses
// See https://github.com/netty/netty/issues/2359 // See https://github.com/netty/netty/issues/2359
@ -78,7 +45,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
} }
public EpollSocketChannel() { public EpollSocketChannel() {
super(Native.socketStreamFd(), Native.EPOLLIN); super(Native.socketStreamFd());
config = new EpollSocketChannelConfig(this); config = new EpollSocketChannelConfig(this);
} }
@ -99,8 +66,13 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
} }
@Override @Override
protected AbstractEpollUnsafe newUnsafe() { public InetSocketAddress remoteAddress() {
return new EpollSocketUnsafe(); return (InetSocketAddress) super.remoteAddress();
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
} }
@Override @Override
@ -124,325 +96,10 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
@Override @Override
protected void doBind(SocketAddress local) throws Exception { protected void doBind(SocketAddress local) throws Exception {
InetSocketAddress localAddress = (InetSocketAddress) local; InetSocketAddress localAddress = (InetSocketAddress) local;
Native.bind(fd, localAddress.getAddress(), localAddress.getPort()); Native.bind(fd, localAddress);
this.local = Native.localAddress(fd); this.local = Native.localAddress(fd);
} }
/**
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param buf the {@link ByteBuf} from which the bytes should be written
*/
private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
return true;
}
boolean done = false;
long writtenBytes = 0;
if (buf.hasMemoryAddress()) {
long memoryAddress = buf.memoryAddress();
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
for (;;) {
int localFlushedAmount = Native.writeAddress(fd, memoryAddress, readerIndex, writerIndex);
if (localFlushedAmount > 0) {
writtenBytes += localFlushedAmount;
if (writtenBytes == readableBytes) {
done = true;
break;
}
readerIndex += localFlushedAmount;
} else {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
}
in.removeBytes(writtenBytes);
return done;
} else if (buf.nioBufferCount() == 1) {
int readerIndex = buf.readerIndex();
ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, buf.readableBytes());
for (;;) {
int pos = nioBuf.position();
int limit = nioBuf.limit();
int localFlushedAmount = Native.write(fd, nioBuf, pos, limit);
if (localFlushedAmount > 0) {
nioBuf.position(pos + localFlushedAmount);
writtenBytes += localFlushedAmount;
if (writtenBytes == readableBytes) {
done = true;
break;
}
} else {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
}
in.removeBytes(writtenBytes);
return done;
} else {
ByteBuffer[] nioBuffers = buf.nioBuffers();
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes);
}
}
private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
long expectedWrittenBytes = array.size();
int cnt = array.count();
assert expectedWrittenBytes != 0;
assert cnt != 0;
boolean done = false;
long writtenBytes = 0;
int offset = 0;
int end = offset + cnt;
for (;;) {
long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt);
if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break;
}
do {
long bytes = array.processWritten(offset, localWrittenBytes);
if (bytes == -1) {
// incomplete write
break;
} else {
offset++;
cnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
}
in.removeBytes(writtenBytes);
return done;
}
private boolean writeBytesMultiple(
ChannelOutboundBuffer in, ByteBuffer[] nioBuffers,
int nioBufferCnt, long expectedWrittenBytes) throws IOException {
assert expectedWrittenBytes != 0;
boolean done = false;
long writtenBytes = 0;
int offset = 0;
int end = offset + nioBufferCnt;
for (;;) {
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, nioBufferCnt);
if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
// Written everything, just break out here (fast-path)
done = true;
break;
}
do {
ByteBuffer buffer = nioBuffers[offset];
int pos = buffer.position();
int bytes = buffer.limit() - pos;
if (bytes > localWrittenBytes) {
buffer.position(pos + (int) localWrittenBytes);
// incomplete write
break;
} else {
offset++;
nioBufferCnt--;
localWrittenBytes -= bytes;
}
} while (offset < end && localWrittenBytes > 0);
}
in.removeBytes(writtenBytes);
return done;
}
/**
* Write a {@link DefaultFileRegion}
*
* @param region the {@link DefaultFileRegion} from which the bytes should be written
* @return amount the amount of written bytes
*/
private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
final long regionCount = region.count();
if (region.transfered() >= regionCount) {
in.remove();
return true;
}
final long baseOffset = region.position();
boolean done = false;
long flushedAmount = 0;
for (;;) {
final long offset = region.transfered();
final long localFlushedAmount = Native.sendfile(fd, region, baseOffset, offset, regionCount - offset);
if (localFlushedAmount == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
flushedAmount += localFlushedAmount;
if (region.transfered() >= regionCount) {
done = true;
break;
}
}
if (flushedAmount > 0) {
in.progress(flushedAmount);
}
if (done) {
in.remove();
}
return done;
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
final int msgCount = in.size();
if (msgCount == 0) {
// Wrote all messages.
clearEpollOut();
break;
}
// Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
if (!doWriteMultiple(in)) {
break;
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
} else { // msgCount == 1
if (!doWriteSingle(in)) {
break;
}
}
}
}
private boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception {
// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!writeBytes(in, buf)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else if (msg instanceof DefaultFileRegion) {
DefaultFileRegion region = (DefaultFileRegion) msg;
if (!writeFileRegion(in, region)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else {
// Should never reach here.
throw new Error();
}
return true;
}
private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
if (PlatformDependent.hasUnsafe()) {
// this means we can cast to IovArray and write the IovArray directly.
IovArray array = IovArrayThreadLocal.get(in);
int cnt = array.count();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, array)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);
}
} else {
ByteBuffer[] buffers = in.nioBuffers();
int cnt = in.nioBufferCount();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
in.removeBytes(0);
}
}
return true;
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
if (buf instanceof CompositeByteBuf) {
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
// in the CompositeByteBuf are backed by a memoryAddress.
CompositeByteBuf comp = (CompositeByteBuf) buf;
if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) {
// more then 1024 buffers for gathering writes so just do a memory copy.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
} else {
// We can only handle buffers with memory address so we need to copy if a non direct is
// passed to write.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
}
return buf;
}
if (msg instanceof DefaultFileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
@Override @Override
public EpollSocketChannelConfig config() { public EpollSocketChannelConfig config() {
return config; return config;
@ -450,12 +107,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
@Override @Override
public boolean isInputShutdown() { public boolean isInputShutdown() {
return inputShutdown; return isInputShutdown0();
} }
@Override @Override
public boolean isOutputShutdown() { public boolean isOutputShutdown() {
return outputShutdown || !isActive(); return isOutputShutdown0();
} }
@Override @Override
@ -465,24 +122,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
@Override @Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) { public ChannelFuture shutdownOutput(final ChannelPromise promise) {
EventLoop loop = eventLoop(); return shutdownOutput0(promise);
if (loop.inEventLoop()) {
try {
Native.shutdown(fd, false, true);
outputShutdown = true;
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownOutput(promise);
}
});
}
return promise;
} }
@Override @Override
@ -490,307 +130,17 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
return (ServerSocketChannel) super.parent(); return (ServerSocketChannel) super.parent();
} }
final class EpollSocketUnsafe extends AbstractEpollUnsafe {
private RecvByteBufAllocator.Handle allocHandle;
private void closeOnRead(ChannelPipeline pipeline) {
inputShutdown = true;
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
clearEpollIn0();
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
}
}
private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
closeOnRead(pipeline);
return true;
}
return false;
}
@Override @Override
public void connect( protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
}
boolean wasActive = isActive();
if (doConnect((InetSocketAddress) remoteAddress, (InetSocketAddress) localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = EpollSocketChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
closeIfClosed();
promise.tryFailure(annotateConnectException(t, remoteAddress));
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
active = true;
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(cause);
closeIfClosed();
}
private void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
boolean connectStillInProgress = false;
try {
boolean wasActive = isActive();
if (!doFinishConnect()) {
connectStillInProgress = true;
return;
}
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
if (!connectStillInProgress) {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
}
@Override
void epollOutReady() {
if (connectPromise != null) {
// pending connect which is now complete so handle it.
finishConnect();
} else {
super.epollOutReady();
}
}
/**
* Connect to the remote peer
*/
private boolean doConnect(InetSocketAddress remoteAddress, InetSocketAddress localAddress) throws Exception {
if (localAddress != null) { if (localAddress != null) {
checkResolvable(localAddress); checkResolvable((InetSocketAddress) localAddress);
Native.bind(fd, localAddress.getAddress(), localAddress.getPort());
} }
checkResolvable((InetSocketAddress) remoteAddress);
boolean success = false; if (super.doConnect(remoteAddress, localAddress)) {
try {
checkResolvable(remoteAddress);
boolean connected = Native.connect(fd, remoteAddress.getAddress(),
remoteAddress.getPort());
remote = remoteAddress;
local = Native.localAddress(fd); local = Native.localAddress(fd);
if (!connected) { remote = Native.remoteAddress(fd);
setEpollOut();
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
/**
* Finish the connect
*/
private boolean doFinishConnect() throws Exception {
if (Native.finishConnect(fd)) {
clearEpollOut();
return true; return true;
} else { }
setEpollOut();
return false; return false;
} }
} }
/**
* Read bytes into the given {@link ByteBuf} and return the amount.
*/
private int doReadBytes(ByteBuf byteBuf) throws Exception {
int writerIndex = byteBuf.writerIndex();
int localReadAmount;
if (byteBuf.hasMemoryAddress()) {
localReadAmount = Native.readAddress(fd, byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
} else {
ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
localReadAmount = Native.read(fd, buf, buf.position(), buf.limit());
}
if (localReadAmount > 0) {
byteBuf.writerIndex(writerIndex + localReadAmount);
}
return localReadAmount;
}
@Override
void epollRdHupReady() {
if (isActive()) {
epollInReady();
} else {
closeOnRead(pipeline());
}
}
@Override
void epollInReady() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf byteBuf = null;
boolean close = false;
try {
int totalReadAmount = 0;
for (;;) {
// we use a direct buffer here as the native implementations only be able
// to handle direct buffers.
byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
close = localReadAmount < 0;
break;
}
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
allocHandle.record(totalReadAmount);
// Avoid overflow.
totalReadAmount = localReadAmount;
} else {
totalReadAmount += localReadAmount;
}
if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
}
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
boolean closed = handleReadException(pipeline, byteBuf, t, close);
if (!closed) {
// trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket
eventLoop().execute(new Runnable() {
@Override
public void run() {
epollInReady();
}
});
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead() && !readPending) {
clearEpollIn0();
}
}
}
}
}

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.net.Inet6Address; import java.net.Inet6Address;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
@ -346,18 +347,39 @@ final class Native {
return res; return res;
} }
public static int socketDomainFd() {
int res = socketDomain();
if (res < 0) {
throw new ChannelException(newIOException("socketDomain", res));
}
return res;
}
private static native int socketStream(); private static native int socketStream();
private static native int socketDgram(); private static native int socketDgram();
private static native int socketDomain();
public static void bind(int fd, InetAddress addr, int port) throws IOException { public static void bind(int fd, SocketAddress socketAddress) throws IOException {
NativeInetAddress address = toNativeInetAddress(addr); if (socketAddress instanceof InetSocketAddress) {
int res = bind(fd, address.address, address.scopeId, port); InetSocketAddress addr = (InetSocketAddress) socketAddress;
NativeInetAddress address = toNativeInetAddress(addr.getAddress());
int res = bind(fd, address.address, address.scopeId, addr.getPort());
if (res < 0) { if (res < 0) {
throw newIOException("bind", res); throw newIOException("bind", res);
} }
} else if (socketAddress instanceof DomainSocketAddress) {
DomainSocketAddress addr = (DomainSocketAddress) socketAddress;
int res = bindDomainSocket(fd, addr.path());
if (res < 0) {
throw newIOException("bind", res);
}
} else {
throw new Error("Unexpected SocketAddress implementation " + socketAddress);
}
} }
private static native int bind(int fd, byte[] address, int scopeId, int port); private static native int bind(int fd, byte[] address, int scopeId, int port);
private static native int bindDomainSocket(int fd, String path);
public static void listen(int fd, int backlog) throws IOException { public static void listen(int fd, int backlog) throws IOException {
int res = listen0(fd, backlog); int res = listen0(fd, backlog);
@ -368,9 +390,18 @@ final class Native {
private static native int listen0(int fd, int backlog); private static native int listen0(int fd, int backlog);
public static boolean connect(int fd, InetAddress addr, int port) throws IOException { public static boolean connect(int fd, SocketAddress socketAddress) throws IOException {
NativeInetAddress address = toNativeInetAddress(addr); int res;
int res = connect(fd, address.address, address.scopeId, port); if (socketAddress instanceof InetSocketAddress) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
NativeInetAddress address = toNativeInetAddress(inetSocketAddress.getAddress());
res = connect(fd, address.address, address.scopeId, inetSocketAddress.getPort());
} else if (socketAddress instanceof DomainSocketAddress) {
DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) socketAddress;
res = connectDomainSocket(fd, unixDomainSocketAddress.path());
} else {
throw new Error("Unexpected SocketAddress implementation " + socketAddress);
}
if (res < 0) { if (res < 0) {
if (res == ERRNO_EINPROGRESS_NEGATIVE) { if (res == ERRNO_EINPROGRESS_NEGATIVE) {
// connect not complete yet need to wait for EPOLLOUT event // connect not complete yet need to wait for EPOLLOUT event
@ -382,6 +413,7 @@ final class Native {
} }
private static native int connect(int fd, byte[] address, int scopeId, int port); private static native int connect(int fd, byte[] address, int scopeId, int port);
private static native int connectDomainSocket(int fd, String path);
public static boolean finishConnect(int fd) throws IOException { public static boolean finishConnect(int fd) throws IOException {
int res = finishConnect0(fd); int res = finishConnect0(fd);
@ -477,6 +509,20 @@ final class Native {
private static native int accept0(int fd); private static native int accept0(int fd);
public static int recvFd(int fd) throws IOException {
int res = recvFd0(fd);
if (res >= 0) {
return res;
}
if (res == ERRNO_EAGAIN_NEGATIVE || res == ERRNO_EWOULDBLOCK_NEGATIVE) {
// Everything consumed so just return -1 here.
return -1;
}
throw newIOException("recvFd", res);
}
private static native int recvFd0(int fd);
public static void shutdown(int fd, boolean read, boolean write) throws IOException { public static void shutdown(int fd, boolean read, boolean write) throws IOException {
int res = shutdown0(fd, read, write); int res = shutdown0(fd, read, write);
if (res < 0) { if (res < 0) {

View File

@ -0,0 +1,35 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import java.net.SocketAddress;
import java.util.List;
public class EpollDomainSocketEchoTest extends EpollSocketEchoTest {
@Override
protected SocketAddress newSocketAddress() {
return EpollSocketTestPermutation.newSocketAddress();
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.domainSocket();
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import java.net.SocketAddress;
import java.util.List;
public class EpollDomainSocketFileRegionTest extends EpollSocketFileRegionTest {
@Override
protected SocketAddress newSocketAddress() {
return EpollSocketTestPermutation.newSocketAddress();
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.domainSocket();
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketFixedLengthEchoTest;
import java.net.SocketAddress;
import java.util.List;
public class EpollDomainSocketFixedLengthEchoTest extends SocketFixedLengthEchoTest {
@Override
protected SocketAddress newSocketAddress() {
return EpollSocketTestPermutation.newSocketAddress();
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.domainSocket();
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketGatheringWriteTest;
import java.net.SocketAddress;
import java.util.List;
public class EpollDomainSocketGatheringWriteTest extends SocketGatheringWriteTest {
@Override
protected SocketAddress newSocketAddress() {
return EpollSocketTestPermutation.newSocketAddress();
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.domainSocket();
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketObjectEchoTest;
import java.net.SocketAddress;
import java.util.List;
public class EpollDomainSocketObjectEchoTest extends SocketObjectEchoTest {
@Override
protected SocketAddress newSocketAddress() {
return EpollSocketTestPermutation.newSocketAddress();
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.domainSocket();
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.handler.ssl.SslContext;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketSslEchoTest;
import java.net.SocketAddress;
import java.util.List;
public class EpollDomainSocketSslEchoTest extends SocketSslEchoTest {
public EpollDomainSocketSslEchoTest(
SslContext serverCtx, SslContext clientCtx, Renegotiation renegotiation,
boolean serverUsesDelegatedTaskExecutor, boolean clientUsesDelegatedTaskExecutor,
boolean autoRead, boolean useChunkedWriteHandler, boolean useCompositeByteBuf) {
super(serverCtx, clientCtx, renegotiation,
serverUsesDelegatedTaskExecutor, clientUsesDelegatedTaskExecutor,
autoRead, useChunkedWriteHandler, useCompositeByteBuf);
}
@Override
protected SocketAddress newSocketAddress() {
return EpollSocketTestPermutation.newSocketAddress();
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.domainSocket();
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.handler.ssl.SslContext;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketSslGreetingTest;
import java.net.SocketAddress;
import java.util.List;
public class EpollDomainSocketSslGreetingTest extends SocketSslGreetingTest {
public EpollDomainSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx) {
super(serverCtx, clientCtx);
}
@Override
protected SocketAddress newSocketAddress() {
return EpollSocketTestPermutation.newSocketAddress();
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.domainSocket();
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.handler.ssl.SslContext;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketStartTlsTest;
import java.net.SocketAddress;
import java.util.List;
public class EpollDomainSocketStartTlsTest extends SocketStartTlsTest {
public EpollDomainSocketStartTlsTest(SslContext serverCtx, SslContext clientCtx) {
super(serverCtx, clientCtx);
}
@Override
protected SocketAddress newSocketAddress() {
return EpollSocketTestPermutation.newSocketAddress();
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.domainSocket();
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketStringEchoTest;
import java.net.SocketAddress;
import java.util.List;
public class EpollDomainSocketStringEchoTest extends SocketStringEchoTest {
@Override
protected SocketAddress newSocketAddress() {
return EpollSocketTestPermutation.newSocketAddress();
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.domainSocket();
}
}

View File

@ -29,12 +29,15 @@ import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory;
import io.netty.testsuite.transport.socket.SocketTestPermutation; import io.netty.testsuite.transport.socket.SocketTestPermutation;
import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
class EpollSocketTestPermutation extends SocketTestPermutation { class EpollSocketTestPermutation extends SocketTestPermutation {
static final SocketTestPermutation INSTANCE = new EpollSocketTestPermutation(); static final EpollSocketTestPermutation INSTANCE = new EpollSocketTestPermutation();
static final EventLoopGroup EPOLL_BOSS_GROUP = static final EventLoopGroup EPOLL_BOSS_GROUP =
new EpollEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-epoll-boss", true)); new EpollEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-epoll-boss", true));
@ -119,4 +122,44 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
); );
return combo(bfs, bfs); return combo(bfs, bfs);
} }
public List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> domainSocket() {
List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> list =
combo(serverDomainSocket(), clientDomainSocket());
return list;
}
public List<BootstrapFactory<ServerBootstrap>> serverDomainSocket() {
return Collections.<BootstrapFactory<ServerBootstrap>>singletonList(
new BootstrapFactory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP)
.channel(EpollServerDomainSocketChannel.class);
}
}
);
}
public List<BootstrapFactory<Bootstrap>> clientDomainSocket() {
return Collections.<BootstrapFactory<Bootstrap>>singletonList(
new BootstrapFactory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollDomainSocketChannel.class);
}
}
);
}
public static DomainSocketAddress newSocketAddress() {
try {
File file = File.createTempFile("netty", "dsocket");
file.delete();
return new DomainSocketAddress(file);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
} }