diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractClientSocketTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractClientSocketTest.java index dabcf5492c..0a4057e8ee 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractClientSocketTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractClientSocketTest.java @@ -24,11 +24,12 @@ import io.netty.testsuite.util.TestUtils; import io.netty.util.NetUtil; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.List; public abstract class AbstractClientSocketTest extends AbstractTestsuiteTest { - protected volatile InetSocketAddress addr; + protected volatile SocketAddress addr; protected AbstractClientSocketTest() { super(Bootstrap.class); @@ -41,8 +42,13 @@ public abstract class AbstractClientSocketTest extends AbstractTestsuiteTest { - protected volatile InetSocketAddress addr; + protected volatile SocketAddress addr; protected AbstractServerSocketTest() { super(ServerBootstrap.class); @@ -41,10 +42,14 @@ public abstract class AbstractServerSocketTest extends AbstractTestsuiteTest { - protected volatile InetSocketAddress addr; + protected volatile SocketAddress addr; protected AbstractSocketTest() { super(ServerBootstrap.class, Bootstrap.class); @@ -42,12 +43,16 @@ public abstract class AbstractSocketTest extends AbstractComboTestsuiteTest() { + sb.childHandler(new ChannelInitializer() { @Override - protected void initChannel(SocketChannel c) throws Exception { + protected void initChannel(Channel c) throws Exception { c.pipeline().addLast(group, sh); } }); - cb.handler(new ChannelInitializer() { + cb.handler(new ChannelInitializer() { @Override - protected void initChannel(SocketChannel c) throws Exception { + protected void initChannel(Channel c) throws Exception { c.pipeline().addLast(group, ch); } }); diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java index 2ff82c7f7b..013f55e1e6 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java @@ -23,7 +23,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import org.junit.Test; @@ -64,17 +63,17 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest { final EchoHandler sh = new EchoHandler(autoRead); final EchoHandler ch = new EchoHandler(autoRead); - sb.childHandler(new ChannelInitializer() { + sb.childHandler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024)); sch.pipeline().addAfter("decoder", "handler", sh); } }); - cb.handler(new ChannelInitializer() { + cb.handler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024)); sch.pipeline().addAfter("decoder", "handler", ch); } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java index d1af7462b3..10945c44f4 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketObjectEchoTest.java @@ -21,7 +21,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; @@ -72,9 +71,9 @@ public class SocketObjectEchoTest extends AbstractSocketTest { final EchoHandler sh = new EchoHandler(autoRead); final EchoHandler ch = new EchoHandler(autoRead); - sb.childHandler(new ChannelInitializer() { + sb.childHandler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { sch.pipeline().addLast( new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())), new ObjectEncoder(), @@ -82,9 +81,9 @@ public class SocketObjectEchoTest extends AbstractSocketTest { } }); - cb.handler(new ChannelInitializer() { + cb.handler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { sch.pipeline().addLast( new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())), new ObjectEncoder(), diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java index db94661ed9..36dcfdb4a0 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java @@ -25,7 +25,6 @@ import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.ssl.JdkSslClientContext; import io.netty.handler.ssl.JdkSslServerContext; import io.netty.handler.ssl.OpenSsl; @@ -182,8 +181,8 @@ public class SocketSslEchoTest extends AbstractSocketTest { private final AtomicInteger clientNegoCounter = new AtomicInteger(); private final AtomicInteger serverNegoCounter = new AtomicInteger(); - private volatile SocketChannel clientChannel; - private volatile SocketChannel serverChannel; + private volatile Channel clientChannel; + private volatile Channel serverChannel; private volatile SslHandler clientSslHandler; private volatile SslHandler serverSslHandler; @@ -222,10 +221,10 @@ public class SocketSslEchoTest extends AbstractSocketTest { final ExecutorService delegatedTaskExecutor = Executors.newCachedThreadPool(); reset(); - sb.childHandler(new ChannelInitializer() { + sb.childHandler(new ChannelInitializer() { @Override @SuppressWarnings("deprecation") - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { serverChannel = sch; if (serverUsesDelegatedTaskExecutor) { @@ -243,10 +242,10 @@ public class SocketSslEchoTest extends AbstractSocketTest { } }); - cb.handler(new ChannelInitializer() { + cb.handler(new ChannelInitializer() { @Override @SuppressWarnings("deprecation") - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { clientChannel = sch; if (clientUsesDelegatedTaskExecutor) { diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslGreetingTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslGreetingTest.java index 97423e80b9..b0e45481fc 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslGreetingTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslGreetingTest.java @@ -24,7 +24,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.JdkSslClientContext; @@ -117,9 +116,9 @@ public class SocketSslGreetingTest extends AbstractSocketTest { final ServerHandler sh = new ServerHandler(); final ClientHandler ch = new ClientHandler(); - sb.childHandler(new ChannelInitializer() { + sb.childHandler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { ChannelPipeline p = sch.pipeline(); p.addLast(serverCtx.newHandler(sch.alloc())); p.addLast(new LoggingHandler(LOG_LEVEL)); @@ -127,9 +126,9 @@ public class SocketSslGreetingTest extends AbstractSocketTest { } }); - cb.handler(new ChannelInitializer() { + cb.handler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { ChannelPipeline p = sch.pipeline(); p.addLast(clientCtx.newHandler(sch.alloc())); p.addLast(new LoggingHandler(LOG_LEVEL)); diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java index fdbeb6a57a..132a94fe51 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStartTlsTest.java @@ -23,7 +23,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; @@ -150,9 +149,9 @@ public class SocketStartTlsTest extends AbstractSocketTest { final StartTlsServerHandler sh = new StartTlsServerHandler(sse, autoRead); final StartTlsClientHandler ch = new StartTlsClientHandler(cse, autoRead); - sb.childHandler(new ChannelInitializer() { + sb.childHandler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { ChannelPipeline p = sch.pipeline(); p.addLast("logger", new LoggingHandler(LOG_LEVEL)); p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder()); @@ -160,9 +159,9 @@ public class SocketStartTlsTest extends AbstractSocketTest { } }); - cb.handler(new ChannelInitializer() { + cb.handler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { ChannelPipeline p = sch.pipeline(); p.addLast("logger", new LoggingHandler(LOG_LEVEL)); p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder()); diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java index 38123532e5..c43204bfb8 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java @@ -21,7 +21,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; @@ -74,9 +73,9 @@ public class SocketStringEchoTest extends AbstractSocketTest { final StringEchoHandler sh = new StringEchoHandler(autoRead); final StringEchoHandler ch = new StringEchoHandler(autoRead); - sb.childHandler(new ChannelInitializer() { + sb.childHandler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter())); sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1)); sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1)); @@ -84,9 +83,9 @@ public class SocketStringEchoTest extends AbstractSocketTest { } }); - cb.handler(new ChannelInitializer() { + cb.handler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel sch) throws Exception { + public void initChannel(Channel sch) throws Exception { sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter())); sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1)); sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1)); diff --git a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c index 3aa984559e..a2556140f9 100644 --- a/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c +++ b/transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -1390,3 +1391,101 @@ JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_strError(JNIEnv* en char* err = strerror(error); return (*env)->NewStringUTF(env, err); } + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socketDomain(JNIEnv* env, jclass clazz) { + int fd = socket(PF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0); + if (fd == -1) { + return -errno; + } + return fd; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_bindDomainSocket(JNIEnv* env, jclass clazz, jint fd, jstring socketPath) { + struct sockaddr_un addr; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + + const char* socket_path = (*env)->GetStringUTFChars(env, socketPath, 0); + memcpy(addr.sun_path, socket_path, strlen(socket_path)); + + if (unlink(socket_path) == -1 && errno != ENOENT) { + return -errno; + } + + int res = bind(fd, (struct sockaddr*) &addr, sizeof(addr)); + (*env)->ReleaseStringUTFChars(env, socketPath, socket_path); + + if (res == -1) { + return -errno; + } + return res; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_connectDomainSocket(JNIEnv* env, jclass clazz, jint fd, jstring socketPath) { + struct sockaddr_un addr; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + + const char* socket_path = (*env)->GetStringUTFChars(env, socketPath, 0); + strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1); + + int res; + int err; + do { + res = connect(fd, (struct sockaddr*) &addr, sizeof(addr)); + } while (res == -1 && ((err = errno) == EINTR)); + + (*env)->ReleaseStringUTFChars(env, socketPath, socket_path); + + if (res < 0) { + return -err; + } + return 0; +} + +JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_recvFd(JNIEnv* env, jclass clazz, jint fd) { + jint socketFd; + struct msghdr msg; + struct iovec iov[1]; + struct cmsghdr* ctrl_msg = NULL; + char msg_buffer[1]; + char elem_buffer[CMSG_SPACE(sizeof(int))]; + + /* Fill all with 0 */ + memset(&msg, 0, sizeof(struct msghdr)); + memset(elem_buffer, 0, CMSG_SPACE(sizeof(int))); + + iov[0].iov_base = msg_buffer; + iov[0].iov_len = 1; + + msg.msg_iov = iov; + msg.msg_iovlen = 1; + msg.msg_control = elem_buffer; + msg.msg_controllen = CMSG_SPACE(sizeof(int)); + + if(recvmsg(fd, &msg, MSG_CMSG_CLOEXEC) < 0) { + // All read, return -1 + return -1; + } + + if((msg.msg_flags & MSG_CTRUNC) == MSG_CTRUNC) { + // Not enough space ?!?! + return -1; + } + + // skip empty entries + for(ctrl_msg = CMSG_FIRSTHDR(&msg); ctrl_msg != NULL; ctrl_msg = CMSG_NXTHDR(&msg, ctrl_msg)) { + if((ctrl_msg->cmsg_level == SOL_SOCKET) && (ctrl_msg->cmsg_type == SCM_RIGHTS)) { + socketFd = *((int *) CMSG_DATA(ctrl_msg)); + // set as non blocking as we want to use it with epoll + if (fcntl(socketFd, F_SETFL, O_NONBLOCK) == -1) { + return -errno; + } + return socketFd; + } + } + return -1; +} + diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 79fef4403c..e298c21d30 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -27,6 +27,7 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.OneTimeTask; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.UnresolvedAddressException; abstract class AbstractEpollChannel extends AbstractChannel { @@ -49,6 +50,10 @@ abstract class AbstractEpollChannel extends AbstractChannel { this.active = active; } + protected final int fd() { + return fd; + } + @Override public boolean isActive() { return active; @@ -71,16 +76,6 @@ abstract class AbstractEpollChannel extends AbstractChannel { Native.close(fd); } - @Override - public InetSocketAddress remoteAddress() { - return (InetSocketAddress) super.remoteAddress(); - } - - @Override - public InetSocketAddress localAddress() { - return (InetSocketAddress) super.localAddress(); - } - @Override protected void doDisconnect() throws Exception { doClose(); @@ -214,6 +209,72 @@ abstract class AbstractEpollChannel extends AbstractChannel { } } + /** + * Read bytes into the given {@link ByteBuf} and return the amount. + */ + protected final int doReadBytes(ByteBuf byteBuf) throws Exception { + int writerIndex = byteBuf.writerIndex(); + int localReadAmount; + if (byteBuf.hasMemoryAddress()) { + localReadAmount = Native.readAddress(fd, byteBuf.memoryAddress(), writerIndex, byteBuf.capacity()); + } else { + ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes()); + localReadAmount = Native.read(fd, buf, buf.position(), buf.limit()); + } + if (localReadAmount > 0) { + byteBuf.writerIndex(writerIndex + localReadAmount); + } + return localReadAmount; + } + + protected final int doWriteBytes(ByteBuf buf) throws Exception { + int readableBytes = buf.readableBytes(); + int writtenBytes = 0; + if (buf.hasMemoryAddress()) { + long memoryAddress = buf.memoryAddress(); + int readerIndex = buf.readerIndex(); + int writerIndex = buf.writerIndex(); + for (;;) { + int localFlushedAmount = Native.writeAddress(fd, memoryAddress, readerIndex, writerIndex); + if (localFlushedAmount > 0) { + writtenBytes += localFlushedAmount; + if (writtenBytes == readableBytes) { + return writtenBytes; + } + readerIndex += localFlushedAmount; + } else { + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); + return writtenBytes; + } + } + } else { + ByteBuffer nioBuf; + if (buf.nioBufferCount() == 1) { + nioBuf = buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()); + } else { + nioBuf = buf.nioBuffer(); + } + for (;;) { + int pos = nioBuf.position(); + int limit = nioBuf.limit(); + int localFlushedAmount = Native.write(fd, nioBuf, pos, limit); + if (localFlushedAmount > 0) { + nioBuf.position(pos + localFlushedAmount); + writtenBytes += localFlushedAmount; + if (writtenBytes == readableBytes) { + return writtenBytes; + } + } else { + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); + break; + } + } + return writtenBytes; + } + } + protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { protected boolean readPending; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java new file mode 100644 index 0000000000..a852f23ae8 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -0,0 +1,114 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.channel.ServerChannel; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + + +public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel { + + protected AbstractEpollServerChannel(int fd) { + super(fd, Native.EPOLLACCEPT); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return loop instanceof EpollEventLoop; + } + + @Override + protected InetSocketAddress remoteAddress0() { + return null; + } + + @Override + protected AbstractEpollUnsafe newUnsafe() { + return new EpollServerSocketUnsafe(); + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected Object filterOutboundMessage(Object msg) throws Exception { + throw new UnsupportedOperationException(); + } + + protected abstract Channel newChildChannel(int fd) throws Exception; + + final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { + + @Override + public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) { + // Connect not supported by ServerChannel implementations + channelPromise.setFailure(new UnsupportedOperationException()); + } + + @Override + void epollInReady() { + assert eventLoop().inEventLoop(); + final ChannelPipeline pipeline = pipeline(); + Throwable exception = null; + try { + try { + for (;;) { + int socketFd = Native.accept(fd); + if (socketFd == -1) { + // this means everything was handled for now + break; + } + readPending = false; + + try { + pipeline.fireChannelRead(newChildChannel(socketFd)); + } catch (Throwable t) { + // keep on reading as we use epoll ET and need to consume everything from the socket + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(t); + } + } + } catch (Throwable t) { + exception = t; + } + pipeline.fireChannelReadComplete(); + + if (exception != null) { + pipeline.fireExceptionCaught(exception); + } + } finally { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (!config().isAutoRead() && !readPending) { + clearEpollIn0(); + } + } + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java new file mode 100644 index 0000000000..0324c5f9e7 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -0,0 +1,653 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelOutboundBuffer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.ConnectTimeoutException; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.EventLoop; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.ChannelInputShutdownEvent; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.StringUtil; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { + + private static final String EXPECTED_TYPES = + " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + + StringUtil.simpleClassName(DefaultFileRegion.class) + ')'; + + private volatile boolean inputShutdown; + private volatile boolean outputShutdown; + + protected AbstractEpollStreamChannel(Channel parent, int fd) { + super(parent, fd, Native.EPOLLIN, true); + } + + protected AbstractEpollStreamChannel(int fd) { + super(fd, Native.EPOLLIN); + } + + @Override + protected AbstractEpollUnsafe newUnsafe() { + return new EpollStreamUnsafe(); + } + + /** + * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. + * @param buf the {@link ByteBuf} from which the bytes should be written + */ + private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception { + int readableBytes = buf.readableBytes(); + if (readableBytes == 0) { + in.remove(); + return true; + } + + if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) { + int writtenBytes = doWriteBytes(buf); + in.removeBytes(writtenBytes); + return writtenBytes == readableBytes; + } else { + ByteBuffer[] nioBuffers = buf.nioBuffers(); + return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes); + } + } + + private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException { + + long expectedWrittenBytes = array.size(); + final long initialExpectedWrittenBytes = expectedWrittenBytes; + + int cnt = array.count(); + + assert expectedWrittenBytes != 0; + assert cnt != 0; + + boolean done = false; + int offset = 0; + int end = offset + cnt; + for (;;) { + long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt); + if (localWrittenBytes == 0) { + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); + break; + } + expectedWrittenBytes -= localWrittenBytes; + + if (expectedWrittenBytes == 0) { + // Written everything, just break out here (fast-path) + done = true; + break; + } + + do { + long bytes = array.processWritten(offset, localWrittenBytes); + if (bytes == -1) { + // incomplete write + break; + } else { + offset++; + cnt--; + localWrittenBytes -= bytes; + } + } while (offset < end && localWrittenBytes > 0); + } + + in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); + return done; + } + + private boolean writeBytesMultiple( + ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, + int nioBufferCnt, long expectedWrittenBytes) throws IOException { + + assert expectedWrittenBytes != 0; + final long initialExpectedWrittenBytes = expectedWrittenBytes; + + boolean done = false; + int offset = 0; + int end = offset + nioBufferCnt; + for (;;) { + long localWrittenBytes = Native.writev(fd, nioBuffers, offset, nioBufferCnt); + if (localWrittenBytes == 0) { + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); + break; + } + expectedWrittenBytes -= localWrittenBytes; + + if (expectedWrittenBytes == 0) { + // Written everything, just break out here (fast-path) + done = true; + break; + } + do { + ByteBuffer buffer = nioBuffers[offset]; + int pos = buffer.position(); + int bytes = buffer.limit() - pos; + if (bytes > localWrittenBytes) { + buffer.position(pos + (int) localWrittenBytes); + // incomplete write + break; + } else { + offset++; + nioBufferCnt--; + localWrittenBytes -= bytes; + } + } while (offset < end && localWrittenBytes > 0); + } + + in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); + return done; + } + + /** + * Write a {@link DefaultFileRegion} + * + * @param region the {@link DefaultFileRegion} from which the bytes should be written + * @return amount the amount of written bytes + */ + private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception { + final long regionCount = region.count(); + if (region.transfered() >= regionCount) { + in.remove(); + return true; + } + + final long baseOffset = region.position(); + boolean done = false; + long flushedAmount = 0; + + for (;;) { + final long offset = region.transfered(); + final long localFlushedAmount = Native.sendfile(fd, region, baseOffset, offset, regionCount - offset); + if (localFlushedAmount == 0) { + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); + break; + } + + flushedAmount += localFlushedAmount; + if (region.transfered() >= regionCount) { + done = true; + break; + } + } + + if (flushedAmount > 0) { + in.progress(flushedAmount); + } + + if (done) { + in.remove(); + } + return done; + } + + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + for (;;) { + final int msgCount = in.size(); + + if (msgCount == 0) { + // Wrote all messages. + clearEpollOut(); + break; + } + + // Do gathering write if the outbounf buffer entries start with more than one ByteBuf. + if (msgCount > 1 && in.current() instanceof ByteBuf) { + if (!doWriteMultiple(in)) { + break; + } + + // We do not break the loop here even if the outbound buffer was flushed completely, + // because a user might have triggered another write and flush when we notify his or her + // listeners. + } else { // msgCount == 1 + if (!doWriteSingle(in)) { + break; + } + } + } + } + + private boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception { + // The outbound buffer contains only one message or it contains a file region. + Object msg = in.current(); + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (!writeBytes(in, buf)) { + // was not able to write everything so break here we will get notified later again once + // the network stack can handle more writes. + return false; + } + } else if (msg instanceof DefaultFileRegion) { + DefaultFileRegion region = (DefaultFileRegion) msg; + if (!writeFileRegion(in, region)) { + // was not able to write everything so break here we will get notified later again once + // the network stack can handle more writes. + return false; + } + } else { + // Should never reach here. + throw new Error(); + } + + return true; + } + + private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception { + if (PlatformDependent.hasUnsafe()) { + // this means we can cast to IovArray and write the IovArray directly. + IovArray array = IovArrayThreadLocal.get(in); + int cnt = array.count(); + if (cnt >= 1) { + // TODO: Handle the case where cnt == 1 specially. + if (!writeBytesMultiple(in, array)) { + // was not able to write everything so break here we will get notified later again once + // the network stack can handle more writes. + return false; + } + } else { // cnt == 0, which means the outbound buffer contained empty buffers only. + in.removeBytes(0); + } + } else { + ByteBuffer[] buffers = in.nioBuffers(); + int cnt = in.nioBufferCount(); + if (cnt >= 1) { + // TODO: Handle the case where cnt == 1 specially. + if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) { + // was not able to write everything so break here we will get notified later again once + // the network stack can handle more writes. + return false; + } + } else { // cnt == 0, which means the outbound buffer contained empty buffers only. + in.removeBytes(0); + } + } + + return true; + } + + @Override + protected Object filterOutboundMessage(Object msg) { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) { + if (buf instanceof CompositeByteBuf) { + // Special handling of CompositeByteBuf to reduce memory copies if some of the Components + // in the CompositeByteBuf are backed by a memoryAddress. + CompositeByteBuf comp = (CompositeByteBuf) buf; + if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) { + // more then 1024 buffers for gathering writes so just do a memory copy. + buf = newDirectBuffer(buf); + assert buf.hasMemoryAddress(); + } + } else { + // We can only handle buffers with memory address so we need to copy if a non direct is + // passed to write. + buf = newDirectBuffer(buf); + assert buf.hasMemoryAddress(); + } + } + return buf; + } + + if (msg instanceof DefaultFileRegion) { + return msg; + } + + throw new UnsupportedOperationException( + "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); + } + + protected boolean isInputShutdown0() { + return inputShutdown; + } + + protected boolean isOutputShutdown0() { + return outputShutdown || !isActive(); + } + + protected ChannelFuture shutdownOutput0(final ChannelPromise promise) { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + try { + Native.shutdown(fd, false, true); + outputShutdown = true; + promise.setSuccess(); + } catch (Throwable t) { + promise.setFailure(t); + } + } else { + loop.execute(new Runnable() { + @Override + public void run() { + shutdownOutput0(promise); + } + }); + } + return promise; + } + + /** + * Connect to the remote peer + */ + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + if (localAddress != null) { + Native.bind(fd, localAddress); + } + + boolean success = false; + try { + boolean connected = Native.connect(fd, remoteAddress); + if (!connected) { + setEpollOut(); + } + success = true; + return connected; + } finally { + if (!success) { + doClose(); + } + } + } + + final class EpollStreamUnsafe extends AbstractEpollUnsafe { + /** + * The future of the current connection attempt. If not null, subsequent + * connection attempts will fail. + */ + private ChannelPromise connectPromise; + private ScheduledFuture connectTimeoutFuture; + private SocketAddress requestedRemoteAddress; + + private RecvByteBufAllocator.Handle allocHandle; + + private void closeOnRead(ChannelPipeline pipeline) { + inputShutdown = true; + if (isOpen()) { + if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + clearEpollIn0(); + pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + } else { + close(voidPromise()); + } + } + } + + private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { + if (byteBuf != null) { + if (byteBuf.isReadable()) { + readPending = false; + pipeline.fireChannelRead(byteBuf); + } else { + byteBuf.release(); + } + } + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(cause); + if (close || cause instanceof IOException) { + closeOnRead(pipeline); + return true; + } + return false; + } + + @Override + public void connect( + final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { + if (!promise.setUncancellable() || !ensureOpen(promise)) { + return; + } + + try { + if (connectPromise != null) { + throw new IllegalStateException("connection attempt already made"); + } + + boolean wasActive = isActive(); + if (doConnect(remoteAddress, localAddress)) { + fulfillConnectPromise(promise, wasActive); + } else { + connectPromise = promise; + requestedRemoteAddress = remoteAddress; + + // Schedule connect timeout. + int connectTimeoutMillis = config().getConnectTimeoutMillis(); + if (connectTimeoutMillis > 0) { + connectTimeoutFuture = eventLoop().schedule(new Runnable() { + @Override + public void run() { + ChannelPromise connectPromise = EpollStreamUnsafe.this.connectPromise; + ConnectTimeoutException cause = + new ConnectTimeoutException("connection timed out: " + remoteAddress); + if (connectPromise != null && connectPromise.tryFailure(cause)) { + close(voidPromise()); + } + } + }, connectTimeoutMillis, TimeUnit.MILLISECONDS); + } + + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isCancelled()) { + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + close(voidPromise()); + } + } + }); + } + } catch (Throwable t) { + closeIfClosed(); + promise.tryFailure(annotateConnectException(t, remoteAddress)); + } + } + + private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { + if (promise == null) { + // Closed via cancellation and the promise has been notified already. + return; + } + active = true; + + // trySuccess() will return false if a user cancelled the connection attempt. + boolean promiseSet = promise.trySuccess(); + + // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, + // because what happened is what happened. + if (!wasActive && isActive()) { + pipeline().fireChannelActive(); + } + + // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). + if (!promiseSet) { + close(voidPromise()); + } + } + + private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { + if (promise == null) { + // Closed via cancellation and the promise has been notified already. + return; + } + + // Use tryFailure() instead of setFailure() to avoid the race against cancel(). + promise.tryFailure(cause); + closeIfClosed(); + } + + private void finishConnect() { + // Note this method is invoked by the event loop only if the connection attempt was + // neither cancelled nor timed out. + + assert eventLoop().inEventLoop(); + + boolean connectStillInProgress = false; + try { + boolean wasActive = isActive(); + if (!doFinishConnect()) { + connectStillInProgress = true; + return; + } + fulfillConnectPromise(connectPromise, wasActive); + } catch (Throwable t) { + fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); + } finally { + if (!connectStillInProgress) { + // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used + // See https://github.com/netty/netty/issues/1770 + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + } + } + } + + @Override + void epollOutReady() { + if (connectPromise != null) { + // pending connect which is now complete so handle it. + finishConnect(); + } else { + super.epollOutReady(); + } + } + + /** + * Finish the connect + */ + private boolean doFinishConnect() throws Exception { + if (Native.finishConnect(fd)) { + clearEpollOut(); + return true; + } else { + setEpollOut(); + return false; + } + } + + @Override + void epollRdHupReady() { + if (isActive()) { + epollInReady(); + } else { + closeOnRead(pipeline()); + } + } + + @Override + void epollInReady() { + final ChannelConfig config = config(); + final ChannelPipeline pipeline = pipeline(); + final ByteBufAllocator allocator = config.getAllocator(); + RecvByteBufAllocator.Handle allocHandle = this.allocHandle; + if (allocHandle == null) { + this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); + } + + ByteBuf byteBuf = null; + boolean close = false; + try { + int totalReadAmount = 0; + for (;;) { + // we use a direct buffer here as the native implementations only be able + // to handle direct buffers. + byteBuf = allocHandle.allocate(allocator); + int writable = byteBuf.writableBytes(); + int localReadAmount = doReadBytes(byteBuf); + if (localReadAmount <= 0) { + // not was read release the buffer + byteBuf.release(); + close = localReadAmount < 0; + break; + } + readPending = false; + pipeline.fireChannelRead(byteBuf); + byteBuf = null; + + if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { + allocHandle.record(totalReadAmount); + + // Avoid overflow. + totalReadAmount = localReadAmount; + } else { + totalReadAmount += localReadAmount; + } + + if (localReadAmount < writable) { + // Read less than what the buffer can hold, + // which might mean we drained the recv buffer completely. + break; + } + } + pipeline.fireChannelReadComplete(); + allocHandle.record(totalReadAmount); + + if (close) { + closeOnRead(pipeline); + close = false; + } + } catch (Throwable t) { + boolean closed = handleReadException(pipeline, byteBuf, t, close); + if (!closed) { + // trigger a read again as there may be something left to read and because of epoll ET we + // will not get notified again until we read everything from the socket + eventLoop().execute(new Runnable() { + @Override + public void run() { + epollInReady(); + } + }); + } + } finally { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (!config.isAutoRead() && !readPending) { + clearEpollIn0(); + } + } + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/DomainSocketAddress.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/DomainSocketAddress.java new file mode 100644 index 0000000000..04235ed959 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/DomainSocketAddress.java @@ -0,0 +1,63 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import java.io.File; +import java.net.SocketAddress; + +public final class DomainSocketAddress extends SocketAddress { + private final String socketPath; + + public DomainSocketAddress(String socketPath) { + if (socketPath == null) { + throw new NullPointerException("socketPath"); + } + this.socketPath = socketPath; + } + + public DomainSocketAddress(File file) { + this(file.getPath()); + } + + /** + * The path to the domain socket. + */ + public String path() { + return socketPath; + } + + @Override + public String toString() { + return path(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DomainSocketAddress)) { + return false; + } + + return ((DomainSocketAddress) o).socketPath.equals(socketPath); + } + + @Override + public int hashCode() { + return socketPath.hashCode(); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index a255db0a60..78999c859f 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -66,6 +66,16 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements config = new EpollDatagramChannelConfig(this); } + @Override + public InetSocketAddress remoteAddress() { + return (InetSocketAddress) super.remoteAddress(); + } + + @Override + public InetSocketAddress localAddress() { + return (InetSocketAddress) super.localAddress(); + } + @Override public ChannelMetadata metadata() { return METADATA; @@ -252,7 +262,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements protected void doBind(SocketAddress localAddress) throws Exception { InetSocketAddress addr = (InetSocketAddress) localAddress; checkResolvable(addr); - Native.bind(fd, addr.getAddress(), addr.getPort()); + Native.bind(fd, addr); local = Native.localAddress(fd); active = true; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java new file mode 100644 index 0000000000..85a78bdcf5 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java @@ -0,0 +1,78 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.DefaultChannelConfig; + +import java.net.SocketAddress; + +public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel { + private final ChannelConfig config = new DefaultChannelConfig(this); + + private volatile DomainSocketAddress local; + private volatile DomainSocketAddress remote; + + EpollDomainSocketChannel(Channel parent, int fd) { + super(parent, fd); + } + + public EpollDomainSocketChannel() { + super(Native.socketDomainFd()); + } + + @Override + protected DomainSocketAddress localAddress0() { + return local; + } + + @Override + protected DomainSocketAddress remoteAddress0() { + return remote; + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + Native.bind(fd, localAddress); + local = (DomainSocketAddress) localAddress; + } + + @Override + public ChannelConfig config() { + return config; + } + + @Override + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + if (super.doConnect(remoteAddress, localAddress)) { + local = (DomainSocketAddress) localAddress; + remote = (DomainSocketAddress) remoteAddress; + return true; + } + return false; + } + + @Override + public DomainSocketAddress remoteAddress() { + return (DomainSocketAddress) super.remoteAddress(); + } + + @Override + public DomainSocketAddress localAddress() { + return (DomainSocketAddress) super.localAddress(); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java new file mode 100644 index 0000000000..8897447678 --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerChannelConfig.java @@ -0,0 +1,165 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.util.NetUtil; + +import java.util.Map; + +import static io.netty.channel.ChannelOption.SO_BACKLOG; +import static io.netty.channel.ChannelOption.SO_RCVBUF; +import static io.netty.channel.ChannelOption.SO_REUSEADDR; + +public class EpollServerChannelConfig extends DefaultChannelConfig { + protected final AbstractEpollChannel channel; + private volatile int backlog = NetUtil.SOMAXCONN; + + EpollServerChannelConfig(AbstractEpollChannel channel) { + super(channel); + this.channel = channel; + } + + @Override + public Map, Object> getOptions() { + return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG); + } + + @SuppressWarnings("unchecked") + @Override + public T getOption(ChannelOption option) { + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } + if (option == SO_REUSEADDR) { + return (T) Boolean.valueOf(isReuseAddress()); + } + if (option == SO_BACKLOG) { + return (T) Integer.valueOf(getBacklog()); + } + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == SO_REUSEADDR) { + setReuseAddress((Boolean) value); + } else if (option == SO_BACKLOG) { + setBacklog((Integer) value); + } else { + return super.setOption(option, value); + } + + return true; + } + + public boolean isReuseAddress() { + return Native.isReuseAddress(channel.fd) == 1; + } + + public EpollServerChannelConfig setReuseAddress(boolean reuseAddress) { + Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0); + return this; + } + + public int getReceiveBufferSize() { + return Native.getReceiveBufferSize(channel.fd); + } + + public EpollServerChannelConfig setReceiveBufferSize(int receiveBufferSize) { + Native.setReceiveBufferSize(channel.fd, receiveBufferSize); + return this; + } + + public int getBacklog() { + return backlog; + } + + public EpollServerChannelConfig setBacklog(int backlog) { + if (backlog < 0) { + throw new IllegalArgumentException("backlog: " + backlog); + } + this.backlog = backlog; + return this; + } + + @Override + public EpollServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { + super.setConnectTimeoutMillis(connectTimeoutMillis); + return this; + } + + @Override + public EpollServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { + super.setMaxMessagesPerRead(maxMessagesPerRead); + return this; + } + + @Override + public EpollServerChannelConfig setWriteSpinCount(int writeSpinCount) { + super.setWriteSpinCount(writeSpinCount); + return this; + } + + @Override + public EpollServerChannelConfig setAllocator(ByteBufAllocator allocator) { + super.setAllocator(allocator); + return this; + } + + @Override + public EpollServerChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + super.setRecvByteBufAllocator(allocator); + return this; + } + + @Override + public EpollServerChannelConfig setAutoRead(boolean autoRead) { + super.setAutoRead(autoRead); + return this; + } + + @Override + public EpollServerChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); + return this; + } + + @Override + public EpollServerChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); + return this; + } + + @Override + public EpollServerChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { + super.setMessageSizeEstimator(estimator); + return this; + } + + @Override + protected final void autoReadCleared() { + channel.clearEpollIn(); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerDomainSocketChannel.java new file mode 100644 index 0000000000..17143d161d --- /dev/null +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerDomainSocketChannel.java @@ -0,0 +1,85 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.Channel; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.io.File; +import java.net.SocketAddress; + + +public final class EpollServerDomainSocketChannel extends AbstractEpollServerChannel { + private static final InternalLogger logger = InternalLoggerFactory.getInstance( + EpollServerDomainSocketChannel.class); + + private final EpollServerChannelConfig config = new EpollServerChannelConfig(this); + private volatile DomainSocketAddress local; + + public EpollServerDomainSocketChannel() { + super(Native.socketDomainFd()); + } + + @Override + protected Channel newChildChannel(int fd) throws Exception { + return new EpollDomainSocketChannel(this, fd); + } + + @Override + protected DomainSocketAddress localAddress0() { + return local; + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + Native.bind(fd, localAddress); + Native.listen(fd, config.getBacklog()); + local = (DomainSocketAddress) localAddress; + } + + @Override + protected void doClose() throws Exception { + try { + super.doClose(); + } finally { + DomainSocketAddress local = this.local; + if (local != null) { + // Delete the socket file if possible. + File socketFile = new File(local.path()); + boolean success = socketFile.delete(); + if (!success && logger.isDebugEnabled()) { + logger.debug("Failed to delete a domain socket file: {}", local.path()); + } + } + } + } + + @Override + public EpollServerChannelConfig config() { + return config; + } + + @Override + public DomainSocketAddress remoteAddress() { + return (DomainSocketAddress) super.remoteAddress(); + } + + @Override + public DomainSocketAddress localAddress() { + return (DomainSocketAddress) super.localAddress(); + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java index 1722ac29a2..185ac864b0 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java @@ -15,9 +15,7 @@ */ package io.netty.channel.epoll; -import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; +import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.channel.socket.ServerSocketChannel; @@ -28,13 +26,13 @@ import java.net.SocketAddress; * {@link ServerSocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for * maximal performance. */ -public final class EpollServerSocketChannel extends AbstractEpollChannel implements ServerSocketChannel { +public final class EpollServerSocketChannel extends AbstractEpollServerChannel implements ServerSocketChannel { private final EpollServerSocketChannelConfig config; private volatile InetSocketAddress local; public EpollServerSocketChannel() { - super(Native.socketStreamFd(), Native.EPOLLACCEPT); + super(Native.socketStreamFd()); config = new EpollServerSocketChannelConfig(this); } @@ -47,12 +45,22 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme protected void doBind(SocketAddress localAddress) throws Exception { InetSocketAddress addr = (InetSocketAddress) localAddress; checkResolvable(addr); - Native.bind(fd, addr.getAddress(), addr.getPort()); + Native.bind(fd, addr); local = Native.localAddress(fd); Native.listen(fd, config.getBacklog()); active = true; } + @Override + public InetSocketAddress remoteAddress() { + return (InetSocketAddress) super.remoteAddress(); + } + + @Override + public InetSocketAddress localAddress() { + return (InetSocketAddress) super.localAddress(); + } + @Override public EpollServerSocketChannelConfig config() { return config; @@ -64,74 +72,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme } @Override - protected InetSocketAddress remoteAddress0() { - return null; - } - - @Override - protected AbstractEpollUnsafe newUnsafe() { - return new EpollServerSocketUnsafe(); - } - - @Override - protected void doWrite(ChannelOutboundBuffer in) throws Exception { - throw new UnsupportedOperationException(); - } - - @Override - protected Object filterOutboundMessage(Object msg) throws Exception { - throw new UnsupportedOperationException(); - } - - final class EpollServerSocketUnsafe extends AbstractEpollUnsafe { - - @Override - public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) { - // Connect not supported by ServerChannel implementations - channelPromise.setFailure(new UnsupportedOperationException()); - } - - @Override - void epollInReady() { - assert eventLoop().inEventLoop(); - final ChannelPipeline pipeline = pipeline(); - Throwable exception = null; - try { - try { - for (;;) { - int socketFd = Native.accept(fd); - if (socketFd == -1) { - // this means everything was handled for now - break; - } - try { - readPending = false; - pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd)); - } catch (Throwable t) { - // keep on reading as we use epoll ET and need to consume everything from the socket - pipeline.fireChannelReadComplete(); - pipeline.fireExceptionCaught(t); - } - } - } catch (Throwable t) { - exception = t; - } - pipeline.fireChannelReadComplete(); - - if (exception != null) { - pipeline.fireExceptionCaught(exception); - } - } finally { - // Check if there is a readPending which was not processed yet. - // This could be for two reasons: - // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method - // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method - // - // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead() && !readPending) { - clearEpollIn0(); - } - } - } + protected Channel newChildChannel(int fd) throws Exception { + return new EpollSocketChannel(this, fd); } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java index 38c5f53c7f..4e48e11b5c 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java @@ -17,27 +17,17 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelOption; -import io.netty.channel.DefaultChannelConfig; import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ServerSocketChannelConfig; -import io.netty.util.NetUtil; import java.util.Map; -import static io.netty.channel.ChannelOption.SO_BACKLOG; -import static io.netty.channel.ChannelOption.SO_RCVBUF; -import static io.netty.channel.ChannelOption.SO_REUSEADDR; - -public final class EpollServerSocketChannelConfig extends DefaultChannelConfig +public final class EpollServerSocketChannelConfig extends EpollServerChannelConfig implements ServerSocketChannelConfig { - private final EpollServerSocketChannel channel; - private volatile int backlog = NetUtil.SOMAXCONN; - EpollServerSocketChannelConfig(EpollServerSocketChannel channel) { super(channel); - this.channel = channel; // Use SO_REUSEADDR by default as java.nio does the same. // @@ -47,21 +37,12 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig @Override public Map, Object> getOptions() { - return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG, EpollChannelOption.SO_REUSEPORT); + return getOptions(super.getOptions(), EpollChannelOption.SO_REUSEPORT); } @SuppressWarnings("unchecked") @Override public T getOption(ChannelOption option) { - if (option == SO_RCVBUF) { - return (T) Integer.valueOf(getReceiveBufferSize()); - } - if (option == SO_REUSEADDR) { - return (T) Boolean.valueOf(isReuseAddress()); - } - if (option == SO_BACKLOG) { - return (T) Integer.valueOf(getBacklog()); - } if (option == EpollChannelOption.SO_REUSEPORT) { return (T) Boolean.valueOf(isReusePort()); } @@ -72,13 +53,7 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig public boolean setOption(ChannelOption option, T value) { validate(option, value); - if (option == SO_RCVBUF) { - setReceiveBufferSize((Integer) value); - } else if (option == SO_REUSEADDR) { - setReuseAddress((Boolean) value); - } else if (option == SO_BACKLOG) { - setBacklog((Integer) value); - } else if (option == EpollChannelOption.SO_REUSEPORT) { + if (option == EpollChannelOption.SO_REUSEPORT) { setReusePort((Boolean) value); } else { return super.setOption(option, value); @@ -87,26 +62,15 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig return true; } - @Override - public boolean isReuseAddress() { - return Native.isReuseAddress(channel.fd) == 1; - } - @Override public EpollServerSocketChannelConfig setReuseAddress(boolean reuseAddress) { - Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0); + super.setReuseAddress(reuseAddress); return this; } - @Override - public int getReceiveBufferSize() { - return Native.getReceiveBufferSize(channel.fd); - } - @Override public EpollServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { - Native.setReceiveBufferSize(channel.fd, receiveBufferSize); - + super.setReceiveBufferSize(receiveBufferSize); return this; } @@ -115,17 +79,9 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig return this; } - @Override - public int getBacklog() { - return backlog; - } - @Override public EpollServerSocketChannelConfig setBacklog(int backlog) { - if (backlog < 0) { - throw new IllegalArgumentException("backlog: " + backlog); - } - this.backlog = backlog; + super.setBacklog(backlog); return this; } @@ -201,9 +157,4 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig Native.setReusePort(channel.fd, reusePort ? 1 : 0); return this; } - - @Override - protected void autoReadCleared() { - channel.clearEpollIn(); - } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 85254160e6..fd2ea18567 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -15,61 +15,28 @@ */ package io.netty.channel.epoll; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; import io.netty.channel.Channel; -import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.ConnectTimeoutException; -import io.netty.channel.DefaultFileRegion; -import io.netty.channel.EventLoop; -import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; -import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.StringUtil; -import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; /** * {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for * maximal performance. */ -public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel { - - private static final String EXPECTED_TYPES = - " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + - StringUtil.simpleClassName(DefaultFileRegion.class) + ')'; +public final class EpollSocketChannel extends AbstractEpollStreamChannel implements SocketChannel { private final EpollSocketChannelConfig config; - /** - * The future of the current connection attempt. If not null, subsequent - * connection attempts will fail. - */ - private ChannelPromise connectPromise; - private ScheduledFuture connectTimeoutFuture; - private SocketAddress requestedRemoteAddress; - private volatile InetSocketAddress local; private volatile InetSocketAddress remote; - private volatile boolean inputShutdown; - private volatile boolean outputShutdown; EpollSocketChannel(Channel parent, int fd) { - super(parent, fd, Native.EPOLLIN, true); + super(parent, fd); config = new EpollSocketChannelConfig(this); // Directly cache the remote and local addresses // See https://github.com/netty/netty/issues/2359 @@ -78,7 +45,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } public EpollSocketChannel() { - super(Native.socketStreamFd(), Native.EPOLLIN); + super(Native.socketStreamFd()); config = new EpollSocketChannelConfig(this); } @@ -99,8 +66,13 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } @Override - protected AbstractEpollUnsafe newUnsafe() { - return new EpollSocketUnsafe(); + public InetSocketAddress remoteAddress() { + return (InetSocketAddress) super.remoteAddress(); + } + + @Override + public InetSocketAddress localAddress() { + return (InetSocketAddress) super.localAddress(); } @Override @@ -124,325 +96,10 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So @Override protected void doBind(SocketAddress local) throws Exception { InetSocketAddress localAddress = (InetSocketAddress) local; - Native.bind(fd, localAddress.getAddress(), localAddress.getPort()); + Native.bind(fd, localAddress); this.local = Native.localAddress(fd); } - /** - * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. - * @param buf the {@link ByteBuf} from which the bytes should be written - */ - private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception { - int readableBytes = buf.readableBytes(); - if (readableBytes == 0) { - in.remove(); - return true; - } - - boolean done = false; - long writtenBytes = 0; - if (buf.hasMemoryAddress()) { - long memoryAddress = buf.memoryAddress(); - int readerIndex = buf.readerIndex(); - int writerIndex = buf.writerIndex(); - for (;;) { - int localFlushedAmount = Native.writeAddress(fd, memoryAddress, readerIndex, writerIndex); - if (localFlushedAmount > 0) { - writtenBytes += localFlushedAmount; - if (writtenBytes == readableBytes) { - done = true; - break; - } - readerIndex += localFlushedAmount; - } else { - // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); - break; - } - } - - in.removeBytes(writtenBytes); - return done; - } else if (buf.nioBufferCount() == 1) { - int readerIndex = buf.readerIndex(); - ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, buf.readableBytes()); - for (;;) { - int pos = nioBuf.position(); - int limit = nioBuf.limit(); - int localFlushedAmount = Native.write(fd, nioBuf, pos, limit); - if (localFlushedAmount > 0) { - nioBuf.position(pos + localFlushedAmount); - writtenBytes += localFlushedAmount; - if (writtenBytes == readableBytes) { - done = true; - break; - } - } else { - // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); - break; - } - } - - in.removeBytes(writtenBytes); - return done; - } else { - ByteBuffer[] nioBuffers = buf.nioBuffers(); - return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes); - } - } - - private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException { - - long expectedWrittenBytes = array.size(); - int cnt = array.count(); - - assert expectedWrittenBytes != 0; - assert cnt != 0; - - boolean done = false; - long writtenBytes = 0; - int offset = 0; - int end = offset + cnt; - for (;;) { - long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt); - if (localWrittenBytes == 0) { - // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); - break; - } - expectedWrittenBytes -= localWrittenBytes; - writtenBytes += localWrittenBytes; - - if (expectedWrittenBytes == 0) { - // Written everything, just break out here (fast-path) - done = true; - break; - } - - do { - long bytes = array.processWritten(offset, localWrittenBytes); - if (bytes == -1) { - // incomplete write - break; - } else { - offset++; - cnt--; - localWrittenBytes -= bytes; - } - } while (offset < end && localWrittenBytes > 0); - } - - in.removeBytes(writtenBytes); - return done; - } - - private boolean writeBytesMultiple( - ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, - int nioBufferCnt, long expectedWrittenBytes) throws IOException { - - assert expectedWrittenBytes != 0; - - boolean done = false; - long writtenBytes = 0; - int offset = 0; - int end = offset + nioBufferCnt; - for (;;) { - long localWrittenBytes = Native.writev(fd, nioBuffers, offset, nioBufferCnt); - if (localWrittenBytes == 0) { - // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); - break; - } - expectedWrittenBytes -= localWrittenBytes; - writtenBytes += localWrittenBytes; - - if (expectedWrittenBytes == 0) { - // Written everything, just break out here (fast-path) - done = true; - break; - } - do { - ByteBuffer buffer = nioBuffers[offset]; - int pos = buffer.position(); - int bytes = buffer.limit() - pos; - if (bytes > localWrittenBytes) { - buffer.position(pos + (int) localWrittenBytes); - // incomplete write - break; - } else { - offset++; - nioBufferCnt--; - localWrittenBytes -= bytes; - } - } while (offset < end && localWrittenBytes > 0); - } - - in.removeBytes(writtenBytes); - return done; - } - - /** - * Write a {@link DefaultFileRegion} - * - * @param region the {@link DefaultFileRegion} from which the bytes should be written - * @return amount the amount of written bytes - */ - private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception { - final long regionCount = region.count(); - if (region.transfered() >= regionCount) { - in.remove(); - return true; - } - - final long baseOffset = region.position(); - boolean done = false; - long flushedAmount = 0; - - for (;;) { - final long offset = region.transfered(); - final long localFlushedAmount = Native.sendfile(fd, region, baseOffset, offset, regionCount - offset); - if (localFlushedAmount == 0) { - // Returned EAGAIN need to set EPOLLOUT - setEpollOut(); - break; - } - - flushedAmount += localFlushedAmount; - if (region.transfered() >= regionCount) { - done = true; - break; - } - } - - if (flushedAmount > 0) { - in.progress(flushedAmount); - } - - if (done) { - in.remove(); - } - return done; - } - - @Override - protected void doWrite(ChannelOutboundBuffer in) throws Exception { - for (;;) { - final int msgCount = in.size(); - - if (msgCount == 0) { - // Wrote all messages. - clearEpollOut(); - break; - } - - // Do gathering write if the outbounf buffer entries start with more than one ByteBuf. - if (msgCount > 1 && in.current() instanceof ByteBuf) { - if (!doWriteMultiple(in)) { - break; - } - - // We do not break the loop here even if the outbound buffer was flushed completely, - // because a user might have triggered another write and flush when we notify his or her - // listeners. - } else { // msgCount == 1 - if (!doWriteSingle(in)) { - break; - } - } - } - } - - private boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception { - // The outbound buffer contains only one message or it contains a file region. - Object msg = in.current(); - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - if (!writeBytes(in, buf)) { - // was not able to write everything so break here we will get notified later again once - // the network stack can handle more writes. - return false; - } - } else if (msg instanceof DefaultFileRegion) { - DefaultFileRegion region = (DefaultFileRegion) msg; - if (!writeFileRegion(in, region)) { - // was not able to write everything so break here we will get notified later again once - // the network stack can handle more writes. - return false; - } - } else { - // Should never reach here. - throw new Error(); - } - - return true; - } - - private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception { - if (PlatformDependent.hasUnsafe()) { - // this means we can cast to IovArray and write the IovArray directly. - IovArray array = IovArrayThreadLocal.get(in); - int cnt = array.count(); - if (cnt >= 1) { - // TODO: Handle the case where cnt == 1 specially. - if (!writeBytesMultiple(in, array)) { - // was not able to write everything so break here we will get notified later again once - // the network stack can handle more writes. - return false; - } - } else { // cnt == 0, which means the outbound buffer contained empty buffers only. - in.removeBytes(0); - } - } else { - ByteBuffer[] buffers = in.nioBuffers(); - int cnt = in.nioBufferCount(); - if (cnt >= 1) { - // TODO: Handle the case where cnt == 1 specially. - if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) { - // was not able to write everything so break here we will get notified later again once - // the network stack can handle more writes. - return false; - } - } else { // cnt == 0, which means the outbound buffer contained empty buffers only. - in.removeBytes(0); - } - } - - return true; - } - - @Override - protected Object filterOutboundMessage(Object msg) { - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) { - if (buf instanceof CompositeByteBuf) { - // Special handling of CompositeByteBuf to reduce memory copies if some of the Components - // in the CompositeByteBuf are backed by a memoryAddress. - CompositeByteBuf comp = (CompositeByteBuf) buf; - if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) { - // more then 1024 buffers for gathering writes so just do a memory copy. - buf = newDirectBuffer(buf); - assert buf.hasMemoryAddress(); - } - } else { - // We can only handle buffers with memory address so we need to copy if a non direct is - // passed to write. - buf = newDirectBuffer(buf); - assert buf.hasMemoryAddress(); - } - } - return buf; - } - - if (msg instanceof DefaultFileRegion) { - return msg; - } - - throw new UnsupportedOperationException( - "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); - } - @Override public EpollSocketChannelConfig config() { return config; @@ -450,12 +107,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So @Override public boolean isInputShutdown() { - return inputShutdown; + return isInputShutdown0(); } @Override public boolean isOutputShutdown() { - return outputShutdown || !isActive(); + return isOutputShutdown0(); } @Override @@ -465,24 +122,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So @Override public ChannelFuture shutdownOutput(final ChannelPromise promise) { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - try { - Native.shutdown(fd, false, true); - outputShutdown = true; - promise.setSuccess(); - } catch (Throwable t) { - promise.setFailure(t); - } - } else { - loop.execute(new Runnable() { - @Override - public void run() { - shutdownOutput(promise); - } - }); - } - return promise; + return shutdownOutput0(promise); } @Override @@ -490,307 +130,17 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So return (ServerSocketChannel) super.parent(); } - final class EpollSocketUnsafe extends AbstractEpollUnsafe { - private RecvByteBufAllocator.Handle allocHandle; - - private void closeOnRead(ChannelPipeline pipeline) { - inputShutdown = true; - if (isOpen()) { - if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { - clearEpollIn0(); - pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); - } else { - close(voidPromise()); - } - } + @Override + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + if (localAddress != null) { + checkResolvable((InetSocketAddress) localAddress); } - - private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { - if (byteBuf != null) { - if (byteBuf.isReadable()) { - readPending = false; - pipeline.fireChannelRead(byteBuf); - } else { - byteBuf.release(); - } - } - pipeline.fireChannelReadComplete(); - pipeline.fireExceptionCaught(cause); - if (close || cause instanceof IOException) { - closeOnRead(pipeline); - return true; - } - return false; - } - - @Override - public void connect( - final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { - if (!promise.setUncancellable() || !ensureOpen(promise)) { - return; - } - - try { - if (connectPromise != null) { - throw new IllegalStateException("connection attempt already made"); - } - - boolean wasActive = isActive(); - if (doConnect((InetSocketAddress) remoteAddress, (InetSocketAddress) localAddress)) { - fulfillConnectPromise(promise, wasActive); - } else { - connectPromise = promise; - requestedRemoteAddress = remoteAddress; - - // Schedule connect timeout. - int connectTimeoutMillis = config().getConnectTimeoutMillis(); - if (connectTimeoutMillis > 0) { - connectTimeoutFuture = eventLoop().schedule(new Runnable() { - @Override - public void run() { - ChannelPromise connectPromise = EpollSocketChannel.this.connectPromise; - ConnectTimeoutException cause = - new ConnectTimeoutException("connection timed out: " + remoteAddress); - if (connectPromise != null && connectPromise.tryFailure(cause)) { - close(voidPromise()); - } - } - }, connectTimeoutMillis, TimeUnit.MILLISECONDS); - } - - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isCancelled()) { - if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); - } - connectPromise = null; - close(voidPromise()); - } - } - }); - } - } catch (Throwable t) { - closeIfClosed(); - promise.tryFailure(annotateConnectException(t, remoteAddress)); - } - } - - private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { - if (promise == null) { - // Closed via cancellation and the promise has been notified already. - return; - } - active = true; - - // trySuccess() will return false if a user cancelled the connection attempt. - boolean promiseSet = promise.trySuccess(); - - // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, - // because what happened is what happened. - if (!wasActive && isActive()) { - pipeline().fireChannelActive(); - } - - // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). - if (!promiseSet) { - close(voidPromise()); - } - } - - private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { - if (promise == null) { - // Closed via cancellation and the promise has been notified already. - return; - } - - // Use tryFailure() instead of setFailure() to avoid the race against cancel(). - promise.tryFailure(cause); - closeIfClosed(); - } - - private void finishConnect() { - // Note this method is invoked by the event loop only if the connection attempt was - // neither cancelled nor timed out. - - assert eventLoop().inEventLoop(); - - boolean connectStillInProgress = false; - try { - boolean wasActive = isActive(); - if (!doFinishConnect()) { - connectStillInProgress = true; - return; - } - fulfillConnectPromise(connectPromise, wasActive); - } catch (Throwable t) { - fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); - } finally { - if (!connectStillInProgress) { - // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used - // See https://github.com/netty/netty/issues/1770 - if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); - } - connectPromise = null; - } - } - } - - @Override - void epollOutReady() { - if (connectPromise != null) { - // pending connect which is now complete so handle it. - finishConnect(); - } else { - super.epollOutReady(); - } - } - - /** - * Connect to the remote peer - */ - private boolean doConnect(InetSocketAddress remoteAddress, InetSocketAddress localAddress) throws Exception { - if (localAddress != null) { - checkResolvable(localAddress); - Native.bind(fd, localAddress.getAddress(), localAddress.getPort()); - } - - boolean success = false; - try { - checkResolvable(remoteAddress); - boolean connected = Native.connect(fd, remoteAddress.getAddress(), - remoteAddress.getPort()); - remote = remoteAddress; - local = Native.localAddress(fd); - if (!connected) { - setEpollOut(); - } - success = true; - return connected; - } finally { - if (!success) { - doClose(); - } - } - } - - /** - * Finish the connect - */ - private boolean doFinishConnect() throws Exception { - if (Native.finishConnect(fd)) { - clearEpollOut(); - return true; - } else { - setEpollOut(); - return false; - } - } - - /** - * Read bytes into the given {@link ByteBuf} and return the amount. - */ - private int doReadBytes(ByteBuf byteBuf) throws Exception { - int writerIndex = byteBuf.writerIndex(); - int localReadAmount; - if (byteBuf.hasMemoryAddress()) { - localReadAmount = Native.readAddress(fd, byteBuf.memoryAddress(), writerIndex, byteBuf.capacity()); - } else { - ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes()); - localReadAmount = Native.read(fd, buf, buf.position(), buf.limit()); - } - if (localReadAmount > 0) { - byteBuf.writerIndex(writerIndex + localReadAmount); - } - return localReadAmount; - } - - @Override - void epollRdHupReady() { - if (isActive()) { - epollInReady(); - } else { - closeOnRead(pipeline()); - } - } - - @Override - void epollInReady() { - final ChannelConfig config = config(); - final ChannelPipeline pipeline = pipeline(); - final ByteBufAllocator allocator = config.getAllocator(); - RecvByteBufAllocator.Handle allocHandle = this.allocHandle; - if (allocHandle == null) { - this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); - } - - ByteBuf byteBuf = null; - boolean close = false; - try { - int totalReadAmount = 0; - for (;;) { - // we use a direct buffer here as the native implementations only be able - // to handle direct buffers. - byteBuf = allocHandle.allocate(allocator); - int writable = byteBuf.writableBytes(); - int localReadAmount = doReadBytes(byteBuf); - if (localReadAmount <= 0) { - // not was read release the buffer - byteBuf.release(); - close = localReadAmount < 0; - break; - } - readPending = false; - pipeline.fireChannelRead(byteBuf); - byteBuf = null; - - if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { - allocHandle.record(totalReadAmount); - - // Avoid overflow. - totalReadAmount = localReadAmount; - } else { - totalReadAmount += localReadAmount; - } - - if (localReadAmount < writable) { - // Read less than what the buffer can hold, - // which might mean we drained the recv buffer completely. - break; - } - } - pipeline.fireChannelReadComplete(); - allocHandle.record(totalReadAmount); - - if (close) { - closeOnRead(pipeline); - close = false; - } - } catch (Throwable t) { - boolean closed = handleReadException(pipeline, byteBuf, t, close); - if (!closed) { - // trigger a read again as there may be something left to read and because of epoll ET we - // will not get notified again until we read everything from the socket - eventLoop().execute(new Runnable() { - @Override - public void run() { - epollInReady(); - } - }); - } - } finally { - // Check if there is a readPending which was not processed yet. - // This could be for two reasons: - // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method - // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method - // - // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead() && !readPending) { - clearEpollIn0(); - } - } + checkResolvable((InetSocketAddress) remoteAddress); + if (super.doConnect(remoteAddress, localAddress)) { + local = Native.localAddress(fd); + remote = Native.remoteAddress(fd); + return true; } + return false; } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index cf9ab02484..6d11ea32eb 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; @@ -346,18 +347,39 @@ final class Native { return res; } + public static int socketDomainFd() { + int res = socketDomain(); + if (res < 0) { + throw new ChannelException(newIOException("socketDomain", res)); + } + return res; + } + private static native int socketStream(); private static native int socketDgram(); + private static native int socketDomain(); - public static void bind(int fd, InetAddress addr, int port) throws IOException { - NativeInetAddress address = toNativeInetAddress(addr); - int res = bind(fd, address.address, address.scopeId, port); - if (res < 0) { - throw newIOException("bind", res); + public static void bind(int fd, SocketAddress socketAddress) throws IOException { + if (socketAddress instanceof InetSocketAddress) { + InetSocketAddress addr = (InetSocketAddress) socketAddress; + NativeInetAddress address = toNativeInetAddress(addr.getAddress()); + int res = bind(fd, address.address, address.scopeId, addr.getPort()); + if (res < 0) { + throw newIOException("bind", res); + } + } else if (socketAddress instanceof DomainSocketAddress) { + DomainSocketAddress addr = (DomainSocketAddress) socketAddress; + int res = bindDomainSocket(fd, addr.path()); + if (res < 0) { + throw newIOException("bind", res); + } + } else { + throw new Error("Unexpected SocketAddress implementation " + socketAddress); } } private static native int bind(int fd, byte[] address, int scopeId, int port); + private static native int bindDomainSocket(int fd, String path); public static void listen(int fd, int backlog) throws IOException { int res = listen0(fd, backlog); @@ -368,9 +390,18 @@ final class Native { private static native int listen0(int fd, int backlog); - public static boolean connect(int fd, InetAddress addr, int port) throws IOException { - NativeInetAddress address = toNativeInetAddress(addr); - int res = connect(fd, address.address, address.scopeId, port); + public static boolean connect(int fd, SocketAddress socketAddress) throws IOException { + int res; + if (socketAddress instanceof InetSocketAddress) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; + NativeInetAddress address = toNativeInetAddress(inetSocketAddress.getAddress()); + res = connect(fd, address.address, address.scopeId, inetSocketAddress.getPort()); + } else if (socketAddress instanceof DomainSocketAddress) { + DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) socketAddress; + res = connectDomainSocket(fd, unixDomainSocketAddress.path()); + } else { + throw new Error("Unexpected SocketAddress implementation " + socketAddress); + } if (res < 0) { if (res == ERRNO_EINPROGRESS_NEGATIVE) { // connect not complete yet need to wait for EPOLLOUT event @@ -382,6 +413,7 @@ final class Native { } private static native int connect(int fd, byte[] address, int scopeId, int port); + private static native int connectDomainSocket(int fd, String path); public static boolean finishConnect(int fd) throws IOException { int res = finishConnect0(fd); @@ -477,6 +509,20 @@ final class Native { private static native int accept0(int fd); + public static int recvFd(int fd) throws IOException { + int res = recvFd0(fd); + if (res >= 0) { + return res; + } + if (res == ERRNO_EAGAIN_NEGATIVE || res == ERRNO_EWOULDBLOCK_NEGATIVE) { + // Everything consumed so just return -1 here. + return -1; + } + throw newIOException("recvFd", res); + } + + private static native int recvFd0(int fd); + public static void shutdown(int fd, boolean read, boolean write) throws IOException { int res = shutdown0(fd, read, write); if (res < 0) { diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketEchoTest.java new file mode 100644 index 0000000000..faf6d9b740 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketEchoTest.java @@ -0,0 +1,35 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketEchoTest extends EpollSocketEchoTest { + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFileRegionTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFileRegionTest.java new file mode 100644 index 0000000000..8d5abd3ea4 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFileRegionTest.java @@ -0,0 +1,35 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketFileRegionTest extends EpollSocketFileRegionTest { + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFixedLengthEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFixedLengthEchoTest.java new file mode 100644 index 0000000000..925663f6fc --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketFixedLengthEchoTest.java @@ -0,0 +1,37 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketFixedLengthEchoTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketFixedLengthEchoTest extends SocketFixedLengthEchoTest { + + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketGatheringWriteTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketGatheringWriteTest.java new file mode 100644 index 0000000000..03c2cbcfd2 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketGatheringWriteTest.java @@ -0,0 +1,37 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketGatheringWriteTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketGatheringWriteTest extends SocketGatheringWriteTest { + + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketObjectEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketObjectEchoTest.java new file mode 100644 index 0000000000..91ce300caa --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketObjectEchoTest.java @@ -0,0 +1,36 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketObjectEchoTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketObjectEchoTest extends SocketObjectEchoTest { + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslEchoTest.java new file mode 100644 index 0000000000..4eaf36d1e2 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslEchoTest.java @@ -0,0 +1,47 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.handler.ssl.SslContext; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketSslEchoTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketSslEchoTest extends SocketSslEchoTest { + public EpollDomainSocketSslEchoTest( + SslContext serverCtx, SslContext clientCtx, Renegotiation renegotiation, + boolean serverUsesDelegatedTaskExecutor, boolean clientUsesDelegatedTaskExecutor, + boolean autoRead, boolean useChunkedWriteHandler, boolean useCompositeByteBuf) { + + super(serverCtx, clientCtx, renegotiation, + serverUsesDelegatedTaskExecutor, clientUsesDelegatedTaskExecutor, + autoRead, useChunkedWriteHandler, useCompositeByteBuf); + } + + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslGreetingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslGreetingTest.java new file mode 100644 index 0000000000..a1ed0c5ac2 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslGreetingTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.handler.ssl.SslContext; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketSslGreetingTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketSslGreetingTest extends SocketSslGreetingTest { + + public EpollDomainSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx) { + super(serverCtx, clientCtx); + } + + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketStartTlsTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketStartTlsTest.java new file mode 100644 index 0000000000..f60981bfee --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketStartTlsTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.handler.ssl.SslContext; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketStartTlsTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketStartTlsTest extends SocketStartTlsTest { + + public EpollDomainSocketStartTlsTest(SslContext serverCtx, SslContext clientCtx) { + super(serverCtx, clientCtx); + } + + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketStringEchoTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketStringEchoTest.java new file mode 100644 index 0000000000..1b5a032d91 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketStringEchoTest.java @@ -0,0 +1,36 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketStringEchoTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketStringEchoTest extends SocketStringEchoTest { + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java index 6d56c6b891..cd7208bb0d 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java @@ -29,12 +29,15 @@ import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory; import io.netty.testsuite.transport.socket.SocketTestPermutation; import io.netty.util.concurrent.DefaultThreadFactory; +import java.io.File; +import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; class EpollSocketTestPermutation extends SocketTestPermutation { - static final SocketTestPermutation INSTANCE = new EpollSocketTestPermutation(); + static final EpollSocketTestPermutation INSTANCE = new EpollSocketTestPermutation(); static final EventLoopGroup EPOLL_BOSS_GROUP = new EpollEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-epoll-boss", true)); @@ -119,4 +122,44 @@ class EpollSocketTestPermutation extends SocketTestPermutation { ); return combo(bfs, bfs); } + + public List> domainSocket() { + + List> list = + combo(serverDomainSocket(), clientDomainSocket()); + return list; + } + + public List> serverDomainSocket() { + return Collections.>singletonList( + new BootstrapFactory() { + @Override + public ServerBootstrap newInstance() { + return new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP) + .channel(EpollServerDomainSocketChannel.class); + } + } + ); + } + + public List> clientDomainSocket() { + return Collections.>singletonList( + new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollDomainSocketChannel.class); + } + } + ); + } + + public static DomainSocketAddress newSocketAddress() { + try { + File file = File.createTempFile("netty", "dsocket"); + file.delete(); + return new DomainSocketAddress(file); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } }