From 19ec0f3997064980deecb160d2b2fa9689ca0703 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 14 Jan 2015 16:38:46 +0100 Subject: [PATCH] Add support for Unix Domain Sockets when using native epoll transport Motivation: Using Unix Domain Sockets can be very useful when communication should take place on the same host and has less overhead then using loopback. We should support this with the native epoll transport. Modifications: - Add support for Unix Domain Sockets. - Adjust testsuite to be able to reuse tests. Result: Unix Domain Sockets are now support when using native epoll transport. --- .../socket/AbstractClientSocketTest.java | 10 +- .../socket/AbstractServerSocketTest.java | 11 +- .../transport/socket/AbstractSocketTest.java | 11 +- .../transport/socket/SocketEchoTest.java | 9 +- .../socket/SocketFixedLengthEchoTest.java | 9 +- .../socket/SocketObjectEchoTest.java | 9 +- .../transport/socket/SocketSslEchoTest.java | 15 +- .../socket/SocketSslGreetingTest.java | 9 +- .../transport/socket/SocketStartTlsTest.java | 9 +- .../socket/SocketStringEchoTest.java | 9 +- .../main/c/io_netty_channel_epoll_Native.c | 99 +++ .../channel/epoll/AbstractEpollChannel.java | 81 +- .../epoll/AbstractEpollServerChannel.java | 114 +++ .../epoll/AbstractEpollStreamChannel.java | 653 ++++++++++++++++ .../channel/epoll/DomainSocketAddress.java | 63 ++ .../channel/epoll/EpollDatagramChannel.java | 12 +- .../epoll/EpollDomainSocketChannel.java | 78 ++ .../epoll/EpollServerChannelConfig.java | 165 +++++ .../epoll/EpollServerDomainSocketChannel.java | 85 +++ .../epoll/EpollServerSocketChannel.java | 91 +-- .../epoll/EpollServerSocketChannelConfig.java | 61 +- .../channel/epoll/EpollSocketChannel.java | 694 +----------------- .../java/io/netty/channel/epoll/Native.java | 62 +- .../epoll/EpollDomainSocketEchoTest.java | 35 + .../EpollDomainSocketFileRegionTest.java | 35 + .../EpollDomainSocketFixedLengthEchoTest.java | 37 + .../EpollDomainSocketGatheringWriteTest.java | 37 + .../EpollDomainSocketObjectEchoTest.java | 36 + .../epoll/EpollDomainSocketSslEchoTest.java | 44 ++ .../EpollDomainSocketSslGreetingTest.java | 42 ++ .../epoll/EpollDomainSocketStartTlsTest.java | 42 ++ .../EpollDomainSocketStringEchoTest.java | 36 + .../epoll/EpollSocketTestPermutation.java | 45 +- 33 files changed, 1883 insertions(+), 865 deletions(-) create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/DomainSocketAddress.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java create mode 100644 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerDomainSocketChannel.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketEchoTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFileRegionTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFixedLengthEchoTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketGatheringWriteTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketObjectEchoTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslEchoTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslGreetingTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketStartTlsTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketStringEchoTest.java diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractClientSocketTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractClientSocketTest.java index dabcf5492c..0a4057e8ee 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractClientSocketTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractClientSocketTest.java @@ -24,11 +24,12 @@ import io.netty.testsuite.util.TestUtils; import io.netty.util.NetUtil; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.List; public abstract class AbstractClientSocketTest extends AbstractTestsuiteTest { - protected volatile InetSocketAddress addr; + protected volatile SocketAddress addr; protected AbstractClientSocketTest() { super(Bootstrap.class); @@ -41,8 +42,13 @@ public abstract class AbstractClientSocketTest extends AbstractTestsuiteTest { - protected volatile InetSocketAddress addr; + protected volatile SocketAddress addr; protected AbstractServerSocketTest() { super(ServerBootstrap.class); @@ -41,10 +42,14 @@ public abstract class AbstractServerSocketTest extends AbstractTestsuiteTest { - protected volatile InetSocketAddress addr; + protected volatile SocketAddress addr; protected AbstractSocketTest() { super(ServerBootstrap.class, Bootstrap.class); @@ -42,12 +43,16 @@ public abstract class AbstractSocketTest extends AbstractComboTestsuiteTest() { + sb.childHandler(new ChannelInitializer() { @Override - protected void initChannel(SocketChannel c) throws Exception { + protected void initChannel(Channel c) throws Exception { c.pipeline().addLast(group, sh); } }); - cb.handler(new ChannelInitializer() { + cb.handler(new ChannelInitializer() { @Override - protected void initChannel(SocketChannel c) throws Exception { + protected void initChannel(Channel c) throws Exception { c.pipeline().addLast(group, ch); } }); diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java index aeeca2087d..ec1a2a6030 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java @@ -23,7 +23,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import org.junit.Test; @@ -64,17 +63,17 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest { final EchoHandler sh = new EchoHandler(autoRead); final EchoHandler ch = new EchoHandler(autoRead); - sb.childHandler(new ChannelInitializer() { + sb.childHandler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024)); sch.pipeline().addAfter("decoder", "handler", sh); } }); - cb.handler(new ChannelInitializer() { + cb.handler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024)); sch.pipeline().addAfter("decoder", "handler", ch); } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java index 7f115f94f9..64e39bc12a 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java @@ -21,7 +21,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; @@ -72,9 +71,9 @@ public class SocketObjectEchoTest extends AbstractSocketTest { final EchoHandler sh = new EchoHandler(autoRead); final EchoHandler ch = new EchoHandler(autoRead); - sb.childHandler(new ChannelInitializer() { + sb.childHandler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { sch.pipeline().addLast( new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())), new ObjectEncoder(), @@ -82,9 +81,9 @@ public class SocketObjectEchoTest extends AbstractSocketTest { } }); - cb.handler(new ChannelInitializer() { + cb.handler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { sch.pipeline().addLast( new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())), new ObjectEncoder(), diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java index 00abad6bf3..1319b6c248 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java @@ -25,7 +25,6 @@ import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.ssl.JdkSslClientContext; import io.netty.handler.ssl.JdkSslServerContext; import io.netty.handler.ssl.OpenSsl; @@ -174,8 +173,8 @@ public class SocketSslEchoTest extends AbstractSocketTest { private final AtomicInteger clientNegoCounter = new AtomicInteger(); private final AtomicInteger serverNegoCounter = new AtomicInteger(); - private volatile SocketChannel clientChannel; - private volatile SocketChannel serverChannel; + private volatile Channel clientChannel; + private volatile Channel serverChannel; private volatile SslHandler clientSslHandler; private volatile SslHandler serverSslHandler; @@ -210,9 +209,10 @@ public class SocketSslEchoTest extends AbstractSocketTest { public void testSslEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { reset(); - sb.childHandler(new ChannelInitializer() { + sb.childHandler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + @SuppressWarnings("deprecation") + public void initChannel(Channel sch) throws Exception { serverChannel = sch; serverSslHandler = serverCtx.newHandler(sch.alloc()); @@ -224,9 +224,10 @@ public class SocketSslEchoTest extends AbstractSocketTest { } }); - cb.handler(new ChannelInitializer() { + cb.handler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + @SuppressWarnings("deprecation") + public void initChannel(Channel sch) throws Exception { clientChannel = sch; clientSslHandler = clientCtx.newHandler(sch.alloc()); diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslGreetingTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslGreetingTest.java index 339516a1ab..5ca9c401b1 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslGreetingTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslGreetingTest.java @@ -24,7 +24,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.JdkSslClientContext; @@ -117,9 +116,9 @@ public class SocketSslGreetingTest extends AbstractSocketTest { final ServerHandler sh = new ServerHandler(); final ClientHandler ch = new ClientHandler(); - sb.childHandler(new ChannelInitializer() { + sb.childHandler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { ChannelPipeline p = sch.pipeline(); p.addLast(serverCtx.newHandler(sch.alloc())); p.addLast(new LoggingHandler(LOG_LEVEL)); @@ -127,9 +126,9 @@ public class SocketSslGreetingTest extends AbstractSocketTest { } }); - cb.handler(new ChannelInitializer() { + cb.handler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { ChannelPipeline p = sch.pipeline(); p.addLast(clientCtx.newHandler(sch.alloc())); p.addLast(new LoggingHandler(LOG_LEVEL)); diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java index 29f8037625..eb3630381e 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java @@ -23,7 +23,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; @@ -150,9 +149,9 @@ public class SocketStartTlsTest extends AbstractSocketTest { final StartTlsServerHandler sh = new StartTlsServerHandler(sse, autoRead); final StartTlsClientHandler ch = new StartTlsClientHandler(cse, autoRead); - sb.childHandler(new ChannelInitializer() { + sb.childHandler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { ChannelPipeline p = sch.pipeline(); p.addLast("logger", new LoggingHandler(LOG_LEVEL)); p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder()); @@ -160,9 +159,9 @@ public class SocketStartTlsTest extends AbstractSocketTest { } }); - cb.handler(new ChannelInitializer() { + cb.handler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { ChannelPipeline p = sch.pipeline(); p.addLast("logger", new LoggingHandler(LOG_LEVEL)); p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder()); diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java index ee99c112a1..c21d9e06f9 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java @@ -21,7 +21,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; @@ -74,9 +73,9 @@ public class SocketStringEchoTest extends AbstractSocketTest { final StringEchoHandler sh = new StringEchoHandler(autoRead); final StringEchoHandler ch = new StringEchoHandler(autoRead); - sb.childHandler(new ChannelInitializer() { + sb.childHandler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter())); sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1)); sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1)); @@ -84,9 +83,9 @@ public class SocketStringEchoTest extends AbstractSocketTest { } }); - cb.handler(new ChannelInitializer() { + cb.handler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter())); sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1)); sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1)); diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index 3aa984559e..a2556140f9 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -1390,3 +1391,101 @@ JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_strError(JNIEnv* en char* err = strerror(error); return (*env)->NewStringUTF(env, err); } + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socketDomain(JNIEnv* env, jclass clazz) { + int fd = socket(PF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0); + if (fd == -1) { + return -errno; + } + return fd; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_bindDomainSocket(JNIEnv* env, jclass clazz, jint fd, jstring socketPath) { + struct sockaddr_un addr; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + + const char* socket_path = (*env)->GetStringUTFChars(env, socketPath, 0); + memcpy(addr.sun_path, socket_path, strlen(socket_path)); + + if (unlink(socket_path) == -1 && errno != ENOENT) { + return -errno; + } + + int res = bind(fd, (struct sockaddr*) &addr, sizeof(addr)); + (*env)->ReleaseStringUTFChars(env, socketPath, socket_path); + + if (res == -1) { + return -errno; + } + return res; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_connectDomainSocket(JNIEnv* env, jclass clazz, jint fd, jstring socketPath) { + struct sockaddr_un addr; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + + const char* socket_path = (*env)->GetStringUTFChars(env, socketPath, 0); + strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1); + + int res; + int err; + do { + res = connect(fd, (struct sockaddr*) &addr, sizeof(addr)); + } while (res == -1 && ((err = errno) == EINTR)); + + (*env)->ReleaseStringUTFChars(env, socketPath, socket_path); + + if (res < 0) { + return -err; + } + return 0; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_recvFd(JNIEnv* env, jclass clazz, jint fd) { + jint socketFd; + struct msghdr msg; + struct iovec iov[1]; + struct cmsghdr* ctrl_msg = NULL; + char msg_buffer[1]; + char elem_buffer[CMSG_SPACE(sizeof(int))]; + + /* Fill all with 0 */ + memset(&msg, 0, sizeof(struct msghdr)); + memset(elem_buffer, 0, CMSG_SPACE(sizeof(int))); + + iov[0].iov_base = msg_buffer; + iov[0].iov_len = 1; + + msg.msg_iov = iov; + msg.msg_iovlen = 1; + msg.msg_control = elem_buffer; + msg.msg_controllen = CMSG_SPACE(sizeof(int)); + + if(recvmsg(fd, &msg, MSG_CMSG_CLOEXEC) < 0) { + // All read, return -1 + return -1; + } + + if((msg.msg_flags & MSG_CTRUNC) == MSG_CTRUNC) { + // Not enough space ?!?! + return -1; + } + + // skip empty entries + for(ctrl_msg = CMSG_FIRSTHDR(&msg); ctrl_msg != NULL; ctrl_msg = CMSG_NXTHDR(&msg, ctrl_msg)) { + if((ctrl_msg->cmsg_level == SOL_SOCKET) && (ctrl_msg->cmsg_type == SCM_RIGHTS)) { + socketFd = *((int *) CMSG_DATA(ctrl_msg)); + // set as non blocking as we want to use it with epoll + if (fcntl(socketFd, F_SETFL, O_NONBLOCK) == -1) { + return -errno; + } + return socketFd; + } + } + return -1; +} + diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index bbe636561b..d87e8d15ae 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -27,6 +27,7 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.OneTimeTask; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.UnresolvedAddressException; abstract class AbstractEpollChannel extends AbstractChannel { @@ -49,6 +50,10 @@ abstract class AbstractEpollChannel extends AbstractChannel { this.active = active; } + protected final int fd() { + return fd; + } + @Override public boolean isActive() { return active; @@ -71,16 +76,6 @@ abstract class AbstractEpollChannel extends AbstractChannel { Native.close(fd); } - @Override - public InetSocketAddress remoteAddress() { - return (InetSocketAddress) super.remoteAddress(); - } - - @Override - public InetSocketAddress localAddress() { - return (InetSocketAddress) super.localAddress(); - } - @Override protected void doDisconnect() throws Exception { doClose(); @@ -213,6 +208,72 @@ abstract class AbstractEpollChannel extends AbstractChannel { } } + /** + * Read bytes into the given {@link ByteBuf} and return the amount. + */ + protected final int doReadBytes(ByteBuf byteBuf) throws Exception { + int writerIndex = byteBuf.writerIndex(); + int localReadAmount; + if (byteBuf.hasMemoryAddress()) { + localReadAmount = Native.readAddress(fd, byteBuf.memoryAddress(), writerIndex, byteBuf.capacity()); + } else { + ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes()); + localReadAmount = Native.read(fd, buf, buf.position(), buf.limit()); + } + if (localReadAmount > 0) { + byteBuf.writerIndex(writerIndex + localReadAmount); + } + return localReadAmount; + } + + protected final int doWriteBytes(ByteBuf buf) throws Exception { + int readableBytes = buf.readableBytes(); + int writtenBytes = 0; + if (buf.hasMemoryAddress()) { + long memoryAddress = buf.memoryAddress(); + int readerIndex = buf.readerIndex(); + int writerIndex = buf.writerIndex(); + for (;;) { + int localFlushedAmount = Native.writeAddress(fd, memoryAddress, readerIndex, writerIndex); + if (localFlushedAmount > 0) { + writtenBytes += localFlushedAmount; + if (writtenBytes == readableBytes) { + return writtenBytes; + } + readerIndex += localFlushedAmount; + } else { + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); + return writtenBytes; + } + } + } else { + ByteBuffer nioBuf; + if (buf.nioBufferCount() == 1) { + nioBuf = buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()); + } else { + nioBuf = buf.nioBuffer(); + } + for (;;) { + int pos = nioBuf.position(); + int limit = nioBuf.limit(); + int localFlushedAmount = Native.write(fd, nioBuf, pos, limit); + if (localFlushedAmount > 0) { + nioBuf.position(pos + localFlushedAmount); + writtenBytes += localFlushedAmount; + if (writtenBytes == readableBytes) { + return writtenBytes; + } + } else { + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); + break; + } + } + return writtenBytes; + } + } + protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { protected boolean readPending; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java new file mode 100644 index 0000000000..a852f23ae8 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -0,0 +1,114 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.channel.ServerChannel; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + + +public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel { + + protected AbstractEpollServerChannel(int fd) { + super(fd, Native.EPOLLACCEPT); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return loop instanceof EpollEventLoop; + } + + @Override + protected InetSocketAddress remoteAddress0() { + return null; + } + + @Override + protected AbstractEpollUnsafe newUnsafe() { + return new EpollServerSocketUnsafe(); + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected Object filterOutboundMessage(Object msg) throws Exception { + throw new UnsupportedOperationException(); + } + + protected abstract Channel newChildChannel(int fd) throws Exception; + + final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { + + @Override + public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) { + // Connect not supported by ServerChannel implementations + channelPromise.setFailure(new UnsupportedOperationException()); + } + + @Override + void epollInReady() { + assert eventLoop().inEventLoop(); + final ChannelPipeline pipeline = pipeline(); + Throwable exception = null; + try { + try { + for (;;) { + int socketFd = Native.accept(fd); + if (socketFd == -1) { + // this means everything was handled for now + break; + } + readPending = false; + + try { + pipeline.fireChannelRead(newChildChannel(socketFd)); + } catch (Throwable t) { + // keep on reading as we use epoll ET and need to consume everything from the socket + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(t); + } + } + } catch (Throwable t) { + exception = t; + } + pipeline.fireChannelReadComplete(); + + if (exception != null) { + pipeline.fireExceptionCaught(exception); + } + } finally { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (!config().isAutoRead() && !readPending) { + clearEpollIn0(); + } + } + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java new file mode 100644 index 0000000000..0324c5f9e7 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -0,0 +1,653 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.ConnectTimeoutException; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.EventLoop; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.ChannelInputShutdownEvent; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.StringUtil; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { + + private static final String EXPECTED_TYPES = + " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + + StringUtil.simpleClassName(DefaultFileRegion.class) + ')'; + + private volatile boolean inputShutdown; + private volatile boolean outputShutdown; + + protected AbstractEpollStreamChannel(Channel parent, int fd) { + super(parent, fd, Native.EPOLLIN, true); + } + + protected AbstractEpollStreamChannel(int fd) { + super(fd, Native.EPOLLIN); + } + + @Override + protected AbstractEpollUnsafe newUnsafe() { + return new EpollStreamUnsafe(); + } + + /** + * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. + * @param buf the {@link ByteBuf} from which the bytes should be written + */ + private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception { + int readableBytes = buf.readableBytes(); + if (readableBytes == 0) { + in.remove(); + return true; + } + + if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) { + int writtenBytes = doWriteBytes(buf); + in.removeBytes(writtenBytes); + return writtenBytes == readableBytes; + } else { + ByteBuffer[] nioBuffers = buf.nioBuffers(); + return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes); + } + } + + private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException { + + long expectedWrittenBytes = array.size(); + final long initialExpectedWrittenBytes = expectedWrittenBytes; + + int cnt = array.count(); + + assert expectedWrittenBytes != 0; + assert cnt != 0; + + boolean done = false; + int offset = 0; + int end = offset + cnt; + for (;;) { + long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt); + if (localWrittenBytes == 0) { + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); + break; + } + expectedWrittenBytes -= localWrittenBytes; + + if (expectedWrittenBytes == 0) { + // Written everything, just break out here (fast-path) + done = true; + break; + } + + do { + long bytes = array.processWritten(offset, localWrittenBytes); + if (bytes == -1) { + // incomplete write + break; + } else { + offset++; + cnt--; + localWrittenBytes -= bytes; + } + } while (offset < end && localWrittenBytes > 0); + } + + in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); + return done; + } + + private boolean writeBytesMultiple( + ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, + int nioBufferCnt, long expectedWrittenBytes) throws IOException { + + assert expectedWrittenBytes != 0; + final long initialExpectedWrittenBytes = expectedWrittenBytes; + + boolean done = false; + int offset = 0; + int end = offset + nioBufferCnt; + for (;;) { + long localWrittenBytes = Native.writev(fd, nioBuffers, offset, nioBufferCnt); + if (localWrittenBytes == 0) { + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); + break; + } + expectedWrittenBytes -= localWrittenBytes; + + if (expectedWrittenBytes == 0) { + // Written everything, just break out here (fast-path) + done = true; + break; + } + do { + ByteBuffer buffer = nioBuffers[offset]; + int pos = buffer.position(); + int bytes = buffer.limit() - pos; + if (bytes > localWrittenBytes) { + buffer.position(pos + (int) localWrittenBytes); + // incomplete write + break; + } else { + offset++; + nioBufferCnt--; + localWrittenBytes -= bytes; + } + } while (offset < end && localWrittenBytes > 0); + } + + in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); + return done; + } + + /** + * Write a {@link DefaultFileRegion} + * + * @param region the {@link DefaultFileRegion} from which the bytes should be written + * @return amount the amount of written bytes + */ + private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception { + final long regionCount = region.count(); + if (region.transfered() >= regionCount) { + in.remove(); + return true; + } + + final long baseOffset = region.position(); + boolean done = false; + long flushedAmount = 0; + + for (;;) { + final long offset = region.transfered(); + final long localFlushedAmount = Native.sendfile(fd, region, baseOffset, offset, regionCount - offset); + if (localFlushedAmount == 0) { + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); + break; + } + + flushedAmount += localFlushedAmount; + if (region.transfered() >= regionCount) { + done = true; + break; + } + } + + if (flushedAmount > 0) { + in.progress(flushedAmount); + } + + if (done) { + in.remove(); + } + return done; + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + for (;;) { + final int msgCount = in.size(); + + if (msgCount == 0) { + // Wrote all messages. + clearEpollOut(); + break; + } + + // Do gathering write if the outbounf buffer entries start with more than one ByteBuf. + if (msgCount > 1 && in.current() instanceof ByteBuf) { + if (!doWriteMultiple(in)) { + break; + } + + // We do not break the loop here even if the outbound buffer was flushed completely, + // because a user might have triggered another write and flush when we notify his or her + // listeners. + } else { // msgCount == 1 + if (!doWriteSingle(in)) { + break; + } + } + } + } + + private boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception { + // The outbound buffer contains only one message or it contains a file region. + Object msg = in.current(); + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (!writeBytes(in, buf)) { + // was not able to write everything so break here we will get notified later again once + // the network stack can handle more writes. + return false; + } + } else if (msg instanceof DefaultFileRegion) { + DefaultFileRegion region = (DefaultFileRegion) msg; + if (!writeFileRegion(in, region)) { + // was not able to write everything so break here we will get notified later again once + // the network stack can handle more writes. + return false; + } + } else { + // Should never reach here. + throw new Error(); + } + + return true; + } + + private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception { + if (PlatformDependent.hasUnsafe()) { + // this means we can cast to IovArray and write the IovArray directly. + IovArray array = IovArrayThreadLocal.get(in); + int cnt = array.count(); + if (cnt >= 1) { + // TODO: Handle the case where cnt == 1 specially. + if (!writeBytesMultiple(in, array)) { + // was not able to write everything so break here we will get notified later again once + // the network stack can handle more writes. + return false; + } + } else { // cnt == 0, which means the outbound buffer contained empty buffers only. + in.removeBytes(0); + } + } else { + ByteBuffer[] buffers = in.nioBuffers(); + int cnt = in.nioBufferCount(); + if (cnt >= 1) { + // TODO: Handle the case where cnt == 1 specially. + if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) { + // was not able to write everything so break here we will get notified later again once + // the network stack can handle more writes. + return false; + } + } else { // cnt == 0, which means the outbound buffer contained empty buffers only. + in.removeBytes(0); + } + } + + return true; + } + + @Override + protected Object filterOutboundMessage(Object msg) { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) { + if (buf instanceof CompositeByteBuf) { + // Special handling of CompositeByteBuf to reduce memory copies if some of the Components + // in the CompositeByteBuf are backed by a memoryAddress. + CompositeByteBuf comp = (CompositeByteBuf) buf; + if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) { + // more then 1024 buffers for gathering writes so just do a memory copy. + buf = newDirectBuffer(buf); + assert buf.hasMemoryAddress(); + } + } else { + // We can only handle buffers with memory address so we need to copy if a non direct is + // passed to write. + buf = newDirectBuffer(buf); + assert buf.hasMemoryAddress(); + } + } + return buf; + } + + if (msg instanceof DefaultFileRegion) { + return msg; + } + + throw new UnsupportedOperationException( + "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); + } + + protected boolean isInputShutdown0() { + return inputShutdown; + } + + protected boolean isOutputShutdown0() { + return outputShutdown || !isActive(); + } + + protected ChannelFuture shutdownOutput0(final ChannelPromise promise) { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + try { + Native.shutdown(fd, false, true); + outputShutdown = true; + promise.setSuccess(); + } catch (Throwable t) { + promise.setFailure(t); + } + } else { + loop.execute(new Runnable() { + @Override + public void run() { + shutdownOutput0(promise); + } + }); + } + return promise; + } + + /** + * Connect to the remote peer + */ + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + if (localAddress != null) { + Native.bind(fd, localAddress); + } + + boolean success = false; + try { + boolean connected = Native.connect(fd, remoteAddress); + if (!connected) { + setEpollOut(); + } + success = true; + return connected; + } finally { + if (!success) { + doClose(); + } + } + } + + final class EpollStreamUnsafe extends AbstractEpollUnsafe { + /** + * The future of the current connection attempt. If not null, subsequent + * connection attempts will fail. + */ + private ChannelPromise connectPromise; + private ScheduledFuture connectTimeoutFuture; + private SocketAddress requestedRemoteAddress; + + private RecvByteBufAllocator.Handle allocHandle; + + private void closeOnRead(ChannelPipeline pipeline) { + inputShutdown = true; + if (isOpen()) { + if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + clearEpollIn0(); + pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + } else { + close(voidPromise()); + } + } + } + + private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { + if (byteBuf != null) { + if (byteBuf.isReadable()) { + readPending = false; + pipeline.fireChannelRead(byteBuf); + } else { + byteBuf.release(); + } + } + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(cause); + if (close || cause instanceof IOException) { + closeOnRead(pipeline); + return true; + } + return false; + } + + @Override + public void connect( + final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { + if (!promise.setUncancellable() || !ensureOpen(promise)) { + return; + } + + try { + if (connectPromise != null) { + throw new IllegalStateException("connection attempt already made"); + } + + boolean wasActive = isActive(); + if (doConnect(remoteAddress, localAddress)) { + fulfillConnectPromise(promise, wasActive); + } else { + connectPromise = promise; + requestedRemoteAddress = remoteAddress; + + // Schedule connect timeout. + int connectTimeoutMillis = config().getConnectTimeoutMillis(); + if (connectTimeoutMillis > 0) { + connectTimeoutFuture = eventLoop().schedule(new Runnable() { + @Override + public void run() { + ChannelPromise connectPromise = EpollStreamUnsafe.this.connectPromise; + ConnectTimeoutException cause = + new ConnectTimeoutException("connection timed out: " + remoteAddress); + if (connectPromise != null && connectPromise.tryFailure(cause)) { + close(voidPromise()); + } + } + }, connectTimeoutMillis, TimeUnit.MILLISECONDS); + } + + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isCancelled()) { + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + close(voidPromise()); + } + } + }); + } + } catch (Throwable t) { + closeIfClosed(); + promise.tryFailure(annotateConnectException(t, remoteAddress)); + } + } + + private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { + if (promise == null) { + // Closed via cancellation and the promise has been notified already. + return; + } + active = true; + + // trySuccess() will return false if a user cancelled the connection attempt. + boolean promiseSet = promise.trySuccess(); + + // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, + // because what happened is what happened. + if (!wasActive && isActive()) { + pipeline().fireChannelActive(); + } + + // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). + if (!promiseSet) { + close(voidPromise()); + } + } + + private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { + if (promise == null) { + // Closed via cancellation and the promise has been notified already. + return; + } + + // Use tryFailure() instead of setFailure() to avoid the race against cancel(). + promise.tryFailure(cause); + closeIfClosed(); + } + + private void finishConnect() { + // Note this method is invoked by the event loop only if the connection attempt was + // neither cancelled nor timed out. + + assert eventLoop().inEventLoop(); + + boolean connectStillInProgress = false; + try { + boolean wasActive = isActive(); + if (!doFinishConnect()) { + connectStillInProgress = true; + return; + } + fulfillConnectPromise(connectPromise, wasActive); + } catch (Throwable t) { + fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); + } finally { + if (!connectStillInProgress) { + // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used + // See https://github.com/netty/netty/issues/1770 + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + } + } + } + + @Override + void epollOutReady() { + if (connectPromise != null) { + // pending connect which is now complete so handle it. + finishConnect(); + } else { + super.epollOutReady(); + } + } + + /** + * Finish the connect + */ + private boolean doFinishConnect() throws Exception { + if (Native.finishConnect(fd)) { + clearEpollOut(); + return true; + } else { + setEpollOut(); + return false; + } + } + + @Override + void epollRdHupReady() { + if (isActive()) { + epollInReady(); + } else { + closeOnRead(pipeline()); + } + } + + @Override + void epollInReady() { + final ChannelConfig config = config(); + final ChannelPipeline pipeline = pipeline(); + final ByteBufAllocator allocator = config.getAllocator(); + RecvByteBufAllocator.Handle allocHandle = this.allocHandle; + if (allocHandle == null) { + this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); + } + + ByteBuf byteBuf = null; + boolean close = false; + try { + int totalReadAmount = 0; + for (;;) { + // we use a direct buffer here as the native implementations only be able + // to handle direct buffers. + byteBuf = allocHandle.allocate(allocator); + int writable = byteBuf.writableBytes(); + int localReadAmount = doReadBytes(byteBuf); + if (localReadAmount <= 0) { + // not was read release the buffer + byteBuf.release(); + close = localReadAmount < 0; + break; + } + readPending = false; + pipeline.fireChannelRead(byteBuf); + byteBuf = null; + + if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { + allocHandle.record(totalReadAmount); + + // Avoid overflow. + totalReadAmount = localReadAmount; + } else { + totalReadAmount += localReadAmount; + } + + if (localReadAmount < writable) { + // Read less than what the buffer can hold, + // which might mean we drained the recv buffer completely. + break; + } + } + pipeline.fireChannelReadComplete(); + allocHandle.record(totalReadAmount); + + if (close) { + closeOnRead(pipeline); + close = false; + } + } catch (Throwable t) { + boolean closed = handleReadException(pipeline, byteBuf, t, close); + if (!closed) { + // trigger a read again as there may be something left to read and because of epoll ET we + // will not get notified again until we read everything from the socket + eventLoop().execute(new Runnable() { + @Override + public void run() { + epollInReady(); + } + }); + } + } finally { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (!config.isAutoRead() && !readPending) { + clearEpollIn0(); + } + } + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/DomainSocketAddress.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/DomainSocketAddress.java new file mode 100644 index 0000000000..04235ed959 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/DomainSocketAddress.java @@ -0,0 +1,63 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import java.io.File; +import java.net.SocketAddress; + +public final class DomainSocketAddress extends SocketAddress { + private final String socketPath; + + public DomainSocketAddress(String socketPath) { + if (socketPath == null) { + throw new NullPointerException("socketPath"); + } + this.socketPath = socketPath; + } + + public DomainSocketAddress(File file) { + this(file.getPath()); + } + + /** + * The path to the domain socket. + */ + public String path() { + return socketPath; + } + + @Override + public String toString() { + return path(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DomainSocketAddress)) { + return false; + } + + return ((DomainSocketAddress) o).socketPath.equals(socketPath); + } + + @Override + public int hashCode() { + return socketPath.hashCode(); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 304491129c..499f9e5efc 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -66,6 +66,16 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements config = new EpollDatagramChannelConfig(this); } + @Override + public InetSocketAddress remoteAddress() { + return (InetSocketAddress) super.remoteAddress(); + } + + @Override + public InetSocketAddress localAddress() { + return (InetSocketAddress) super.localAddress(); + } + @Override public ChannelMetadata metadata() { return METADATA; @@ -252,7 +262,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements protected void doBind(SocketAddress localAddress) throws Exception { InetSocketAddress addr = (InetSocketAddress) localAddress; checkResolvable(addr); - Native.bind(fd, addr.getAddress(), addr.getPort()); + Native.bind(fd, addr); local = Native.localAddress(fd); active = true; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java new file mode 100644 index 0000000000..85a78bdcf5 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java @@ -0,0 +1,78 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.DefaultChannelConfig; + +import java.net.SocketAddress; + +public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel { + private final ChannelConfig config = new DefaultChannelConfig(this); + + private volatile DomainSocketAddress local; + private volatile DomainSocketAddress remote; + + EpollDomainSocketChannel(Channel parent, int fd) { + super(parent, fd); + } + + public EpollDomainSocketChannel() { + super(Native.socketDomainFd()); + } + + @Override + protected DomainSocketAddress localAddress0() { + return local; + } + + @Override + protected DomainSocketAddress remoteAddress0() { + return remote; + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + Native.bind(fd, localAddress); + local = (DomainSocketAddress) localAddress; + } + + @Override + public ChannelConfig config() { + return config; + } + + @Override + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + if (super.doConnect(remoteAddress, localAddress)) { + local = (DomainSocketAddress) localAddress; + remote = (DomainSocketAddress) remoteAddress; + return true; + } + return false; + } + + @Override + public DomainSocketAddress remoteAddress() { + return (DomainSocketAddress) super.remoteAddress(); + } + + @Override + public DomainSocketAddress localAddress() { + return (DomainSocketAddress) super.localAddress(); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java new file mode 100644 index 0000000000..8897447678 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java @@ -0,0 +1,165 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.util.NetUtil; + +import java.util.Map; + +import static io.netty.channel.ChannelOption.SO_BACKLOG; +import static io.netty.channel.ChannelOption.SO_RCVBUF; +import static io.netty.channel.ChannelOption.SO_REUSEADDR; + +public class EpollServerChannelConfig extends DefaultChannelConfig { + protected final AbstractEpollChannel channel; + private volatile int backlog = NetUtil.SOMAXCONN; + + EpollServerChannelConfig(AbstractEpollChannel channel) { + super(channel); + this.channel = channel; + } + + @Override + public Map, Object> getOptions() { + return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG); + } + + @SuppressWarnings("unchecked") + @Override + public T getOption(ChannelOption option) { + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } + if (option == SO_REUSEADDR) { + return (T) Boolean.valueOf(isReuseAddress()); + } + if (option == SO_BACKLOG) { + return (T) Integer.valueOf(getBacklog()); + } + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == SO_REUSEADDR) { + setReuseAddress((Boolean) value); + } else if (option == SO_BACKLOG) { + setBacklog((Integer) value); + } else { + return super.setOption(option, value); + } + + return true; + } + + public boolean isReuseAddress() { + return Native.isReuseAddress(channel.fd) == 1; + } + + public EpollServerChannelConfig setReuseAddress(boolean reuseAddress) { + Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0); + return this; + } + + public int getReceiveBufferSize() { + return Native.getReceiveBufferSize(channel.fd); + } + + public EpollServerChannelConfig setReceiveBufferSize(int receiveBufferSize) { + Native.setReceiveBufferSize(channel.fd, receiveBufferSize); + return this; + } + + public int getBacklog() { + return backlog; + } + + public EpollServerChannelConfig setBacklog(int backlog) { + if (backlog < 0) { + throw new IllegalArgumentException("backlog: " + backlog); + } + this.backlog = backlog; + return this; + } + + @Override + public EpollServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { + super.setConnectTimeoutMillis(connectTimeoutMillis); + return this; + } + + @Override + public EpollServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { + super.setMaxMessagesPerRead(maxMessagesPerRead); + return this; + } + + @Override + public EpollServerChannelConfig setWriteSpinCount(int writeSpinCount) { + super.setWriteSpinCount(writeSpinCount); + return this; + } + + @Override + public EpollServerChannelConfig setAllocator(ByteBufAllocator allocator) { + super.setAllocator(allocator); + return this; + } + + @Override + public EpollServerChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + super.setRecvByteBufAllocator(allocator); + return this; + } + + @Override + public EpollServerChannelConfig setAutoRead(boolean autoRead) { + super.setAutoRead(autoRead); + return this; + } + + @Override + public EpollServerChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); + return this; + } + + @Override + public EpollServerChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); + return this; + } + + @Override + public EpollServerChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { + super.setMessageSizeEstimator(estimator); + return this; + } + + @Override + protected final void autoReadCleared() { + channel.clearEpollIn(); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerDomainSocketChannel.java new file mode 100644 index 0000000000..17143d161d --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerDomainSocketChannel.java @@ -0,0 +1,85 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.Channel; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.io.File; +import java.net.SocketAddress; + + +public final class EpollServerDomainSocketChannel extends AbstractEpollServerChannel { + private static final InternalLogger logger = InternalLoggerFactory.getInstance( + EpollServerDomainSocketChannel.class); + + private final EpollServerChannelConfig config = new EpollServerChannelConfig(this); + private volatile DomainSocketAddress local; + + public EpollServerDomainSocketChannel() { + super(Native.socketDomainFd()); + } + + @Override + protected Channel newChildChannel(int fd) throws Exception { + return new EpollDomainSocketChannel(this, fd); + } + + @Override + protected DomainSocketAddress localAddress0() { + return local; + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + Native.bind(fd, localAddress); + Native.listen(fd, config.getBacklog()); + local = (DomainSocketAddress) localAddress; + } + + @Override + protected void doClose() throws Exception { + try { + super.doClose(); + } finally { + DomainSocketAddress local = this.local; + if (local != null) { + // Delete the socket file if possible. + File socketFile = new File(local.path()); + boolean success = socketFile.delete(); + if (!success && logger.isDebugEnabled()) { + logger.debug("Failed to delete a domain socket file: {}", local.path()); + } + } + } + } + + @Override + public EpollServerChannelConfig config() { + return config; + } + + @Override + public DomainSocketAddress remoteAddress() { + return (DomainSocketAddress) super.remoteAddress(); + } + + @Override + public DomainSocketAddress localAddress() { + return (DomainSocketAddress) super.localAddress(); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java index 1722ac29a2..185ac864b0 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java @@ -15,9 +15,7 @@ */ package io.netty.channel.epoll; -import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; +import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.channel.socket.ServerSocketChannel; @@ -28,13 +26,13 @@ import java.net.SocketAddress; * {@link ServerSocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for * maximal performance. */ -public final class EpollServerSocketChannel extends AbstractEpollChannel implements ServerSocketChannel { +public final class EpollServerSocketChannel extends AbstractEpollServerChannel implements ServerSocketChannel { private final EpollServerSocketChannelConfig config; private volatile InetSocketAddress local; public EpollServerSocketChannel() { - super(Native.socketStreamFd(), Native.EPOLLACCEPT); + super(Native.socketStreamFd()); config = new EpollServerSocketChannelConfig(this); } @@ -47,12 +45,22 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme protected void doBind(SocketAddress localAddress) throws Exception { InetSocketAddress addr = (InetSocketAddress) localAddress; checkResolvable(addr); - Native.bind(fd, addr.getAddress(), addr.getPort()); + Native.bind(fd, addr); local = Native.localAddress(fd); Native.listen(fd, config.getBacklog()); active = true; } + @Override + public InetSocketAddress remoteAddress() { + return (InetSocketAddress) super.remoteAddress(); + } + + @Override + public InetSocketAddress localAddress() { + return (InetSocketAddress) super.localAddress(); + } + @Override public EpollServerSocketChannelConfig config() { return config; @@ -64,74 +72,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme } @Override - protected InetSocketAddress remoteAddress0() { - return null; - } - - @Override - protected AbstractEpollUnsafe newUnsafe() { - return new EpollServerSocketUnsafe(); - } - - @Override - protected void doWrite(ChannelOutboundBuffer in) throws Exception { - throw new UnsupportedOperationException(); - } - - @Override - protected Object filterOutboundMessage(Object msg) throws Exception { - throw new UnsupportedOperationException(); - } - - final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { - - @Override - public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) { - // Connect not supported by ServerChannel implementations - channelPromise.setFailure(new UnsupportedOperationException()); - } - - @Override - void epollInReady() { - assert eventLoop().inEventLoop(); - final ChannelPipeline pipeline = pipeline(); - Throwable exception = null; - try { - try { - for (;;) { - int socketFd = Native.accept(fd); - if (socketFd == -1) { - // this means everything was handled for now - break; - } - try { - readPending = false; - pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd)); - } catch (Throwable t) { - // keep on reading as we use epoll ET and need to consume everything from the socket - pipeline.fireChannelReadComplete(); - pipeline.fireExceptionCaught(t); - } - } - } catch (Throwable t) { - exception = t; - } - pipeline.fireChannelReadComplete(); - - if (exception != null) { - pipeline.fireExceptionCaught(exception); - } - } finally { - // Check if there is a readPending which was not processed yet. - // This could be for two reasons: - // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method - // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method - // - // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead() && !readPending) { - clearEpollIn0(); - } - } - } + protected Channel newChildChannel(int fd) throws Exception { + return new EpollSocketChannel(this, fd); } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java index 38c5f53c7f..4e48e11b5c 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java @@ -17,27 +17,17 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelOption; -import io.netty.channel.DefaultChannelConfig; import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ServerSocketChannelConfig; -import io.netty.util.NetUtil; import java.util.Map; -import static io.netty.channel.ChannelOption.SO_BACKLOG; -import static io.netty.channel.ChannelOption.SO_RCVBUF; -import static io.netty.channel.ChannelOption.SO_REUSEADDR; - -public final class EpollServerSocketChannelConfig extends DefaultChannelConfig +public final class EpollServerSocketChannelConfig extends EpollServerChannelConfig implements ServerSocketChannelConfig { - private final EpollServerSocketChannel channel; - private volatile int backlog = NetUtil.SOMAXCONN; - EpollServerSocketChannelConfig(EpollServerSocketChannel channel) { super(channel); - this.channel = channel; // Use SO_REUSEADDR by default as java.nio does the same. // @@ -47,21 +37,12 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig @Override public Map, Object> getOptions() { - return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG, EpollChannelOption.SO_REUSEPORT); + return getOptions(super.getOptions(), EpollChannelOption.SO_REUSEPORT); } @SuppressWarnings("unchecked") @Override public T getOption(ChannelOption option) { - if (option == SO_RCVBUF) { - return (T) Integer.valueOf(getReceiveBufferSize()); - } - if (option == SO_REUSEADDR) { - return (T) Boolean.valueOf(isReuseAddress()); - } - if (option == SO_BACKLOG) { - return (T) Integer.valueOf(getBacklog()); - } if (option == EpollChannelOption.SO_REUSEPORT) { return (T) Boolean.valueOf(isReusePort()); } @@ -72,13 +53,7 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig public boolean setOption(ChannelOption option, T value) { validate(option, value); - if (option == SO_RCVBUF) { - setReceiveBufferSize((Integer) value); - } else if (option == SO_REUSEADDR) { - setReuseAddress((Boolean) value); - } else if (option == SO_BACKLOG) { - setBacklog((Integer) value); - } else if (option == EpollChannelOption.SO_REUSEPORT) { + if (option == EpollChannelOption.SO_REUSEPORT) { setReusePort((Boolean) value); } else { return super.setOption(option, value); @@ -87,26 +62,15 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig return true; } - @Override - public boolean isReuseAddress() { - return Native.isReuseAddress(channel.fd) == 1; - } - @Override public EpollServerSocketChannelConfig setReuseAddress(boolean reuseAddress) { - Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0); + super.setReuseAddress(reuseAddress); return this; } - @Override - public int getReceiveBufferSize() { - return Native.getReceiveBufferSize(channel.fd); - } - @Override public EpollServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { - Native.setReceiveBufferSize(channel.fd, receiveBufferSize); - + super.setReceiveBufferSize(receiveBufferSize); return this; } @@ -115,17 +79,9 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig return this; } - @Override - public int getBacklog() { - return backlog; - } - @Override public EpollServerSocketChannelConfig setBacklog(int backlog) { - if (backlog < 0) { - throw new IllegalArgumentException("backlog: " + backlog); - } - this.backlog = backlog; + super.setBacklog(backlog); return this; } @@ -201,9 +157,4 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig Native.setReusePort(channel.fd, reusePort ? 1 : 0); return this; } - - @Override - protected void autoReadCleared() { - channel.clearEpollIn(); - } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 7941ac9d21..fd2ea18567 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -15,61 +15,28 @@ */ package io.netty.channel.epoll; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; import io.netty.channel.Channel; -import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.ConnectTimeoutException; -import io.netty.channel.DefaultFileRegion; -import io.netty.channel.EventLoop; -import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; -import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.StringUtil; -import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; /** * {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for * maximal performance. */ -public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel { - - private static final String EXPECTED_TYPES = - " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + - StringUtil.simpleClassName(DefaultFileRegion.class) + ')'; +public final class EpollSocketChannel extends AbstractEpollStreamChannel implements SocketChannel { private final EpollSocketChannelConfig config; - /** - * The future of the current connection attempt. If not null, subsequent - * connection attempts will fail. - */ - private ChannelPromise connectPromise; - private ScheduledFuture connectTimeoutFuture; - private SocketAddress requestedRemoteAddress; - private volatile InetSocketAddress local; private volatile InetSocketAddress remote; - private volatile boolean inputShutdown; - private volatile boolean outputShutdown; EpollSocketChannel(Channel parent, int fd) { - super(parent, fd, Native.EPOLLIN, true); + super(parent, fd); config = new EpollSocketChannelConfig(this); // Directly cache the remote and local addresses // See https://github.com/netty/netty/issues/2359 @@ -78,7 +45,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } public EpollSocketChannel() { - super(Native.socketStreamFd(), Native.EPOLLIN); + super(Native.socketStreamFd()); config = new EpollSocketChannelConfig(this); } @@ -99,8 +66,13 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } @Override - protected AbstractEpollUnsafe newUnsafe() { - return new EpollSocketUnsafe(); + public InetSocketAddress remoteAddress() { + return (InetSocketAddress) super.remoteAddress(); + } + + @Override + public InetSocketAddress localAddress() { + return (InetSocketAddress) super.localAddress(); } @Override @@ -124,325 +96,10 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So @Override protected void doBind(SocketAddress local) throws Exception { InetSocketAddress localAddress = (InetSocketAddress) local; - Native.bind(fd, localAddress.getAddress(), localAddress.getPort()); + Native.bind(fd, localAddress); this.local = Native.localAddress(fd); } - /** - * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. - * @param buf the {@link ByteBuf} from which the bytes should be written - */ - private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception { - int readableBytes = buf.readableBytes(); - if (readableBytes == 0) { - in.remove(); - return true; - } - - boolean done = false; - long writtenBytes = 0; - if (buf.hasMemoryAddress()) { - long memoryAddress = buf.memoryAddress(); - int readerIndex = buf.readerIndex(); - int writerIndex = buf.writerIndex(); - for (;;) { - int localFlushedAmount = Native.writeAddress(fd, memoryAddress, readerIndex, writerIndex); - if (localFlushedAmount > 0) { - writtenBytes += localFlushedAmount; - if (writtenBytes == readableBytes) { - done = true; - break; - } - readerIndex += localFlushedAmount; - } else { - // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); - break; - } - } - - in.removeBytes(writtenBytes); - return done; - } else if (buf.nioBufferCount() == 1) { - int readerIndex = buf.readerIndex(); - ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, buf.readableBytes()); - for (;;) { - int pos = nioBuf.position(); - int limit = nioBuf.limit(); - int localFlushedAmount = Native.write(fd, nioBuf, pos, limit); - if (localFlushedAmount > 0) { - nioBuf.position(pos + localFlushedAmount); - writtenBytes += localFlushedAmount; - if (writtenBytes == readableBytes) { - done = true; - break; - } - } else { - // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); - break; - } - } - - in.removeBytes(writtenBytes); - return done; - } else { - ByteBuffer[] nioBuffers = buf.nioBuffers(); - return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes); - } - } - - private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException { - - long expectedWrittenBytes = array.size(); - int cnt = array.count(); - - assert expectedWrittenBytes != 0; - assert cnt != 0; - - boolean done = false; - long writtenBytes = 0; - int offset = 0; - int end = offset + cnt; - for (;;) { - long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt); - if (localWrittenBytes == 0) { - // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); - break; - } - expectedWrittenBytes -= localWrittenBytes; - writtenBytes += localWrittenBytes; - - if (expectedWrittenBytes == 0) { - // Written everything, just break out here (fast-path) - done = true; - break; - } - - do { - long bytes = array.processWritten(offset, localWrittenBytes); - if (bytes == -1) { - // incomplete write - break; - } else { - offset++; - cnt--; - localWrittenBytes -= bytes; - } - } while (offset < end && localWrittenBytes > 0); - } - - in.removeBytes(writtenBytes); - return done; - } - - private boolean writeBytesMultiple( - ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, - int nioBufferCnt, long expectedWrittenBytes) throws IOException { - - assert expectedWrittenBytes != 0; - - boolean done = false; - long writtenBytes = 0; - int offset = 0; - int end = offset + nioBufferCnt; - for (;;) { - long localWrittenBytes = Native.writev(fd, nioBuffers, offset, nioBufferCnt); - if (localWrittenBytes == 0) { - // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); - break; - } - expectedWrittenBytes -= localWrittenBytes; - writtenBytes += localWrittenBytes; - - if (expectedWrittenBytes == 0) { - // Written everything, just break out here (fast-path) - done = true; - break; - } - do { - ByteBuffer buffer = nioBuffers[offset]; - int pos = buffer.position(); - int bytes = buffer.limit() - pos; - if (bytes > localWrittenBytes) { - buffer.position(pos + (int) localWrittenBytes); - // incomplete write - break; - } else { - offset++; - nioBufferCnt--; - localWrittenBytes -= bytes; - } - } while (offset < end && localWrittenBytes > 0); - } - - in.removeBytes(writtenBytes); - return done; - } - - /** - * Write a {@link DefaultFileRegion} - * - * @param region the {@link DefaultFileRegion} from which the bytes should be written - * @return amount the amount of written bytes - */ - private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception { - final long regionCount = region.count(); - if (region.transfered() >= regionCount) { - in.remove(); - return true; - } - - final long baseOffset = region.position(); - boolean done = false; - long flushedAmount = 0; - - for (;;) { - final long offset = region.transfered(); - final long localFlushedAmount = Native.sendfile(fd, region, baseOffset, offset, regionCount - offset); - if (localFlushedAmount == 0) { - // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); - break; - } - - flushedAmount += localFlushedAmount; - if (region.transfered() >= regionCount) { - done = true; - break; - } - } - - if (flushedAmount > 0) { - in.progress(flushedAmount); - } - - if (done) { - in.remove(); - } - return done; - } - - @Override - protected void doWrite(ChannelOutboundBuffer in) throws Exception { - for (;;) { - final int msgCount = in.size(); - - if (msgCount == 0) { - // Wrote all messages. - clearEpollOut(); - break; - } - - // Do gathering write if the outbounf buffer entries start with more than one ByteBuf. - if (msgCount > 1 && in.current() instanceof ByteBuf) { - if (!doWriteMultiple(in)) { - break; - } - - // We do not break the loop here even if the outbound buffer was flushed completely, - // because a user might have triggered another write and flush when we notify his or her - // listeners. - } else { // msgCount == 1 - if (!doWriteSingle(in)) { - break; - } - } - } - } - - private boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception { - // The outbound buffer contains only one message or it contains a file region. - Object msg = in.current(); - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - if (!writeBytes(in, buf)) { - // was not able to write everything so break here we will get notified later again once - // the network stack can handle more writes. - return false; - } - } else if (msg instanceof DefaultFileRegion) { - DefaultFileRegion region = (DefaultFileRegion) msg; - if (!writeFileRegion(in, region)) { - // was not able to write everything so break here we will get notified later again once - // the network stack can handle more writes. - return false; - } - } else { - // Should never reach here. - throw new Error(); - } - - return true; - } - - private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception { - if (PlatformDependent.hasUnsafe()) { - // this means we can cast to IovArray and write the IovArray directly. - IovArray array = IovArrayThreadLocal.get(in); - int cnt = array.count(); - if (cnt >= 1) { - // TODO: Handle the case where cnt == 1 specially. - if (!writeBytesMultiple(in, array)) { - // was not able to write everything so break here we will get notified later again once - // the network stack can handle more writes. - return false; - } - } else { // cnt == 0, which means the outbound buffer contained empty buffers only. - in.removeBytes(0); - } - } else { - ByteBuffer[] buffers = in.nioBuffers(); - int cnt = in.nioBufferCount(); - if (cnt >= 1) { - // TODO: Handle the case where cnt == 1 specially. - if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) { - // was not able to write everything so break here we will get notified later again once - // the network stack can handle more writes. - return false; - } - } else { // cnt == 0, which means the outbound buffer contained empty buffers only. - in.removeBytes(0); - } - } - - return true; - } - - @Override - protected Object filterOutboundMessage(Object msg) { - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) { - if (buf instanceof CompositeByteBuf) { - // Special handling of CompositeByteBuf to reduce memory copies if some of the Components - // in the CompositeByteBuf are backed by a memoryAddress. - CompositeByteBuf comp = (CompositeByteBuf) buf; - if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) { - // more then 1024 buffers for gathering writes so just do a memory copy. - buf = newDirectBuffer(buf); - assert buf.hasMemoryAddress(); - } - } else { - // We can only handle buffers with memory address so we need to copy if a non direct is - // passed to write. - buf = newDirectBuffer(buf); - assert buf.hasMemoryAddress(); - } - } - return buf; - } - - if (msg instanceof DefaultFileRegion) { - return msg; - } - - throw new UnsupportedOperationException( - "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); - } - @Override public EpollSocketChannelConfig config() { return config; @@ -450,12 +107,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So @Override public boolean isInputShutdown() { - return inputShutdown; + return isInputShutdown0(); } @Override public boolean isOutputShutdown() { - return outputShutdown || !isActive(); + return isOutputShutdown0(); } @Override @@ -465,24 +122,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So @Override public ChannelFuture shutdownOutput(final ChannelPromise promise) { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - try { - Native.shutdown(fd, false, true); - outputShutdown = true; - promise.setSuccess(); - } catch (Throwable t) { - promise.setFailure(t); - } - } else { - loop.execute(new Runnable() { - @Override - public void run() { - shutdownOutput(promise); - } - }); - } - return promise; + return shutdownOutput0(promise); } @Override @@ -490,303 +130,17 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So return (ServerSocketChannel) super.parent(); } - final class EpollSocketUnsafe extends AbstractEpollUnsafe { - - private void closeOnRead(ChannelPipeline pipeline) { - inputShutdown = true; - if (isOpen()) { - if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { - clearEpollIn0(); - pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); - } else { - close(voidPromise()); - } - } + @Override + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + if (localAddress != null) { + checkResolvable((InetSocketAddress) localAddress); } - - private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { - if (byteBuf != null) { - if (byteBuf.isReadable()) { - readPending = false; - pipeline.fireChannelRead(byteBuf); - } else { - byteBuf.release(); - } - } - pipeline.fireChannelReadComplete(); - pipeline.fireExceptionCaught(cause); - if (close || cause instanceof IOException) { - closeOnRead(pipeline); - return true; - } - return false; - } - - @Override - public void connect( - final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { - if (!promise.setUncancellable() || !ensureOpen(promise)) { - return; - } - - try { - if (connectPromise != null) { - throw new IllegalStateException("connection attempt already made"); - } - - boolean wasActive = isActive(); - if (doConnect((InetSocketAddress) remoteAddress, (InetSocketAddress) localAddress)) { - fulfillConnectPromise(promise, wasActive); - } else { - connectPromise = promise; - requestedRemoteAddress = remoteAddress; - - // Schedule connect timeout. - int connectTimeoutMillis = config().getConnectTimeoutMillis(); - if (connectTimeoutMillis > 0) { - connectTimeoutFuture = eventLoop().schedule(new Runnable() { - @Override - public void run() { - ChannelPromise connectPromise = EpollSocketChannel.this.connectPromise; - ConnectTimeoutException cause = - new ConnectTimeoutException("connection timed out: " + remoteAddress); - if (connectPromise != null && connectPromise.tryFailure(cause)) { - close(voidPromise()); - } - } - }, connectTimeoutMillis, TimeUnit.MILLISECONDS); - } - - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isCancelled()) { - if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); - } - connectPromise = null; - close(voidPromise()); - } - } - }); - } - } catch (Throwable t) { - closeIfClosed(); - promise.tryFailure(annotateConnectException(t, remoteAddress)); - } - } - - private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { - if (promise == null) { - // Closed via cancellation and the promise has been notified already. - return; - } - active = true; - - // trySuccess() will return false if a user cancelled the connection attempt. - boolean promiseSet = promise.trySuccess(); - - // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, - // because what happened is what happened. - if (!wasActive && isActive()) { - pipeline().fireChannelActive(); - } - - // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). - if (!promiseSet) { - close(voidPromise()); - } - } - - private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { - if (promise == null) { - // Closed via cancellation and the promise has been notified already. - return; - } - - // Use tryFailure() instead of setFailure() to avoid the race against cancel(). - promise.tryFailure(cause); - closeIfClosed(); - } - - private void finishConnect() { - // Note this method is invoked by the event loop only if the connection attempt was - // neither cancelled nor timed out. - - assert eventLoop().inEventLoop(); - - boolean connectStillInProgress = false; - try { - boolean wasActive = isActive(); - if (!doFinishConnect()) { - connectStillInProgress = true; - return; - } - fulfillConnectPromise(connectPromise, wasActive); - } catch (Throwable t) { - fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); - } finally { - if (!connectStillInProgress) { - // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used - // See https://github.com/netty/netty/issues/1770 - if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); - } - connectPromise = null; - } - } - } - - @Override - void epollOutReady() { - if (connectPromise != null) { - // pending connect which is now complete so handle it. - finishConnect(); - } else { - super.epollOutReady(); - } - } - - /** - * Connect to the remote peer - */ - private boolean doConnect(InetSocketAddress remoteAddress, InetSocketAddress localAddress) throws Exception { - if (localAddress != null) { - checkResolvable(localAddress); - Native.bind(fd, localAddress.getAddress(), localAddress.getPort()); - } - - boolean success = false; - try { - checkResolvable(remoteAddress); - boolean connected = Native.connect(fd, remoteAddress.getAddress(), - remoteAddress.getPort()); - remote = remoteAddress; - local = Native.localAddress(fd); - if (!connected) { - setEpollOut(); - } - success = true; - return connected; - } finally { - if (!success) { - doClose(); - } - } - } - - /** - * Finish the connect - */ - private boolean doFinishConnect() throws Exception { - if (Native.finishConnect(fd)) { - clearEpollOut(); - return true; - } else { - setEpollOut(); - return false; - } - } - - /** - * Read bytes into the given {@link ByteBuf} and return the amount. - */ - private int doReadBytes(ByteBuf byteBuf) throws Exception { - int writerIndex = byteBuf.writerIndex(); - int localReadAmount; - if (byteBuf.hasMemoryAddress()) { - localReadAmount = Native.readAddress(fd, byteBuf.memoryAddress(), writerIndex, byteBuf.capacity()); - } else { - ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes()); - localReadAmount = Native.read(fd, buf, buf.position(), buf.limit()); - } - if (localReadAmount > 0) { - byteBuf.writerIndex(writerIndex + localReadAmount); - } - return localReadAmount; - } - - @Override - void epollRdHupReady() { - if (isActive()) { - epollInReady(); - } else { - closeOnRead(pipeline()); - } - } - - @Override - void epollInReady() { - final ChannelConfig config = config(); - final ChannelPipeline pipeline = pipeline(); - final ByteBufAllocator allocator = config.getAllocator(); - RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); - - ByteBuf byteBuf = null; - boolean close = false; - try { - int totalReadAmount = 0; - for (;;) { - // we use a direct buffer here as the native implementations only be able - // to handle direct buffers. - byteBuf = allocHandle.allocate(allocator); - int writable = byteBuf.writableBytes(); - int localReadAmount = doReadBytes(byteBuf); - if (localReadAmount <= 0) { - // not was read release the buffer - byteBuf.release(); - close = localReadAmount < 0; - break; - } - readPending = false; - pipeline.fireChannelRead(byteBuf); - byteBuf = null; - - if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { - allocHandle.record(totalReadAmount); - - // Avoid overflow. - totalReadAmount = localReadAmount; - } else { - totalReadAmount += localReadAmount; - } - - if (localReadAmount < writable) { - // Read less than what the buffer can hold, - // which might mean we drained the recv buffer completely. - break; - } - } - pipeline.fireChannelReadComplete(); - allocHandle.record(totalReadAmount); - - if (close) { - closeOnRead(pipeline); - close = false; - } - } catch (Throwable t) { - boolean closed = handleReadException(pipeline, byteBuf, t, close); - if (!closed) { - // trigger a read again as there may be something left to read and because of epoll ET we - // will not get notified again until we read everything from the socket - eventLoop().execute(new Runnable() { - @Override - public void run() { - epollInReady(); - } - }); - } - } finally { - // Check if there is a readPending which was not processed yet. - // This could be for two reasons: - // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method - // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method - // - // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead() && !readPending) { - clearEpollIn0(); - } - } + checkResolvable((InetSocketAddress) remoteAddress); + if (super.doConnect(remoteAddress, localAddress)) { + local = Native.localAddress(fd); + remote = Native.remoteAddress(fd); + return true; } + return false; } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index cf9ab02484..6d11ea32eb 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; @@ -346,18 +347,39 @@ final class Native { return res; } + public static int socketDomainFd() { + int res = socketDomain(); + if (res < 0) { + throw new ChannelException(newIOException("socketDomain", res)); + } + return res; + } + private static native int socketStream(); private static native int socketDgram(); + private static native int socketDomain(); - public static void bind(int fd, InetAddress addr, int port) throws IOException { - NativeInetAddress address = toNativeInetAddress(addr); - int res = bind(fd, address.address, address.scopeId, port); - if (res < 0) { - throw newIOException("bind", res); + public static void bind(int fd, SocketAddress socketAddress) throws IOException { + if (socketAddress instanceof InetSocketAddress) { + InetSocketAddress addr = (InetSocketAddress) socketAddress; + NativeInetAddress address = toNativeInetAddress(addr.getAddress()); + int res = bind(fd, address.address, address.scopeId, addr.getPort()); + if (res < 0) { + throw newIOException("bind", res); + } + } else if (socketAddress instanceof DomainSocketAddress) { + DomainSocketAddress addr = (DomainSocketAddress) socketAddress; + int res = bindDomainSocket(fd, addr.path()); + if (res < 0) { + throw newIOException("bind", res); + } + } else { + throw new Error("Unexpected SocketAddress implementation " + socketAddress); } } private static native int bind(int fd, byte[] address, int scopeId, int port); + private static native int bindDomainSocket(int fd, String path); public static void listen(int fd, int backlog) throws IOException { int res = listen0(fd, backlog); @@ -368,9 +390,18 @@ final class Native { private static native int listen0(int fd, int backlog); - public static boolean connect(int fd, InetAddress addr, int port) throws IOException { - NativeInetAddress address = toNativeInetAddress(addr); - int res = connect(fd, address.address, address.scopeId, port); + public static boolean connect(int fd, SocketAddress socketAddress) throws IOException { + int res; + if (socketAddress instanceof InetSocketAddress) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; + NativeInetAddress address = toNativeInetAddress(inetSocketAddress.getAddress()); + res = connect(fd, address.address, address.scopeId, inetSocketAddress.getPort()); + } else if (socketAddress instanceof DomainSocketAddress) { + DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) socketAddress; + res = connectDomainSocket(fd, unixDomainSocketAddress.path()); + } else { + throw new Error("Unexpected SocketAddress implementation " + socketAddress); + } if (res < 0) { if (res == ERRNO_EINPROGRESS_NEGATIVE) { // connect not complete yet need to wait for EPOLLOUT event @@ -382,6 +413,7 @@ final class Native { } private static native int connect(int fd, byte[] address, int scopeId, int port); + private static native int connectDomainSocket(int fd, String path); public static boolean finishConnect(int fd) throws IOException { int res = finishConnect0(fd); @@ -477,6 +509,20 @@ final class Native { private static native int accept0(int fd); + public static int recvFd(int fd) throws IOException { + int res = recvFd0(fd); + if (res >= 0) { + return res; + } + if (res == ERRNO_EAGAIN_NEGATIVE || res == ERRNO_EWOULDBLOCK_NEGATIVE) { + // Everything consumed so just return -1 here. + return -1; + } + throw newIOException("recvFd", res); + } + + private static native int recvFd0(int fd); + public static void shutdown(int fd, boolean read, boolean write) throws IOException { int res = shutdown0(fd, read, write); if (res < 0) { diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketEchoTest.java new file mode 100644 index 0000000000..faf6d9b740 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketEchoTest.java @@ -0,0 +1,35 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketEchoTest extends EpollSocketEchoTest { + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFileRegionTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFileRegionTest.java new file mode 100644 index 0000000000..8d5abd3ea4 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFileRegionTest.java @@ -0,0 +1,35 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketFileRegionTest extends EpollSocketFileRegionTest { + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFixedLengthEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFixedLengthEchoTest.java new file mode 100644 index 0000000000..925663f6fc --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFixedLengthEchoTest.java @@ -0,0 +1,37 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketFixedLengthEchoTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketFixedLengthEchoTest extends SocketFixedLengthEchoTest { + + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketGatheringWriteTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketGatheringWriteTest.java new file mode 100644 index 0000000000..03c2cbcfd2 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketGatheringWriteTest.java @@ -0,0 +1,37 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketGatheringWriteTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketGatheringWriteTest extends SocketGatheringWriteTest { + + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketObjectEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketObjectEchoTest.java new file mode 100644 index 0000000000..91ce300caa --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketObjectEchoTest.java @@ -0,0 +1,36 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketObjectEchoTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketObjectEchoTest extends SocketObjectEchoTest { + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslEchoTest.java new file mode 100644 index 0000000000..58be4ce12a --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslEchoTest.java @@ -0,0 +1,44 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.handler.ssl.SslContext; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketSslEchoTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketSslEchoTest extends SocketSslEchoTest { + + public EpollDomainSocketSslEchoTest( + SslContext serverCtx, SslContext clientCtx, Renegotiation renegotiation, boolean autoRead, + boolean useChunkedWriteHandler, boolean useCompositeByteBuf) { + super(serverCtx, clientCtx, renegotiation, autoRead, useChunkedWriteHandler, useCompositeByteBuf); + } + + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslGreetingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslGreetingTest.java new file mode 100644 index 0000000000..a1ed0c5ac2 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslGreetingTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.handler.ssl.SslContext; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketSslGreetingTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketSslGreetingTest extends SocketSslGreetingTest { + + public EpollDomainSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx) { + super(serverCtx, clientCtx); + } + + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketStartTlsTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketStartTlsTest.java new file mode 100644 index 0000000000..f60981bfee --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketStartTlsTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.handler.ssl.SslContext; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketStartTlsTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketStartTlsTest extends SocketStartTlsTest { + + public EpollDomainSocketStartTlsTest(SslContext serverCtx, SslContext clientCtx) { + super(serverCtx, clientCtx); + } + + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketStringEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketStringEchoTest.java new file mode 100644 index 0000000000..1b5a032d91 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketStringEchoTest.java @@ -0,0 +1,36 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketStringEchoTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketStringEchoTest extends SocketStringEchoTest { + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java index ad9cb4debf..d58b08bbf8 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java @@ -29,12 +29,15 @@ import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory; import io.netty.testsuite.transport.socket.SocketTestPermutation; import io.netty.util.concurrent.DefaultExecutorServiceFactory; +import java.io.File; +import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; class EpollSocketTestPermutation extends SocketTestPermutation { - static final SocketTestPermutation INSTANCE = new EpollSocketTestPermutation(); + static final EpollSocketTestPermutation INSTANCE = new EpollSocketTestPermutation(); static final EventLoopGroup EPOLL_BOSS_GROUP = new EpollEventLoopGroup(BOSSES, new DefaultExecutorServiceFactory("testsuite-epoll-boss")); @@ -119,4 +122,44 @@ class EpollSocketTestPermutation extends SocketTestPermutation { ); return combo(bfs, bfs); } + + public List> domainSocket() { + + List> list = + combo(serverDomainSocket(), clientDomainSocket()); + return list; + } + + public List> serverDomainSocket() { + return Collections.>singletonList( + new BootstrapFactory() { + @Override + public ServerBootstrap newInstance() { + return new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP) + .channel(EpollServerDomainSocketChannel.class); + } + } + ); + } + + public List> clientDomainSocket() { + return Collections.>singletonList( + new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollDomainSocketChannel.class); + } + } + ); + } + + public static DomainSocketAddress newSocketAddress() { + try { + File file = File.createTempFile("netty", "dsocket"); + file.delete(); + return new DomainSocketAddress(file); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } }