Add support for Unix Domain Sockets when using native epoll transport
Motivation: Using Unix Domain Sockets can be very useful when communication should take place on the same host and has less overhead then using loopback. We should support this with the native epoll transport. Modifications: - Add support for Unix Domain Sockets. - Adjust testsuite to be able to reuse tests. Result: Unix Domain Sockets are now support when using native epoll transport.
This commit is contained in:
parent
108a95caca
commit
b898bdda84
@ -24,11 +24,12 @@ import io.netty.testsuite.util.TestUtils;
|
||||
import io.netty.util.NetUtil;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
public abstract class AbstractClientSocketTest extends AbstractTestsuiteTest<Bootstrap> {
|
||||
|
||||
protected volatile InetSocketAddress addr;
|
||||
protected volatile SocketAddress addr;
|
||||
|
||||
protected AbstractClientSocketTest() {
|
||||
super(Bootstrap.class);
|
||||
@ -41,8 +42,13 @@ public abstract class AbstractClientSocketTest extends AbstractTestsuiteTest<Boo
|
||||
|
||||
@Override
|
||||
protected void configure(Bootstrap bootstrap, ByteBufAllocator allocator) {
|
||||
addr = new InetSocketAddress(NetUtil.LOCALHOST, TestUtils.getFreePort());
|
||||
addr = newSocketAddress();
|
||||
bootstrap.remoteAddress(addr);
|
||||
bootstrap.option(ChannelOption.ALLOCATOR, allocator);
|
||||
}
|
||||
|
||||
protected SocketAddress newSocketAddress() {
|
||||
return new InetSocketAddress(
|
||||
NetUtil.LOCALHOST, TestUtils.getFreePort());
|
||||
}
|
||||
}
|
||||
|
@ -24,11 +24,12 @@ import io.netty.testsuite.util.TestUtils;
|
||||
import io.netty.util.NetUtil;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
public abstract class AbstractServerSocketTest extends AbstractTestsuiteTest<ServerBootstrap> {
|
||||
|
||||
protected volatile InetSocketAddress addr;
|
||||
protected volatile SocketAddress addr;
|
||||
|
||||
protected AbstractServerSocketTest() {
|
||||
super(ServerBootstrap.class);
|
||||
@ -41,10 +42,14 @@ public abstract class AbstractServerSocketTest extends AbstractTestsuiteTest<Ser
|
||||
|
||||
@Override
|
||||
protected void configure(ServerBootstrap bootstrap, ByteBufAllocator allocator) {
|
||||
addr = new InetSocketAddress(
|
||||
NetUtil.LOCALHOST, TestUtils.getFreePort());
|
||||
addr = newSocketAddress();
|
||||
bootstrap.localAddress(addr);
|
||||
bootstrap.option(ChannelOption.ALLOCATOR, allocator);
|
||||
bootstrap.childOption(ChannelOption.ALLOCATOR, allocator);
|
||||
}
|
||||
|
||||
protected SocketAddress newSocketAddress() {
|
||||
return new InetSocketAddress(
|
||||
NetUtil.LOCALHOST, TestUtils.getFreePort());
|
||||
}
|
||||
}
|
||||
|
@ -25,11 +25,12 @@ import io.netty.testsuite.util.TestUtils;
|
||||
import io.netty.util.NetUtil;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
public abstract class AbstractSocketTest extends AbstractComboTestsuiteTest<ServerBootstrap, Bootstrap> {
|
||||
|
||||
protected volatile InetSocketAddress addr;
|
||||
protected volatile SocketAddress addr;
|
||||
|
||||
protected AbstractSocketTest() {
|
||||
super(ServerBootstrap.class, Bootstrap.class);
|
||||
@ -42,12 +43,16 @@ public abstract class AbstractSocketTest extends AbstractComboTestsuiteTest<Serv
|
||||
|
||||
@Override
|
||||
protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) {
|
||||
addr = new InetSocketAddress(
|
||||
NetUtil.LOCALHOST, TestUtils.getFreePort());
|
||||
addr = newSocketAddress();
|
||||
bootstrap.localAddress(addr);
|
||||
bootstrap.option(ChannelOption.ALLOCATOR, allocator);
|
||||
bootstrap.childOption(ChannelOption.ALLOCATOR, allocator);
|
||||
bootstrap2.remoteAddress(addr);
|
||||
bootstrap2.option(ChannelOption.ALLOCATOR, allocator);
|
||||
}
|
||||
|
||||
protected SocketAddress newSocketAddress() {
|
||||
return new InetSocketAddress(
|
||||
NetUtil.LOCALHOST, TestUtils.getFreePort());
|
||||
}
|
||||
}
|
||||
|
@ -24,7 +24,6 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.util.concurrent.DefaultEventExecutorGroup;
|
||||
import io.netty.util.concurrent.EventExecutorGroup;
|
||||
import org.junit.AfterClass;
|
||||
@ -129,15 +128,15 @@ public class SocketEchoTest extends AbstractSocketTest {
|
||||
final EchoHandler ch = new EchoHandler(autoRead);
|
||||
|
||||
if (additionalExecutor) {
|
||||
sb.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
sb.childHandler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel c) throws Exception {
|
||||
protected void initChannel(Channel c) throws Exception {
|
||||
c.pipeline().addLast(group, sh);
|
||||
}
|
||||
});
|
||||
cb.handler(new ChannelInitializer<SocketChannel>() {
|
||||
cb.handler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel c) throws Exception {
|
||||
protected void initChannel(Channel c) throws Exception {
|
||||
c.pipeline().addLast(group, ch);
|
||||
}
|
||||
});
|
||||
|
@ -23,7 +23,6 @@ import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.codec.FixedLengthFrameDecoder;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -64,17 +63,17 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest {
|
||||
final EchoHandler sh = new EchoHandler(autoRead);
|
||||
final EchoHandler ch = new EchoHandler(autoRead);
|
||||
|
||||
sb.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
sb.childHandler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
public void initChannel(SocketChannel sch) throws Exception {
|
||||
public void initChannel(Channel sch) throws Exception {
|
||||
sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024));
|
||||
sch.pipeline().addAfter("decoder", "handler", sh);
|
||||
}
|
||||
});
|
||||
|
||||
cb.handler(new ChannelInitializer<SocketChannel>() {
|
||||
cb.handler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
public void initChannel(SocketChannel sch) throws Exception {
|
||||
public void initChannel(Channel sch) throws Exception {
|
||||
sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024));
|
||||
sch.pipeline().addAfter("decoder", "handler", ch);
|
||||
}
|
||||
|
@ -21,7 +21,6 @@ import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.codec.serialization.ClassResolvers;
|
||||
import io.netty.handler.codec.serialization.ObjectDecoder;
|
||||
import io.netty.handler.codec.serialization.ObjectEncoder;
|
||||
@ -72,9 +71,9 @@ public class SocketObjectEchoTest extends AbstractSocketTest {
|
||||
final EchoHandler sh = new EchoHandler(autoRead);
|
||||
final EchoHandler ch = new EchoHandler(autoRead);
|
||||
|
||||
sb.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
sb.childHandler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
public void initChannel(SocketChannel sch) throws Exception {
|
||||
public void initChannel(Channel sch) throws Exception {
|
||||
sch.pipeline().addLast(
|
||||
new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
|
||||
new ObjectEncoder(),
|
||||
@ -82,9 +81,9 @@ public class SocketObjectEchoTest extends AbstractSocketTest {
|
||||
}
|
||||
});
|
||||
|
||||
cb.handler(new ChannelInitializer<SocketChannel>() {
|
||||
cb.handler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
public void initChannel(SocketChannel sch) throws Exception {
|
||||
public void initChannel(Channel sch) throws Exception {
|
||||
sch.pipeline().addLast(
|
||||
new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
|
||||
new ObjectEncoder(),
|
||||
|
@ -25,7 +25,6 @@ import io.netty.channel.ChannelHandler.Sharable;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.ssl.JdkSslClientContext;
|
||||
import io.netty.handler.ssl.JdkSslServerContext;
|
||||
import io.netty.handler.ssl.OpenSsl;
|
||||
@ -182,8 +181,8 @@ public class SocketSslEchoTest extends AbstractSocketTest {
|
||||
private final AtomicInteger clientNegoCounter = new AtomicInteger();
|
||||
private final AtomicInteger serverNegoCounter = new AtomicInteger();
|
||||
|
||||
private volatile SocketChannel clientChannel;
|
||||
private volatile SocketChannel serverChannel;
|
||||
private volatile Channel clientChannel;
|
||||
private volatile Channel serverChannel;
|
||||
|
||||
private volatile SslHandler clientSslHandler;
|
||||
private volatile SslHandler serverSslHandler;
|
||||
@ -222,10 +221,10 @@ public class SocketSslEchoTest extends AbstractSocketTest {
|
||||
final ExecutorService delegatedTaskExecutor = Executors.newCachedThreadPool();
|
||||
reset();
|
||||
|
||||
sb.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
sb.childHandler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
@SuppressWarnings("deprecation")
|
||||
public void initChannel(SocketChannel sch) throws Exception {
|
||||
public void initChannel(Channel sch) throws Exception {
|
||||
serverChannel = sch;
|
||||
|
||||
if (serverUsesDelegatedTaskExecutor) {
|
||||
@ -243,10 +242,10 @@ public class SocketSslEchoTest extends AbstractSocketTest {
|
||||
}
|
||||
});
|
||||
|
||||
cb.handler(new ChannelInitializer<SocketChannel>() {
|
||||
cb.handler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
@SuppressWarnings("deprecation")
|
||||
public void initChannel(SocketChannel sch) throws Exception {
|
||||
public void initChannel(Channel sch) throws Exception {
|
||||
clientChannel = sch;
|
||||
|
||||
if (clientUsesDelegatedTaskExecutor) {
|
||||
|
@ -24,7 +24,6 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import io.netty.handler.logging.LoggingHandler;
|
||||
import io.netty.handler.ssl.JdkSslClientContext;
|
||||
@ -117,9 +116,9 @@ public class SocketSslGreetingTest extends AbstractSocketTest {
|
||||
final ServerHandler sh = new ServerHandler();
|
||||
final ClientHandler ch = new ClientHandler();
|
||||
|
||||
sb.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
sb.childHandler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
public void initChannel(SocketChannel sch) throws Exception {
|
||||
public void initChannel(Channel sch) throws Exception {
|
||||
ChannelPipeline p = sch.pipeline();
|
||||
p.addLast(serverCtx.newHandler(sch.alloc()));
|
||||
p.addLast(new LoggingHandler(LOG_LEVEL));
|
||||
@ -127,9 +126,9 @@ public class SocketSslGreetingTest extends AbstractSocketTest {
|
||||
}
|
||||
});
|
||||
|
||||
cb.handler(new ChannelInitializer<SocketChannel>() {
|
||||
cb.handler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
public void initChannel(SocketChannel sch) throws Exception {
|
||||
public void initChannel(Channel sch) throws Exception {
|
||||
ChannelPipeline p = sch.pipeline();
|
||||
p.addLast(clientCtx.newHandler(sch.alloc()));
|
||||
p.addLast(new LoggingHandler(LOG_LEVEL));
|
||||
|
@ -23,7 +23,6 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.codec.LineBasedFrameDecoder;
|
||||
import io.netty.handler.codec.string.StringDecoder;
|
||||
import io.netty.handler.codec.string.StringEncoder;
|
||||
@ -150,9 +149,9 @@ public class SocketStartTlsTest extends AbstractSocketTest {
|
||||
final StartTlsServerHandler sh = new StartTlsServerHandler(sse, autoRead);
|
||||
final StartTlsClientHandler ch = new StartTlsClientHandler(cse, autoRead);
|
||||
|
||||
sb.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
sb.childHandler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
public void initChannel(SocketChannel sch) throws Exception {
|
||||
public void initChannel(Channel sch) throws Exception {
|
||||
ChannelPipeline p = sch.pipeline();
|
||||
p.addLast("logger", new LoggingHandler(LOG_LEVEL));
|
||||
p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder());
|
||||
@ -160,9 +159,9 @@ public class SocketStartTlsTest extends AbstractSocketTest {
|
||||
}
|
||||
});
|
||||
|
||||
cb.handler(new ChannelInitializer<SocketChannel>() {
|
||||
cb.handler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
public void initChannel(SocketChannel sch) throws Exception {
|
||||
public void initChannel(Channel sch) throws Exception {
|
||||
ChannelPipeline p = sch.pipeline();
|
||||
p.addLast("logger", new LoggingHandler(LOG_LEVEL));
|
||||
p.addLast(new LineBasedFrameDecoder(64), new StringDecoder(), new StringEncoder());
|
||||
|
@ -21,7 +21,6 @@ import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
|
||||
import io.netty.handler.codec.Delimiters;
|
||||
import io.netty.handler.codec.string.StringDecoder;
|
||||
@ -74,9 +73,9 @@ public class SocketStringEchoTest extends AbstractSocketTest {
|
||||
final StringEchoHandler sh = new StringEchoHandler(autoRead);
|
||||
final StringEchoHandler ch = new StringEchoHandler(autoRead);
|
||||
|
||||
sb.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
sb.childHandler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
public void initChannel(SocketChannel sch) throws Exception {
|
||||
public void initChannel(Channel sch) throws Exception {
|
||||
sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
|
||||
sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
|
||||
sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
|
||||
@ -84,9 +83,9 @@ public class SocketStringEchoTest extends AbstractSocketTest {
|
||||
}
|
||||
});
|
||||
|
||||
cb.handler(new ChannelInitializer<SocketChannel>() {
|
||||
cb.handler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
public void initChannel(SocketChannel sch) throws Exception {
|
||||
public void initChannel(Channel sch) throws Exception {
|
||||
sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
|
||||
sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
|
||||
sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include <sys/epoll.h>
|
||||
#include <sys/eventfd.h>
|
||||
#include <sys/sendfile.h>
|
||||
#include <sys/un.h>
|
||||
#include <netinet/tcp.h>
|
||||
#include <netinet/in.h>
|
||||
#include <sys/types.h>
|
||||
@ -1390,3 +1391,101 @@ JNIEXPORT jstring JNICALL Java_io_netty_channel_epoll_Native_strError(JNIEnv* en
|
||||
char* err = strerror(error);
|
||||
return (*env)->NewStringUTF(env, err);
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socketDomain(JNIEnv* env, jclass clazz) {
|
||||
int fd = socket(PF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0);
|
||||
if (fd == -1) {
|
||||
return -errno;
|
||||
}
|
||||
return fd;
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_bindDomainSocket(JNIEnv* env, jclass clazz, jint fd, jstring socketPath) {
|
||||
struct sockaddr_un addr;
|
||||
|
||||
memset(&addr, 0, sizeof(addr));
|
||||
addr.sun_family = AF_UNIX;
|
||||
|
||||
const char* socket_path = (*env)->GetStringUTFChars(env, socketPath, 0);
|
||||
memcpy(addr.sun_path, socket_path, strlen(socket_path));
|
||||
|
||||
if (unlink(socket_path) == -1 && errno != ENOENT) {
|
||||
return -errno;
|
||||
}
|
||||
|
||||
int res = bind(fd, (struct sockaddr*) &addr, sizeof(addr));
|
||||
(*env)->ReleaseStringUTFChars(env, socketPath, socket_path);
|
||||
|
||||
if (res == -1) {
|
||||
return -errno;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_connectDomainSocket(JNIEnv* env, jclass clazz, jint fd, jstring socketPath) {
|
||||
struct sockaddr_un addr;
|
||||
|
||||
memset(&addr, 0, sizeof(addr));
|
||||
addr.sun_family = AF_UNIX;
|
||||
|
||||
const char* socket_path = (*env)->GetStringUTFChars(env, socketPath, 0);
|
||||
strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1);
|
||||
|
||||
int res;
|
||||
int err;
|
||||
do {
|
||||
res = connect(fd, (struct sockaddr*) &addr, sizeof(addr));
|
||||
} while (res == -1 && ((err = errno) == EINTR));
|
||||
|
||||
(*env)->ReleaseStringUTFChars(env, socketPath, socket_path);
|
||||
|
||||
if (res < 0) {
|
||||
return -err;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_recvFd(JNIEnv* env, jclass clazz, jint fd) {
|
||||
jint socketFd;
|
||||
struct msghdr msg;
|
||||
struct iovec iov[1];
|
||||
struct cmsghdr* ctrl_msg = NULL;
|
||||
char msg_buffer[1];
|
||||
char elem_buffer[CMSG_SPACE(sizeof(int))];
|
||||
|
||||
/* Fill all with 0 */
|
||||
memset(&msg, 0, sizeof(struct msghdr));
|
||||
memset(elem_buffer, 0, CMSG_SPACE(sizeof(int)));
|
||||
|
||||
iov[0].iov_base = msg_buffer;
|
||||
iov[0].iov_len = 1;
|
||||
|
||||
msg.msg_iov = iov;
|
||||
msg.msg_iovlen = 1;
|
||||
msg.msg_control = elem_buffer;
|
||||
msg.msg_controllen = CMSG_SPACE(sizeof(int));
|
||||
|
||||
if(recvmsg(fd, &msg, MSG_CMSG_CLOEXEC) < 0) {
|
||||
// All read, return -1
|
||||
return -1;
|
||||
}
|
||||
|
||||
if((msg.msg_flags & MSG_CTRUNC) == MSG_CTRUNC) {
|
||||
// Not enough space ?!?!
|
||||
return -1;
|
||||
}
|
||||
|
||||
// skip empty entries
|
||||
for(ctrl_msg = CMSG_FIRSTHDR(&msg); ctrl_msg != NULL; ctrl_msg = CMSG_NXTHDR(&msg, ctrl_msg)) {
|
||||
if((ctrl_msg->cmsg_level == SOL_SOCKET) && (ctrl_msg->cmsg_type == SCM_RIGHTS)) {
|
||||
socketFd = *((int *) CMSG_DATA(ctrl_msg));
|
||||
// set as non blocking as we want to use it with epoll
|
||||
if (fcntl(socketFd, F_SETFL, O_NONBLOCK) == -1) {
|
||||
return -errno;
|
||||
}
|
||||
return socketFd;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -27,6 +27,7 @@ import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.internal.OneTimeTask;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.UnresolvedAddressException;
|
||||
|
||||
abstract class AbstractEpollChannel extends AbstractChannel {
|
||||
@ -49,6 +50,10 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
||||
this.active = active;
|
||||
}
|
||||
|
||||
protected final int fd() {
|
||||
return fd;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActive() {
|
||||
return active;
|
||||
@ -71,16 +76,6 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
||||
Native.close(fd);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress remoteAddress() {
|
||||
return (InetSocketAddress) super.remoteAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress localAddress() {
|
||||
return (InetSocketAddress) super.localAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doDisconnect() throws Exception {
|
||||
doClose();
|
||||
@ -214,6 +209,72 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read bytes into the given {@link ByteBuf} and return the amount.
|
||||
*/
|
||||
protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
|
||||
int writerIndex = byteBuf.writerIndex();
|
||||
int localReadAmount;
|
||||
if (byteBuf.hasMemoryAddress()) {
|
||||
localReadAmount = Native.readAddress(fd, byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
|
||||
} else {
|
||||
ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
|
||||
localReadAmount = Native.read(fd, buf, buf.position(), buf.limit());
|
||||
}
|
||||
if (localReadAmount > 0) {
|
||||
byteBuf.writerIndex(writerIndex + localReadAmount);
|
||||
}
|
||||
return localReadAmount;
|
||||
}
|
||||
|
||||
protected final int doWriteBytes(ByteBuf buf) throws Exception {
|
||||
int readableBytes = buf.readableBytes();
|
||||
int writtenBytes = 0;
|
||||
if (buf.hasMemoryAddress()) {
|
||||
long memoryAddress = buf.memoryAddress();
|
||||
int readerIndex = buf.readerIndex();
|
||||
int writerIndex = buf.writerIndex();
|
||||
for (;;) {
|
||||
int localFlushedAmount = Native.writeAddress(fd, memoryAddress, readerIndex, writerIndex);
|
||||
if (localFlushedAmount > 0) {
|
||||
writtenBytes += localFlushedAmount;
|
||||
if (writtenBytes == readableBytes) {
|
||||
return writtenBytes;
|
||||
}
|
||||
readerIndex += localFlushedAmount;
|
||||
} else {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setEpollOut();
|
||||
return writtenBytes;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ByteBuffer nioBuf;
|
||||
if (buf.nioBufferCount() == 1) {
|
||||
nioBuf = buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes());
|
||||
} else {
|
||||
nioBuf = buf.nioBuffer();
|
||||
}
|
||||
for (;;) {
|
||||
int pos = nioBuf.position();
|
||||
int limit = nioBuf.limit();
|
||||
int localFlushedAmount = Native.write(fd, nioBuf, pos, limit);
|
||||
if (localFlushedAmount > 0) {
|
||||
nioBuf.position(pos + localFlushedAmount);
|
||||
writtenBytes += localFlushedAmount;
|
||||
if (writtenBytes == readableBytes) {
|
||||
return writtenBytes;
|
||||
}
|
||||
} else {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setEpollOut();
|
||||
break;
|
||||
}
|
||||
}
|
||||
return writtenBytes;
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
|
||||
protected boolean readPending;
|
||||
|
||||
|
@ -0,0 +1,114 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.ServerChannel;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
|
||||
|
||||
public abstract class AbstractEpollServerChannel extends AbstractEpollChannel implements ServerChannel {
|
||||
|
||||
protected AbstractEpollServerChannel(int fd) {
|
||||
super(fd, Native.EPOLLACCEPT);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isCompatible(EventLoop loop) {
|
||||
return loop instanceof EpollEventLoop;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InetSocketAddress remoteAddress0() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractEpollUnsafe newUnsafe() {
|
||||
return new EpollServerSocketUnsafe();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object filterOutboundMessage(Object msg) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
protected abstract Channel newChildChannel(int fd) throws Exception;
|
||||
|
||||
final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {
|
||||
|
||||
@Override
|
||||
public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) {
|
||||
// Connect not supported by ServerChannel implementations
|
||||
channelPromise.setFailure(new UnsupportedOperationException());
|
||||
}
|
||||
|
||||
@Override
|
||||
void epollInReady() {
|
||||
assert eventLoop().inEventLoop();
|
||||
final ChannelPipeline pipeline = pipeline();
|
||||
Throwable exception = null;
|
||||
try {
|
||||
try {
|
||||
for (;;) {
|
||||
int socketFd = Native.accept(fd);
|
||||
if (socketFd == -1) {
|
||||
// this means everything was handled for now
|
||||
break;
|
||||
}
|
||||
readPending = false;
|
||||
|
||||
try {
|
||||
pipeline.fireChannelRead(newChildChannel(socketFd));
|
||||
} catch (Throwable t) {
|
||||
// keep on reading as we use epoll ET and need to consume everything from the socket
|
||||
pipeline.fireChannelReadComplete();
|
||||
pipeline.fireExceptionCaught(t);
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
exception = t;
|
||||
}
|
||||
pipeline.fireChannelReadComplete();
|
||||
|
||||
if (exception != null) {
|
||||
pipeline.fireExceptionCaught(exception);
|
||||
}
|
||||
} finally {
|
||||
// Check if there is a readPending which was not processed yet.
|
||||
// This could be for two reasons:
|
||||
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
|
||||
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2254
|
||||
if (!config().isAutoRead() && !readPending) {
|
||||
clearEpollIn0();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,653 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.ConnectTimeoutException;
|
||||
import io.netty.channel.DefaultFileRegion;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
|
||||
private static final String EXPECTED_TYPES =
|
||||
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
|
||||
StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
|
||||
|
||||
private volatile boolean inputShutdown;
|
||||
private volatile boolean outputShutdown;
|
||||
|
||||
protected AbstractEpollStreamChannel(Channel parent, int fd) {
|
||||
super(parent, fd, Native.EPOLLIN, true);
|
||||
}
|
||||
|
||||
protected AbstractEpollStreamChannel(int fd) {
|
||||
super(fd, Native.EPOLLIN);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractEpollUnsafe newUnsafe() {
|
||||
return new EpollStreamUnsafe();
|
||||
}
|
||||
|
||||
/**
|
||||
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
|
||||
* @param buf the {@link ByteBuf} from which the bytes should be written
|
||||
*/
|
||||
private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
|
||||
int readableBytes = buf.readableBytes();
|
||||
if (readableBytes == 0) {
|
||||
in.remove();
|
||||
return true;
|
||||
}
|
||||
|
||||
if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
|
||||
int writtenBytes = doWriteBytes(buf);
|
||||
in.removeBytes(writtenBytes);
|
||||
return writtenBytes == readableBytes;
|
||||
} else {
|
||||
ByteBuffer[] nioBuffers = buf.nioBuffers();
|
||||
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
|
||||
|
||||
long expectedWrittenBytes = array.size();
|
||||
final long initialExpectedWrittenBytes = expectedWrittenBytes;
|
||||
|
||||
int cnt = array.count();
|
||||
|
||||
assert expectedWrittenBytes != 0;
|
||||
assert cnt != 0;
|
||||
|
||||
boolean done = false;
|
||||
int offset = 0;
|
||||
int end = offset + cnt;
|
||||
for (;;) {
|
||||
long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt);
|
||||
if (localWrittenBytes == 0) {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setEpollOut();
|
||||
break;
|
||||
}
|
||||
expectedWrittenBytes -= localWrittenBytes;
|
||||
|
||||
if (expectedWrittenBytes == 0) {
|
||||
// Written everything, just break out here (fast-path)
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
|
||||
do {
|
||||
long bytes = array.processWritten(offset, localWrittenBytes);
|
||||
if (bytes == -1) {
|
||||
// incomplete write
|
||||
break;
|
||||
} else {
|
||||
offset++;
|
||||
cnt--;
|
||||
localWrittenBytes -= bytes;
|
||||
}
|
||||
} while (offset < end && localWrittenBytes > 0);
|
||||
}
|
||||
|
||||
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
|
||||
return done;
|
||||
}
|
||||
|
||||
private boolean writeBytesMultiple(
|
||||
ChannelOutboundBuffer in, ByteBuffer[] nioBuffers,
|
||||
int nioBufferCnt, long expectedWrittenBytes) throws IOException {
|
||||
|
||||
assert expectedWrittenBytes != 0;
|
||||
final long initialExpectedWrittenBytes = expectedWrittenBytes;
|
||||
|
||||
boolean done = false;
|
||||
int offset = 0;
|
||||
int end = offset + nioBufferCnt;
|
||||
for (;;) {
|
||||
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, nioBufferCnt);
|
||||
if (localWrittenBytes == 0) {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setEpollOut();
|
||||
break;
|
||||
}
|
||||
expectedWrittenBytes -= localWrittenBytes;
|
||||
|
||||
if (expectedWrittenBytes == 0) {
|
||||
// Written everything, just break out here (fast-path)
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
do {
|
||||
ByteBuffer buffer = nioBuffers[offset];
|
||||
int pos = buffer.position();
|
||||
int bytes = buffer.limit() - pos;
|
||||
if (bytes > localWrittenBytes) {
|
||||
buffer.position(pos + (int) localWrittenBytes);
|
||||
// incomplete write
|
||||
break;
|
||||
} else {
|
||||
offset++;
|
||||
nioBufferCnt--;
|
||||
localWrittenBytes -= bytes;
|
||||
}
|
||||
} while (offset < end && localWrittenBytes > 0);
|
||||
}
|
||||
|
||||
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
|
||||
return done;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a {@link DefaultFileRegion}
|
||||
*
|
||||
* @param region the {@link DefaultFileRegion} from which the bytes should be written
|
||||
* @return amount the amount of written bytes
|
||||
*/
|
||||
private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
|
||||
final long regionCount = region.count();
|
||||
if (region.transfered() >= regionCount) {
|
||||
in.remove();
|
||||
return true;
|
||||
}
|
||||
|
||||
final long baseOffset = region.position();
|
||||
boolean done = false;
|
||||
long flushedAmount = 0;
|
||||
|
||||
for (;;) {
|
||||
final long offset = region.transfered();
|
||||
final long localFlushedAmount = Native.sendfile(fd, region, baseOffset, offset, regionCount - offset);
|
||||
if (localFlushedAmount == 0) {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setEpollOut();
|
||||
break;
|
||||
}
|
||||
|
||||
flushedAmount += localFlushedAmount;
|
||||
if (region.transfered() >= regionCount) {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (flushedAmount > 0) {
|
||||
in.progress(flushedAmount);
|
||||
}
|
||||
|
||||
if (done) {
|
||||
in.remove();
|
||||
}
|
||||
return done;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
for (;;) {
|
||||
final int msgCount = in.size();
|
||||
|
||||
if (msgCount == 0) {
|
||||
// Wrote all messages.
|
||||
clearEpollOut();
|
||||
break;
|
||||
}
|
||||
|
||||
// Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
|
||||
if (msgCount > 1 && in.current() instanceof ByteBuf) {
|
||||
if (!doWriteMultiple(in)) {
|
||||
break;
|
||||
}
|
||||
|
||||
// We do not break the loop here even if the outbound buffer was flushed completely,
|
||||
// because a user might have triggered another write and flush when we notify his or her
|
||||
// listeners.
|
||||
} else { // msgCount == 1
|
||||
if (!doWriteSingle(in)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception {
|
||||
// The outbound buffer contains only one message or it contains a file region.
|
||||
Object msg = in.current();
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
if (!writeBytes(in, buf)) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
return false;
|
||||
}
|
||||
} else if (msg instanceof DefaultFileRegion) {
|
||||
DefaultFileRegion region = (DefaultFileRegion) msg;
|
||||
if (!writeFileRegion(in, region)) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
// Should never reach here.
|
||||
throw new Error();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
|
||||
if (PlatformDependent.hasUnsafe()) {
|
||||
// this means we can cast to IovArray and write the IovArray directly.
|
||||
IovArray array = IovArrayThreadLocal.get(in);
|
||||
int cnt = array.count();
|
||||
if (cnt >= 1) {
|
||||
// TODO: Handle the case where cnt == 1 specially.
|
||||
if (!writeBytesMultiple(in, array)) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
return false;
|
||||
}
|
||||
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
|
||||
in.removeBytes(0);
|
||||
}
|
||||
} else {
|
||||
ByteBuffer[] buffers = in.nioBuffers();
|
||||
int cnt = in.nioBufferCount();
|
||||
if (cnt >= 1) {
|
||||
// TODO: Handle the case where cnt == 1 specially.
|
||||
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
return false;
|
||||
}
|
||||
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
|
||||
in.removeBytes(0);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object filterOutboundMessage(Object msg) {
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
|
||||
if (buf instanceof CompositeByteBuf) {
|
||||
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||
CompositeByteBuf comp = (CompositeByteBuf) buf;
|
||||
if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) {
|
||||
// more then 1024 buffers for gathering writes so just do a memory copy.
|
||||
buf = newDirectBuffer(buf);
|
||||
assert buf.hasMemoryAddress();
|
||||
}
|
||||
} else {
|
||||
// We can only handle buffers with memory address so we need to copy if a non direct is
|
||||
// passed to write.
|
||||
buf = newDirectBuffer(buf);
|
||||
assert buf.hasMemoryAddress();
|
||||
}
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
if (msg instanceof DefaultFileRegion) {
|
||||
return msg;
|
||||
}
|
||||
|
||||
throw new UnsupportedOperationException(
|
||||
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
|
||||
}
|
||||
|
||||
protected boolean isInputShutdown0() {
|
||||
return inputShutdown;
|
||||
}
|
||||
|
||||
protected boolean isOutputShutdown0() {
|
||||
return outputShutdown || !isActive();
|
||||
}
|
||||
|
||||
protected ChannelFuture shutdownOutput0(final ChannelPromise promise) {
|
||||
EventLoop loop = eventLoop();
|
||||
if (loop.inEventLoop()) {
|
||||
try {
|
||||
Native.shutdown(fd, false, true);
|
||||
outputShutdown = true;
|
||||
promise.setSuccess();
|
||||
} catch (Throwable t) {
|
||||
promise.setFailure(t);
|
||||
}
|
||||
} else {
|
||||
loop.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
shutdownOutput0(promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to the remote peer
|
||||
*/
|
||||
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
|
||||
if (localAddress != null) {
|
||||
Native.bind(fd, localAddress);
|
||||
}
|
||||
|
||||
boolean success = false;
|
||||
try {
|
||||
boolean connected = Native.connect(fd, remoteAddress);
|
||||
if (!connected) {
|
||||
setEpollOut();
|
||||
}
|
||||
success = true;
|
||||
return connected;
|
||||
} finally {
|
||||
if (!success) {
|
||||
doClose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final class EpollStreamUnsafe extends AbstractEpollUnsafe {
|
||||
/**
|
||||
* The future of the current connection attempt. If not null, subsequent
|
||||
* connection attempts will fail.
|
||||
*/
|
||||
private ChannelPromise connectPromise;
|
||||
private ScheduledFuture<?> connectTimeoutFuture;
|
||||
private SocketAddress requestedRemoteAddress;
|
||||
|
||||
private RecvByteBufAllocator.Handle allocHandle;
|
||||
|
||||
private void closeOnRead(ChannelPipeline pipeline) {
|
||||
inputShutdown = true;
|
||||
if (isOpen()) {
|
||||
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
|
||||
clearEpollIn0();
|
||||
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
|
||||
} else {
|
||||
close(voidPromise());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
|
||||
if (byteBuf != null) {
|
||||
if (byteBuf.isReadable()) {
|
||||
readPending = false;
|
||||
pipeline.fireChannelRead(byteBuf);
|
||||
} else {
|
||||
byteBuf.release();
|
||||
}
|
||||
}
|
||||
pipeline.fireChannelReadComplete();
|
||||
pipeline.fireExceptionCaught(cause);
|
||||
if (close || cause instanceof IOException) {
|
||||
closeOnRead(pipeline);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connect(
|
||||
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
|
||||
if (!promise.setUncancellable() || !ensureOpen(promise)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (connectPromise != null) {
|
||||
throw new IllegalStateException("connection attempt already made");
|
||||
}
|
||||
|
||||
boolean wasActive = isActive();
|
||||
if (doConnect(remoteAddress, localAddress)) {
|
||||
fulfillConnectPromise(promise, wasActive);
|
||||
} else {
|
||||
connectPromise = promise;
|
||||
requestedRemoteAddress = remoteAddress;
|
||||
|
||||
// Schedule connect timeout.
|
||||
int connectTimeoutMillis = config().getConnectTimeoutMillis();
|
||||
if (connectTimeoutMillis > 0) {
|
||||
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ChannelPromise connectPromise = EpollStreamUnsafe.this.connectPromise;
|
||||
ConnectTimeoutException cause =
|
||||
new ConnectTimeoutException("connection timed out: " + remoteAddress);
|
||||
if (connectPromise != null && connectPromise.tryFailure(cause)) {
|
||||
close(voidPromise());
|
||||
}
|
||||
}
|
||||
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (future.isCancelled()) {
|
||||
if (connectTimeoutFuture != null) {
|
||||
connectTimeoutFuture.cancel(false);
|
||||
}
|
||||
connectPromise = null;
|
||||
close(voidPromise());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
closeIfClosed();
|
||||
promise.tryFailure(annotateConnectException(t, remoteAddress));
|
||||
}
|
||||
}
|
||||
|
||||
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
|
||||
if (promise == null) {
|
||||
// Closed via cancellation and the promise has been notified already.
|
||||
return;
|
||||
}
|
||||
active = true;
|
||||
|
||||
// trySuccess() will return false if a user cancelled the connection attempt.
|
||||
boolean promiseSet = promise.trySuccess();
|
||||
|
||||
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
|
||||
// because what happened is what happened.
|
||||
if (!wasActive && isActive()) {
|
||||
pipeline().fireChannelActive();
|
||||
}
|
||||
|
||||
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
|
||||
if (!promiseSet) {
|
||||
close(voidPromise());
|
||||
}
|
||||
}
|
||||
|
||||
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
|
||||
if (promise == null) {
|
||||
// Closed via cancellation and the promise has been notified already.
|
||||
return;
|
||||
}
|
||||
|
||||
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
|
||||
promise.tryFailure(cause);
|
||||
closeIfClosed();
|
||||
}
|
||||
|
||||
private void finishConnect() {
|
||||
// Note this method is invoked by the event loop only if the connection attempt was
|
||||
// neither cancelled nor timed out.
|
||||
|
||||
assert eventLoop().inEventLoop();
|
||||
|
||||
boolean connectStillInProgress = false;
|
||||
try {
|
||||
boolean wasActive = isActive();
|
||||
if (!doFinishConnect()) {
|
||||
connectStillInProgress = true;
|
||||
return;
|
||||
}
|
||||
fulfillConnectPromise(connectPromise, wasActive);
|
||||
} catch (Throwable t) {
|
||||
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
|
||||
} finally {
|
||||
if (!connectStillInProgress) {
|
||||
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
|
||||
// See https://github.com/netty/netty/issues/1770
|
||||
if (connectTimeoutFuture != null) {
|
||||
connectTimeoutFuture.cancel(false);
|
||||
}
|
||||
connectPromise = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void epollOutReady() {
|
||||
if (connectPromise != null) {
|
||||
// pending connect which is now complete so handle it.
|
||||
finishConnect();
|
||||
} else {
|
||||
super.epollOutReady();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Finish the connect
|
||||
*/
|
||||
private boolean doFinishConnect() throws Exception {
|
||||
if (Native.finishConnect(fd)) {
|
||||
clearEpollOut();
|
||||
return true;
|
||||
} else {
|
||||
setEpollOut();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void epollRdHupReady() {
|
||||
if (isActive()) {
|
||||
epollInReady();
|
||||
} else {
|
||||
closeOnRead(pipeline());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void epollInReady() {
|
||||
final ChannelConfig config = config();
|
||||
final ChannelPipeline pipeline = pipeline();
|
||||
final ByteBufAllocator allocator = config.getAllocator();
|
||||
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
|
||||
if (allocHandle == null) {
|
||||
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
|
||||
}
|
||||
|
||||
ByteBuf byteBuf = null;
|
||||
boolean close = false;
|
||||
try {
|
||||
int totalReadAmount = 0;
|
||||
for (;;) {
|
||||
// we use a direct buffer here as the native implementations only be able
|
||||
// to handle direct buffers.
|
||||
byteBuf = allocHandle.allocate(allocator);
|
||||
int writable = byteBuf.writableBytes();
|
||||
int localReadAmount = doReadBytes(byteBuf);
|
||||
if (localReadAmount <= 0) {
|
||||
// not was read release the buffer
|
||||
byteBuf.release();
|
||||
close = localReadAmount < 0;
|
||||
break;
|
||||
}
|
||||
readPending = false;
|
||||
pipeline.fireChannelRead(byteBuf);
|
||||
byteBuf = null;
|
||||
|
||||
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
|
||||
allocHandle.record(totalReadAmount);
|
||||
|
||||
// Avoid overflow.
|
||||
totalReadAmount = localReadAmount;
|
||||
} else {
|
||||
totalReadAmount += localReadAmount;
|
||||
}
|
||||
|
||||
if (localReadAmount < writable) {
|
||||
// Read less than what the buffer can hold,
|
||||
// which might mean we drained the recv buffer completely.
|
||||
break;
|
||||
}
|
||||
}
|
||||
pipeline.fireChannelReadComplete();
|
||||
allocHandle.record(totalReadAmount);
|
||||
|
||||
if (close) {
|
||||
closeOnRead(pipeline);
|
||||
close = false;
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
boolean closed = handleReadException(pipeline, byteBuf, t, close);
|
||||
if (!closed) {
|
||||
// trigger a read again as there may be something left to read and because of epoll ET we
|
||||
// will not get notified again until we read everything from the socket
|
||||
eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
epollInReady();
|
||||
}
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
// Check if there is a readPending which was not processed yet.
|
||||
// This could be for two reasons:
|
||||
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
|
||||
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2254
|
||||
if (!config.isAutoRead() && !readPending) {
|
||||
clearEpollIn0();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.SocketAddress;
|
||||
|
||||
public final class DomainSocketAddress extends SocketAddress {
|
||||
private final String socketPath;
|
||||
|
||||
public DomainSocketAddress(String socketPath) {
|
||||
if (socketPath == null) {
|
||||
throw new NullPointerException("socketPath");
|
||||
}
|
||||
this.socketPath = socketPath;
|
||||
}
|
||||
|
||||
public DomainSocketAddress(File file) {
|
||||
this(file.getPath());
|
||||
}
|
||||
|
||||
/**
|
||||
* The path to the domain socket.
|
||||
*/
|
||||
public String path() {
|
||||
return socketPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return path();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof DomainSocketAddress)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return ((DomainSocketAddress) o).socketPath.equals(socketPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return socketPath.hashCode();
|
||||
}
|
||||
}
|
@ -66,6 +66,16 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
config = new EpollDatagramChannelConfig(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress remoteAddress() {
|
||||
return (InetSocketAddress) super.remoteAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress localAddress() {
|
||||
return (InetSocketAddress) super.localAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelMetadata metadata() {
|
||||
return METADATA;
|
||||
@ -252,7 +262,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
protected void doBind(SocketAddress localAddress) throws Exception {
|
||||
InetSocketAddress addr = (InetSocketAddress) localAddress;
|
||||
checkResolvable(addr);
|
||||
Native.bind(fd, addr.getAddress(), addr.getPort());
|
||||
Native.bind(fd, addr);
|
||||
local = Native.localAddress(fd);
|
||||
active = true;
|
||||
}
|
||||
|
@ -0,0 +1,78 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
|
||||
public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
|
||||
private final ChannelConfig config = new DefaultChannelConfig(this);
|
||||
|
||||
private volatile DomainSocketAddress local;
|
||||
private volatile DomainSocketAddress remote;
|
||||
|
||||
EpollDomainSocketChannel(Channel parent, int fd) {
|
||||
super(parent, fd);
|
||||
}
|
||||
|
||||
public EpollDomainSocketChannel() {
|
||||
super(Native.socketDomainFd());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DomainSocketAddress localAddress0() {
|
||||
return local;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DomainSocketAddress remoteAddress0() {
|
||||
return remote;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doBind(SocketAddress localAddress) throws Exception {
|
||||
Native.bind(fd, localAddress);
|
||||
local = (DomainSocketAddress) localAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelConfig config() {
|
||||
return config;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
|
||||
if (super.doConnect(remoteAddress, localAddress)) {
|
||||
local = (DomainSocketAddress) localAddress;
|
||||
remote = (DomainSocketAddress) remoteAddress;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DomainSocketAddress remoteAddress() {
|
||||
return (DomainSocketAddress) super.remoteAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DomainSocketAddress localAddress() {
|
||||
return (DomainSocketAddress) super.localAddress();
|
||||
}
|
||||
}
|
@ -0,0 +1,165 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
import io.netty.channel.MessageSizeEstimator;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.util.NetUtil;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static io.netty.channel.ChannelOption.SO_BACKLOG;
|
||||
import static io.netty.channel.ChannelOption.SO_RCVBUF;
|
||||
import static io.netty.channel.ChannelOption.SO_REUSEADDR;
|
||||
|
||||
public class EpollServerChannelConfig extends DefaultChannelConfig {
|
||||
protected final AbstractEpollChannel channel;
|
||||
private volatile int backlog = NetUtil.SOMAXCONN;
|
||||
|
||||
EpollServerChannelConfig(AbstractEpollChannel channel) {
|
||||
super(channel);
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ChannelOption<?>, Object> getOptions() {
|
||||
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <T> T getOption(ChannelOption<T> option) {
|
||||
if (option == SO_RCVBUF) {
|
||||
return (T) Integer.valueOf(getReceiveBufferSize());
|
||||
}
|
||||
if (option == SO_REUSEADDR) {
|
||||
return (T) Boolean.valueOf(isReuseAddress());
|
||||
}
|
||||
if (option == SO_BACKLOG) {
|
||||
return (T) Integer.valueOf(getBacklog());
|
||||
}
|
||||
return super.getOption(option);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> boolean setOption(ChannelOption<T> option, T value) {
|
||||
validate(option, value);
|
||||
|
||||
if (option == SO_RCVBUF) {
|
||||
setReceiveBufferSize((Integer) value);
|
||||
} else if (option == SO_REUSEADDR) {
|
||||
setReuseAddress((Boolean) value);
|
||||
} else if (option == SO_BACKLOG) {
|
||||
setBacklog((Integer) value);
|
||||
} else {
|
||||
return super.setOption(option, value);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isReuseAddress() {
|
||||
return Native.isReuseAddress(channel.fd) == 1;
|
||||
}
|
||||
|
||||
public EpollServerChannelConfig setReuseAddress(boolean reuseAddress) {
|
||||
Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0);
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getReceiveBufferSize() {
|
||||
return Native.getReceiveBufferSize(channel.fd);
|
||||
}
|
||||
|
||||
public EpollServerChannelConfig setReceiveBufferSize(int receiveBufferSize) {
|
||||
Native.setReceiveBufferSize(channel.fd, receiveBufferSize);
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getBacklog() {
|
||||
return backlog;
|
||||
}
|
||||
|
||||
public EpollServerChannelConfig setBacklog(int backlog) {
|
||||
if (backlog < 0) {
|
||||
throw new IllegalArgumentException("backlog: " + backlog);
|
||||
}
|
||||
this.backlog = backlog;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
|
||||
super.setConnectTimeoutMillis(connectTimeoutMillis);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
|
||||
super.setMaxMessagesPerRead(maxMessagesPerRead);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollServerChannelConfig setWriteSpinCount(int writeSpinCount) {
|
||||
super.setWriteSpinCount(writeSpinCount);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollServerChannelConfig setAllocator(ByteBufAllocator allocator) {
|
||||
super.setAllocator(allocator);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollServerChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
|
||||
super.setRecvByteBufAllocator(allocator);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollServerChannelConfig setAutoRead(boolean autoRead) {
|
||||
super.setAutoRead(autoRead);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollServerChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
|
||||
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollServerChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
|
||||
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollServerChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
|
||||
super.setMessageSizeEstimator(estimator);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final void autoReadCleared() {
|
||||
channel.clearEpollIn();
|
||||
}
|
||||
}
|
@ -0,0 +1,85 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.SocketAddress;
|
||||
|
||||
|
||||
public final class EpollServerDomainSocketChannel extends AbstractEpollServerChannel {
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(
|
||||
EpollServerDomainSocketChannel.class);
|
||||
|
||||
private final EpollServerChannelConfig config = new EpollServerChannelConfig(this);
|
||||
private volatile DomainSocketAddress local;
|
||||
|
||||
public EpollServerDomainSocketChannel() {
|
||||
super(Native.socketDomainFd());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Channel newChildChannel(int fd) throws Exception {
|
||||
return new EpollDomainSocketChannel(this, fd);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DomainSocketAddress localAddress0() {
|
||||
return local;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doBind(SocketAddress localAddress) throws Exception {
|
||||
Native.bind(fd, localAddress);
|
||||
Native.listen(fd, config.getBacklog());
|
||||
local = (DomainSocketAddress) localAddress;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws Exception {
|
||||
try {
|
||||
super.doClose();
|
||||
} finally {
|
||||
DomainSocketAddress local = this.local;
|
||||
if (local != null) {
|
||||
// Delete the socket file if possible.
|
||||
File socketFile = new File(local.path());
|
||||
boolean success = socketFile.delete();
|
||||
if (!success && logger.isDebugEnabled()) {
|
||||
logger.debug("Failed to delete a domain socket file: {}", local.path());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollServerChannelConfig config() {
|
||||
return config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DomainSocketAddress remoteAddress() {
|
||||
return (DomainSocketAddress) super.remoteAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DomainSocketAddress localAddress() {
|
||||
return (DomainSocketAddress) super.localAddress();
|
||||
}
|
||||
}
|
@ -15,9 +15,7 @@
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.socket.ServerSocketChannel;
|
||||
|
||||
@ -28,13 +26,13 @@ import java.net.SocketAddress;
|
||||
* {@link ServerSocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
|
||||
* maximal performance.
|
||||
*/
|
||||
public final class EpollServerSocketChannel extends AbstractEpollChannel implements ServerSocketChannel {
|
||||
public final class EpollServerSocketChannel extends AbstractEpollServerChannel implements ServerSocketChannel {
|
||||
|
||||
private final EpollServerSocketChannelConfig config;
|
||||
private volatile InetSocketAddress local;
|
||||
|
||||
public EpollServerSocketChannel() {
|
||||
super(Native.socketStreamFd(), Native.EPOLLACCEPT);
|
||||
super(Native.socketStreamFd());
|
||||
config = new EpollServerSocketChannelConfig(this);
|
||||
}
|
||||
|
||||
@ -47,12 +45,22 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
|
||||
protected void doBind(SocketAddress localAddress) throws Exception {
|
||||
InetSocketAddress addr = (InetSocketAddress) localAddress;
|
||||
checkResolvable(addr);
|
||||
Native.bind(fd, addr.getAddress(), addr.getPort());
|
||||
Native.bind(fd, addr);
|
||||
local = Native.localAddress(fd);
|
||||
Native.listen(fd, config.getBacklog());
|
||||
active = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress remoteAddress() {
|
||||
return (InetSocketAddress) super.remoteAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress localAddress() {
|
||||
return (InetSocketAddress) super.localAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollServerSocketChannelConfig config() {
|
||||
return config;
|
||||
@ -64,74 +72,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InetSocketAddress remoteAddress0() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractEpollUnsafe newUnsafe() {
|
||||
return new EpollServerSocketUnsafe();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object filterOutboundMessage(Object msg) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {
|
||||
|
||||
@Override
|
||||
public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) {
|
||||
// Connect not supported by ServerChannel implementations
|
||||
channelPromise.setFailure(new UnsupportedOperationException());
|
||||
}
|
||||
|
||||
@Override
|
||||
void epollInReady() {
|
||||
assert eventLoop().inEventLoop();
|
||||
final ChannelPipeline pipeline = pipeline();
|
||||
Throwable exception = null;
|
||||
try {
|
||||
try {
|
||||
for (;;) {
|
||||
int socketFd = Native.accept(fd);
|
||||
if (socketFd == -1) {
|
||||
// this means everything was handled for now
|
||||
break;
|
||||
}
|
||||
try {
|
||||
readPending = false;
|
||||
pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this, socketFd));
|
||||
} catch (Throwable t) {
|
||||
// keep on reading as we use epoll ET and need to consume everything from the socket
|
||||
pipeline.fireChannelReadComplete();
|
||||
pipeline.fireExceptionCaught(t);
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
exception = t;
|
||||
}
|
||||
pipeline.fireChannelReadComplete();
|
||||
|
||||
if (exception != null) {
|
||||
pipeline.fireExceptionCaught(exception);
|
||||
}
|
||||
} finally {
|
||||
// Check if there is a readPending which was not processed yet.
|
||||
// This could be for two reasons:
|
||||
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
|
||||
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2254
|
||||
if (!config.isAutoRead() && !readPending) {
|
||||
clearEpollIn0();
|
||||
}
|
||||
}
|
||||
}
|
||||
protected Channel newChildChannel(int fd) throws Exception {
|
||||
return new EpollSocketChannel(this, fd);
|
||||
}
|
||||
}
|
||||
|
@ -17,27 +17,17 @@ package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
import io.netty.channel.MessageSizeEstimator;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.socket.ServerSocketChannelConfig;
|
||||
import io.netty.util.NetUtil;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static io.netty.channel.ChannelOption.SO_BACKLOG;
|
||||
import static io.netty.channel.ChannelOption.SO_RCVBUF;
|
||||
import static io.netty.channel.ChannelOption.SO_REUSEADDR;
|
||||
|
||||
public final class EpollServerSocketChannelConfig extends DefaultChannelConfig
|
||||
public final class EpollServerSocketChannelConfig extends EpollServerChannelConfig
|
||||
implements ServerSocketChannelConfig {
|
||||
|
||||
private final EpollServerSocketChannel channel;
|
||||
private volatile int backlog = NetUtil.SOMAXCONN;
|
||||
|
||||
EpollServerSocketChannelConfig(EpollServerSocketChannel channel) {
|
||||
super(channel);
|
||||
this.channel = channel;
|
||||
|
||||
// Use SO_REUSEADDR by default as java.nio does the same.
|
||||
//
|
||||
@ -47,21 +37,12 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig
|
||||
|
||||
@Override
|
||||
public Map<ChannelOption<?>, Object> getOptions() {
|
||||
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG, EpollChannelOption.SO_REUSEPORT);
|
||||
return getOptions(super.getOptions(), EpollChannelOption.SO_REUSEPORT);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <T> T getOption(ChannelOption<T> option) {
|
||||
if (option == SO_RCVBUF) {
|
||||
return (T) Integer.valueOf(getReceiveBufferSize());
|
||||
}
|
||||
if (option == SO_REUSEADDR) {
|
||||
return (T) Boolean.valueOf(isReuseAddress());
|
||||
}
|
||||
if (option == SO_BACKLOG) {
|
||||
return (T) Integer.valueOf(getBacklog());
|
||||
}
|
||||
if (option == EpollChannelOption.SO_REUSEPORT) {
|
||||
return (T) Boolean.valueOf(isReusePort());
|
||||
}
|
||||
@ -72,13 +53,7 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig
|
||||
public <T> boolean setOption(ChannelOption<T> option, T value) {
|
||||
validate(option, value);
|
||||
|
||||
if (option == SO_RCVBUF) {
|
||||
setReceiveBufferSize((Integer) value);
|
||||
} else if (option == SO_REUSEADDR) {
|
||||
setReuseAddress((Boolean) value);
|
||||
} else if (option == SO_BACKLOG) {
|
||||
setBacklog((Integer) value);
|
||||
} else if (option == EpollChannelOption.SO_REUSEPORT) {
|
||||
if (option == EpollChannelOption.SO_REUSEPORT) {
|
||||
setReusePort((Boolean) value);
|
||||
} else {
|
||||
return super.setOption(option, value);
|
||||
@ -87,26 +62,15 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReuseAddress() {
|
||||
return Native.isReuseAddress(channel.fd) == 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollServerSocketChannelConfig setReuseAddress(boolean reuseAddress) {
|
||||
Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0);
|
||||
super.setReuseAddress(reuseAddress);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getReceiveBufferSize() {
|
||||
return Native.getReceiveBufferSize(channel.fd);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
|
||||
Native.setReceiveBufferSize(channel.fd, receiveBufferSize);
|
||||
|
||||
super.setReceiveBufferSize(receiveBufferSize);
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -115,17 +79,9 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBacklog() {
|
||||
return backlog;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollServerSocketChannelConfig setBacklog(int backlog) {
|
||||
if (backlog < 0) {
|
||||
throw new IllegalArgumentException("backlog: " + backlog);
|
||||
}
|
||||
this.backlog = backlog;
|
||||
super.setBacklog(backlog);
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -201,9 +157,4 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig
|
||||
Native.setReusePort(channel.fd, reusePort ? 1 : 0);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void autoReadCleared() {
|
||||
channel.clearEpollIn();
|
||||
}
|
||||
}
|
||||
|
@ -15,61 +15,28 @@
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.ConnectTimeoutException;
|
||||
import io.netty.channel.DefaultFileRegion;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
import io.netty.channel.socket.ServerSocketChannel;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
|
||||
* maximal performance.
|
||||
*/
|
||||
public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel {
|
||||
|
||||
private static final String EXPECTED_TYPES =
|
||||
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
|
||||
StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
|
||||
public final class EpollSocketChannel extends AbstractEpollStreamChannel implements SocketChannel {
|
||||
|
||||
private final EpollSocketChannelConfig config;
|
||||
|
||||
/**
|
||||
* The future of the current connection attempt. If not null, subsequent
|
||||
* connection attempts will fail.
|
||||
*/
|
||||
private ChannelPromise connectPromise;
|
||||
private ScheduledFuture<?> connectTimeoutFuture;
|
||||
private SocketAddress requestedRemoteAddress;
|
||||
|
||||
private volatile InetSocketAddress local;
|
||||
private volatile InetSocketAddress remote;
|
||||
private volatile boolean inputShutdown;
|
||||
private volatile boolean outputShutdown;
|
||||
|
||||
EpollSocketChannel(Channel parent, int fd) {
|
||||
super(parent, fd, Native.EPOLLIN, true);
|
||||
super(parent, fd);
|
||||
config = new EpollSocketChannelConfig(this);
|
||||
// Directly cache the remote and local addresses
|
||||
// See https://github.com/netty/netty/issues/2359
|
||||
@ -78,7 +45,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
}
|
||||
|
||||
public EpollSocketChannel() {
|
||||
super(Native.socketStreamFd(), Native.EPOLLIN);
|
||||
super(Native.socketStreamFd());
|
||||
config = new EpollSocketChannelConfig(this);
|
||||
}
|
||||
|
||||
@ -99,8 +66,13 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractEpollUnsafe newUnsafe() {
|
||||
return new EpollSocketUnsafe();
|
||||
public InetSocketAddress remoteAddress() {
|
||||
return (InetSocketAddress) super.remoteAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress localAddress() {
|
||||
return (InetSocketAddress) super.localAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -124,325 +96,10 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
@Override
|
||||
protected void doBind(SocketAddress local) throws Exception {
|
||||
InetSocketAddress localAddress = (InetSocketAddress) local;
|
||||
Native.bind(fd, localAddress.getAddress(), localAddress.getPort());
|
||||
Native.bind(fd, localAddress);
|
||||
this.local = Native.localAddress(fd);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
|
||||
* @param buf the {@link ByteBuf} from which the bytes should be written
|
||||
*/
|
||||
private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
|
||||
int readableBytes = buf.readableBytes();
|
||||
if (readableBytes == 0) {
|
||||
in.remove();
|
||||
return true;
|
||||
}
|
||||
|
||||
boolean done = false;
|
||||
long writtenBytes = 0;
|
||||
if (buf.hasMemoryAddress()) {
|
||||
long memoryAddress = buf.memoryAddress();
|
||||
int readerIndex = buf.readerIndex();
|
||||
int writerIndex = buf.writerIndex();
|
||||
for (;;) {
|
||||
int localFlushedAmount = Native.writeAddress(fd, memoryAddress, readerIndex, writerIndex);
|
||||
if (localFlushedAmount > 0) {
|
||||
writtenBytes += localFlushedAmount;
|
||||
if (writtenBytes == readableBytes) {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
readerIndex += localFlushedAmount;
|
||||
} else {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setEpollOut();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
in.removeBytes(writtenBytes);
|
||||
return done;
|
||||
} else if (buf.nioBufferCount() == 1) {
|
||||
int readerIndex = buf.readerIndex();
|
||||
ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, buf.readableBytes());
|
||||
for (;;) {
|
||||
int pos = nioBuf.position();
|
||||
int limit = nioBuf.limit();
|
||||
int localFlushedAmount = Native.write(fd, nioBuf, pos, limit);
|
||||
if (localFlushedAmount > 0) {
|
||||
nioBuf.position(pos + localFlushedAmount);
|
||||
writtenBytes += localFlushedAmount;
|
||||
if (writtenBytes == readableBytes) {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setEpollOut();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
in.removeBytes(writtenBytes);
|
||||
return done;
|
||||
} else {
|
||||
ByteBuffer[] nioBuffers = buf.nioBuffers();
|
||||
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
|
||||
|
||||
long expectedWrittenBytes = array.size();
|
||||
int cnt = array.count();
|
||||
|
||||
assert expectedWrittenBytes != 0;
|
||||
assert cnt != 0;
|
||||
|
||||
boolean done = false;
|
||||
long writtenBytes = 0;
|
||||
int offset = 0;
|
||||
int end = offset + cnt;
|
||||
for (;;) {
|
||||
long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt);
|
||||
if (localWrittenBytes == 0) {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setEpollOut();
|
||||
break;
|
||||
}
|
||||
expectedWrittenBytes -= localWrittenBytes;
|
||||
writtenBytes += localWrittenBytes;
|
||||
|
||||
if (expectedWrittenBytes == 0) {
|
||||
// Written everything, just break out here (fast-path)
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
|
||||
do {
|
||||
long bytes = array.processWritten(offset, localWrittenBytes);
|
||||
if (bytes == -1) {
|
||||
// incomplete write
|
||||
break;
|
||||
} else {
|
||||
offset++;
|
||||
cnt--;
|
||||
localWrittenBytes -= bytes;
|
||||
}
|
||||
} while (offset < end && localWrittenBytes > 0);
|
||||
}
|
||||
|
||||
in.removeBytes(writtenBytes);
|
||||
return done;
|
||||
}
|
||||
|
||||
private boolean writeBytesMultiple(
|
||||
ChannelOutboundBuffer in, ByteBuffer[] nioBuffers,
|
||||
int nioBufferCnt, long expectedWrittenBytes) throws IOException {
|
||||
|
||||
assert expectedWrittenBytes != 0;
|
||||
|
||||
boolean done = false;
|
||||
long writtenBytes = 0;
|
||||
int offset = 0;
|
||||
int end = offset + nioBufferCnt;
|
||||
for (;;) {
|
||||
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, nioBufferCnt);
|
||||
if (localWrittenBytes == 0) {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setEpollOut();
|
||||
break;
|
||||
}
|
||||
expectedWrittenBytes -= localWrittenBytes;
|
||||
writtenBytes += localWrittenBytes;
|
||||
|
||||
if (expectedWrittenBytes == 0) {
|
||||
// Written everything, just break out here (fast-path)
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
do {
|
||||
ByteBuffer buffer = nioBuffers[offset];
|
||||
int pos = buffer.position();
|
||||
int bytes = buffer.limit() - pos;
|
||||
if (bytes > localWrittenBytes) {
|
||||
buffer.position(pos + (int) localWrittenBytes);
|
||||
// incomplete write
|
||||
break;
|
||||
} else {
|
||||
offset++;
|
||||
nioBufferCnt--;
|
||||
localWrittenBytes -= bytes;
|
||||
}
|
||||
} while (offset < end && localWrittenBytes > 0);
|
||||
}
|
||||
|
||||
in.removeBytes(writtenBytes);
|
||||
return done;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a {@link DefaultFileRegion}
|
||||
*
|
||||
* @param region the {@link DefaultFileRegion} from which the bytes should be written
|
||||
* @return amount the amount of written bytes
|
||||
*/
|
||||
private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
|
||||
final long regionCount = region.count();
|
||||
if (region.transfered() >= regionCount) {
|
||||
in.remove();
|
||||
return true;
|
||||
}
|
||||
|
||||
final long baseOffset = region.position();
|
||||
boolean done = false;
|
||||
long flushedAmount = 0;
|
||||
|
||||
for (;;) {
|
||||
final long offset = region.transfered();
|
||||
final long localFlushedAmount = Native.sendfile(fd, region, baseOffset, offset, regionCount - offset);
|
||||
if (localFlushedAmount == 0) {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setEpollOut();
|
||||
break;
|
||||
}
|
||||
|
||||
flushedAmount += localFlushedAmount;
|
||||
if (region.transfered() >= regionCount) {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (flushedAmount > 0) {
|
||||
in.progress(flushedAmount);
|
||||
}
|
||||
|
||||
if (done) {
|
||||
in.remove();
|
||||
}
|
||||
return done;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
for (;;) {
|
||||
final int msgCount = in.size();
|
||||
|
||||
if (msgCount == 0) {
|
||||
// Wrote all messages.
|
||||
clearEpollOut();
|
||||
break;
|
||||
}
|
||||
|
||||
// Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
|
||||
if (msgCount > 1 && in.current() instanceof ByteBuf) {
|
||||
if (!doWriteMultiple(in)) {
|
||||
break;
|
||||
}
|
||||
|
||||
// We do not break the loop here even if the outbound buffer was flushed completely,
|
||||
// because a user might have triggered another write and flush when we notify his or her
|
||||
// listeners.
|
||||
} else { // msgCount == 1
|
||||
if (!doWriteSingle(in)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception {
|
||||
// The outbound buffer contains only one message or it contains a file region.
|
||||
Object msg = in.current();
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
if (!writeBytes(in, buf)) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
return false;
|
||||
}
|
||||
} else if (msg instanceof DefaultFileRegion) {
|
||||
DefaultFileRegion region = (DefaultFileRegion) msg;
|
||||
if (!writeFileRegion(in, region)) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
// Should never reach here.
|
||||
throw new Error();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
|
||||
if (PlatformDependent.hasUnsafe()) {
|
||||
// this means we can cast to IovArray and write the IovArray directly.
|
||||
IovArray array = IovArrayThreadLocal.get(in);
|
||||
int cnt = array.count();
|
||||
if (cnt >= 1) {
|
||||
// TODO: Handle the case where cnt == 1 specially.
|
||||
if (!writeBytesMultiple(in, array)) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
return false;
|
||||
}
|
||||
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
|
||||
in.removeBytes(0);
|
||||
}
|
||||
} else {
|
||||
ByteBuffer[] buffers = in.nioBuffers();
|
||||
int cnt = in.nioBufferCount();
|
||||
if (cnt >= 1) {
|
||||
// TODO: Handle the case where cnt == 1 specially.
|
||||
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
return false;
|
||||
}
|
||||
} else { // cnt == 0, which means the outbound buffer contained empty buffers only.
|
||||
in.removeBytes(0);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object filterOutboundMessage(Object msg) {
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
|
||||
if (buf instanceof CompositeByteBuf) {
|
||||
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
|
||||
// in the CompositeByteBuf are backed by a memoryAddress.
|
||||
CompositeByteBuf comp = (CompositeByteBuf) buf;
|
||||
if (!comp.isDirect() || comp.nioBufferCount() > Native.IOV_MAX) {
|
||||
// more then 1024 buffers for gathering writes so just do a memory copy.
|
||||
buf = newDirectBuffer(buf);
|
||||
assert buf.hasMemoryAddress();
|
||||
}
|
||||
} else {
|
||||
// We can only handle buffers with memory address so we need to copy if a non direct is
|
||||
// passed to write.
|
||||
buf = newDirectBuffer(buf);
|
||||
assert buf.hasMemoryAddress();
|
||||
}
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
if (msg instanceof DefaultFileRegion) {
|
||||
return msg;
|
||||
}
|
||||
|
||||
throw new UnsupportedOperationException(
|
||||
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EpollSocketChannelConfig config() {
|
||||
return config;
|
||||
@ -450,12 +107,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
|
||||
@Override
|
||||
public boolean isInputShutdown() {
|
||||
return inputShutdown;
|
||||
return isInputShutdown0();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOutputShutdown() {
|
||||
return outputShutdown || !isActive();
|
||||
return isOutputShutdown0();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -465,24 +122,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
|
||||
@Override
|
||||
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
|
||||
EventLoop loop = eventLoop();
|
||||
if (loop.inEventLoop()) {
|
||||
try {
|
||||
Native.shutdown(fd, false, true);
|
||||
outputShutdown = true;
|
||||
promise.setSuccess();
|
||||
} catch (Throwable t) {
|
||||
promise.setFailure(t);
|
||||
}
|
||||
} else {
|
||||
loop.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
shutdownOutput(promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
return promise;
|
||||
return shutdownOutput0(promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -490,307 +130,17 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
||||
return (ServerSocketChannel) super.parent();
|
||||
}
|
||||
|
||||
final class EpollSocketUnsafe extends AbstractEpollUnsafe {
|
||||
private RecvByteBufAllocator.Handle allocHandle;
|
||||
|
||||
private void closeOnRead(ChannelPipeline pipeline) {
|
||||
inputShutdown = true;
|
||||
if (isOpen()) {
|
||||
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
|
||||
clearEpollIn0();
|
||||
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
|
||||
} else {
|
||||
close(voidPromise());
|
||||
}
|
||||
}
|
||||
@Override
|
||||
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
|
||||
if (localAddress != null) {
|
||||
checkResolvable((InetSocketAddress) localAddress);
|
||||
}
|
||||
|
||||
private boolean handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
|
||||
if (byteBuf != null) {
|
||||
if (byteBuf.isReadable()) {
|
||||
readPending = false;
|
||||
pipeline.fireChannelRead(byteBuf);
|
||||
} else {
|
||||
byteBuf.release();
|
||||
}
|
||||
}
|
||||
pipeline.fireChannelReadComplete();
|
||||
pipeline.fireExceptionCaught(cause);
|
||||
if (close || cause instanceof IOException) {
|
||||
closeOnRead(pipeline);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connect(
|
||||
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
|
||||
if (!promise.setUncancellable() || !ensureOpen(promise)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (connectPromise != null) {
|
||||
throw new IllegalStateException("connection attempt already made");
|
||||
}
|
||||
|
||||
boolean wasActive = isActive();
|
||||
if (doConnect((InetSocketAddress) remoteAddress, (InetSocketAddress) localAddress)) {
|
||||
fulfillConnectPromise(promise, wasActive);
|
||||
} else {
|
||||
connectPromise = promise;
|
||||
requestedRemoteAddress = remoteAddress;
|
||||
|
||||
// Schedule connect timeout.
|
||||
int connectTimeoutMillis = config().getConnectTimeoutMillis();
|
||||
if (connectTimeoutMillis > 0) {
|
||||
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ChannelPromise connectPromise = EpollSocketChannel.this.connectPromise;
|
||||
ConnectTimeoutException cause =
|
||||
new ConnectTimeoutException("connection timed out: " + remoteAddress);
|
||||
if (connectPromise != null && connectPromise.tryFailure(cause)) {
|
||||
close(voidPromise());
|
||||
}
|
||||
}
|
||||
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (future.isCancelled()) {
|
||||
if (connectTimeoutFuture != null) {
|
||||
connectTimeoutFuture.cancel(false);
|
||||
}
|
||||
connectPromise = null;
|
||||
close(voidPromise());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
closeIfClosed();
|
||||
promise.tryFailure(annotateConnectException(t, remoteAddress));
|
||||
}
|
||||
}
|
||||
|
||||
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
|
||||
if (promise == null) {
|
||||
// Closed via cancellation and the promise has been notified already.
|
||||
return;
|
||||
}
|
||||
active = true;
|
||||
|
||||
// trySuccess() will return false if a user cancelled the connection attempt.
|
||||
boolean promiseSet = promise.trySuccess();
|
||||
|
||||
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
|
||||
// because what happened is what happened.
|
||||
if (!wasActive && isActive()) {
|
||||
pipeline().fireChannelActive();
|
||||
}
|
||||
|
||||
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
|
||||
if (!promiseSet) {
|
||||
close(voidPromise());
|
||||
}
|
||||
}
|
||||
|
||||
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
|
||||
if (promise == null) {
|
||||
// Closed via cancellation and the promise has been notified already.
|
||||
return;
|
||||
}
|
||||
|
||||
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
|
||||
promise.tryFailure(cause);
|
||||
closeIfClosed();
|
||||
}
|
||||
|
||||
private void finishConnect() {
|
||||
// Note this method is invoked by the event loop only if the connection attempt was
|
||||
// neither cancelled nor timed out.
|
||||
|
||||
assert eventLoop().inEventLoop();
|
||||
|
||||
boolean connectStillInProgress = false;
|
||||
try {
|
||||
boolean wasActive = isActive();
|
||||
if (!doFinishConnect()) {
|
||||
connectStillInProgress = true;
|
||||
return;
|
||||
}
|
||||
fulfillConnectPromise(connectPromise, wasActive);
|
||||
} catch (Throwable t) {
|
||||
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
|
||||
} finally {
|
||||
if (!connectStillInProgress) {
|
||||
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
|
||||
// See https://github.com/netty/netty/issues/1770
|
||||
if (connectTimeoutFuture != null) {
|
||||
connectTimeoutFuture.cancel(false);
|
||||
}
|
||||
connectPromise = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void epollOutReady() {
|
||||
if (connectPromise != null) {
|
||||
// pending connect which is now complete so handle it.
|
||||
finishConnect();
|
||||
} else {
|
||||
super.epollOutReady();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to the remote peer
|
||||
*/
|
||||
private boolean doConnect(InetSocketAddress remoteAddress, InetSocketAddress localAddress) throws Exception {
|
||||
if (localAddress != null) {
|
||||
checkResolvable(localAddress);
|
||||
Native.bind(fd, localAddress.getAddress(), localAddress.getPort());
|
||||
}
|
||||
|
||||
boolean success = false;
|
||||
try {
|
||||
checkResolvable(remoteAddress);
|
||||
boolean connected = Native.connect(fd, remoteAddress.getAddress(),
|
||||
remoteAddress.getPort());
|
||||
remote = remoteAddress;
|
||||
local = Native.localAddress(fd);
|
||||
if (!connected) {
|
||||
setEpollOut();
|
||||
}
|
||||
success = true;
|
||||
return connected;
|
||||
} finally {
|
||||
if (!success) {
|
||||
doClose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Finish the connect
|
||||
*/
|
||||
private boolean doFinishConnect() throws Exception {
|
||||
if (Native.finishConnect(fd)) {
|
||||
clearEpollOut();
|
||||
return true;
|
||||
} else {
|
||||
setEpollOut();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read bytes into the given {@link ByteBuf} and return the amount.
|
||||
*/
|
||||
private int doReadBytes(ByteBuf byteBuf) throws Exception {
|
||||
int writerIndex = byteBuf.writerIndex();
|
||||
int localReadAmount;
|
||||
if (byteBuf.hasMemoryAddress()) {
|
||||
localReadAmount = Native.readAddress(fd, byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
|
||||
} else {
|
||||
ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
|
||||
localReadAmount = Native.read(fd, buf, buf.position(), buf.limit());
|
||||
}
|
||||
if (localReadAmount > 0) {
|
||||
byteBuf.writerIndex(writerIndex + localReadAmount);
|
||||
}
|
||||
return localReadAmount;
|
||||
}
|
||||
|
||||
@Override
|
||||
void epollRdHupReady() {
|
||||
if (isActive()) {
|
||||
epollInReady();
|
||||
} else {
|
||||
closeOnRead(pipeline());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void epollInReady() {
|
||||
final ChannelConfig config = config();
|
||||
final ChannelPipeline pipeline = pipeline();
|
||||
final ByteBufAllocator allocator = config.getAllocator();
|
||||
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
|
||||
if (allocHandle == null) {
|
||||
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
|
||||
}
|
||||
|
||||
ByteBuf byteBuf = null;
|
||||
boolean close = false;
|
||||
try {
|
||||
int totalReadAmount = 0;
|
||||
for (;;) {
|
||||
// we use a direct buffer here as the native implementations only be able
|
||||
// to handle direct buffers.
|
||||
byteBuf = allocHandle.allocate(allocator);
|
||||
int writable = byteBuf.writableBytes();
|
||||
int localReadAmount = doReadBytes(byteBuf);
|
||||
if (localReadAmount <= 0) {
|
||||
// not was read release the buffer
|
||||
byteBuf.release();
|
||||
close = localReadAmount < 0;
|
||||
break;
|
||||
}
|
||||
readPending = false;
|
||||
pipeline.fireChannelRead(byteBuf);
|
||||
byteBuf = null;
|
||||
|
||||
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
|
||||
allocHandle.record(totalReadAmount);
|
||||
|
||||
// Avoid overflow.
|
||||
totalReadAmount = localReadAmount;
|
||||
} else {
|
||||
totalReadAmount += localReadAmount;
|
||||
}
|
||||
|
||||
if (localReadAmount < writable) {
|
||||
// Read less than what the buffer can hold,
|
||||
// which might mean we drained the recv buffer completely.
|
||||
break;
|
||||
}
|
||||
}
|
||||
pipeline.fireChannelReadComplete();
|
||||
allocHandle.record(totalReadAmount);
|
||||
|
||||
if (close) {
|
||||
closeOnRead(pipeline);
|
||||
close = false;
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
boolean closed = handleReadException(pipeline, byteBuf, t, close);
|
||||
if (!closed) {
|
||||
// trigger a read again as there may be something left to read and because of epoll ET we
|
||||
// will not get notified again until we read everything from the socket
|
||||
eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
epollInReady();
|
||||
}
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
// Check if there is a readPending which was not processed yet.
|
||||
// This could be for two reasons:
|
||||
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
|
||||
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/2254
|
||||
if (!config.isAutoRead() && !readPending) {
|
||||
clearEpollIn0();
|
||||
}
|
||||
}
|
||||
checkResolvable((InetSocketAddress) remoteAddress);
|
||||
if (super.doConnect(remoteAddress, localAddress)) {
|
||||
local = Native.localAddress(fd);
|
||||
remote = Native.remoteAddress(fd);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ import java.io.IOException;
|
||||
import java.net.Inet6Address;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
@ -346,18 +347,39 @@ final class Native {
|
||||
return res;
|
||||
}
|
||||
|
||||
public static int socketDomainFd() {
|
||||
int res = socketDomain();
|
||||
if (res < 0) {
|
||||
throw new ChannelException(newIOException("socketDomain", res));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
private static native int socketStream();
|
||||
private static native int socketDgram();
|
||||
private static native int socketDomain();
|
||||
|
||||
public static void bind(int fd, InetAddress addr, int port) throws IOException {
|
||||
NativeInetAddress address = toNativeInetAddress(addr);
|
||||
int res = bind(fd, address.address, address.scopeId, port);
|
||||
if (res < 0) {
|
||||
throw newIOException("bind", res);
|
||||
public static void bind(int fd, SocketAddress socketAddress) throws IOException {
|
||||
if (socketAddress instanceof InetSocketAddress) {
|
||||
InetSocketAddress addr = (InetSocketAddress) socketAddress;
|
||||
NativeInetAddress address = toNativeInetAddress(addr.getAddress());
|
||||
int res = bind(fd, address.address, address.scopeId, addr.getPort());
|
||||
if (res < 0) {
|
||||
throw newIOException("bind", res);
|
||||
}
|
||||
} else if (socketAddress instanceof DomainSocketAddress) {
|
||||
DomainSocketAddress addr = (DomainSocketAddress) socketAddress;
|
||||
int res = bindDomainSocket(fd, addr.path());
|
||||
if (res < 0) {
|
||||
throw newIOException("bind", res);
|
||||
}
|
||||
} else {
|
||||
throw new Error("Unexpected SocketAddress implementation " + socketAddress);
|
||||
}
|
||||
}
|
||||
|
||||
private static native int bind(int fd, byte[] address, int scopeId, int port);
|
||||
private static native int bindDomainSocket(int fd, String path);
|
||||
|
||||
public static void listen(int fd, int backlog) throws IOException {
|
||||
int res = listen0(fd, backlog);
|
||||
@ -368,9 +390,18 @@ final class Native {
|
||||
|
||||
private static native int listen0(int fd, int backlog);
|
||||
|
||||
public static boolean connect(int fd, InetAddress addr, int port) throws IOException {
|
||||
NativeInetAddress address = toNativeInetAddress(addr);
|
||||
int res = connect(fd, address.address, address.scopeId, port);
|
||||
public static boolean connect(int fd, SocketAddress socketAddress) throws IOException {
|
||||
int res;
|
||||
if (socketAddress instanceof InetSocketAddress) {
|
||||
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
|
||||
NativeInetAddress address = toNativeInetAddress(inetSocketAddress.getAddress());
|
||||
res = connect(fd, address.address, address.scopeId, inetSocketAddress.getPort());
|
||||
} else if (socketAddress instanceof DomainSocketAddress) {
|
||||
DomainSocketAddress unixDomainSocketAddress = (DomainSocketAddress) socketAddress;
|
||||
res = connectDomainSocket(fd, unixDomainSocketAddress.path());
|
||||
} else {
|
||||
throw new Error("Unexpected SocketAddress implementation " + socketAddress);
|
||||
}
|
||||
if (res < 0) {
|
||||
if (res == ERRNO_EINPROGRESS_NEGATIVE) {
|
||||
// connect not complete yet need to wait for EPOLLOUT event
|
||||
@ -382,6 +413,7 @@ final class Native {
|
||||
}
|
||||
|
||||
private static native int connect(int fd, byte[] address, int scopeId, int port);
|
||||
private static native int connectDomainSocket(int fd, String path);
|
||||
|
||||
public static boolean finishConnect(int fd) throws IOException {
|
||||
int res = finishConnect0(fd);
|
||||
@ -477,6 +509,20 @@ final class Native {
|
||||
|
||||
private static native int accept0(int fd);
|
||||
|
||||
public static int recvFd(int fd) throws IOException {
|
||||
int res = recvFd0(fd);
|
||||
if (res >= 0) {
|
||||
return res;
|
||||
}
|
||||
if (res == ERRNO_EAGAIN_NEGATIVE || res == ERRNO_EWOULDBLOCK_NEGATIVE) {
|
||||
// Everything consumed so just return -1 here.
|
||||
return -1;
|
||||
}
|
||||
throw newIOException("recvFd", res);
|
||||
}
|
||||
|
||||
private static native int recvFd0(int fd);
|
||||
|
||||
public static void shutdown(int fd, boolean read, boolean write) throws IOException {
|
||||
int res = shutdown0(fd, read, write);
|
||||
if (res < 0) {
|
||||
|
@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
public class EpollDomainSocketEchoTest extends EpollSocketEchoTest {
|
||||
@Override
|
||||
protected SocketAddress newSocketAddress() {
|
||||
return EpollSocketTestPermutation.newSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.domainSocket();
|
||||
}
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
public class EpollDomainSocketFileRegionTest extends EpollSocketFileRegionTest {
|
||||
@Override
|
||||
protected SocketAddress newSocketAddress() {
|
||||
return EpollSocketTestPermutation.newSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.domainSocket();
|
||||
}
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.SocketFixedLengthEchoTest;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
public class EpollDomainSocketFixedLengthEchoTest extends SocketFixedLengthEchoTest {
|
||||
|
||||
@Override
|
||||
protected SocketAddress newSocketAddress() {
|
||||
return EpollSocketTestPermutation.newSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.domainSocket();
|
||||
}
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.SocketGatheringWriteTest;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
public class EpollDomainSocketGatheringWriteTest extends SocketGatheringWriteTest {
|
||||
|
||||
@Override
|
||||
protected SocketAddress newSocketAddress() {
|
||||
return EpollSocketTestPermutation.newSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.domainSocket();
|
||||
}
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.SocketObjectEchoTest;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
public class EpollDomainSocketObjectEchoTest extends SocketObjectEchoTest {
|
||||
@Override
|
||||
protected SocketAddress newSocketAddress() {
|
||||
return EpollSocketTestPermutation.newSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.domainSocket();
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.handler.ssl.SslContext;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.SocketSslEchoTest;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
public class EpollDomainSocketSslEchoTest extends SocketSslEchoTest {
|
||||
public EpollDomainSocketSslEchoTest(
|
||||
SslContext serverCtx, SslContext clientCtx, Renegotiation renegotiation,
|
||||
boolean serverUsesDelegatedTaskExecutor, boolean clientUsesDelegatedTaskExecutor,
|
||||
boolean autoRead, boolean useChunkedWriteHandler, boolean useCompositeByteBuf) {
|
||||
|
||||
super(serverCtx, clientCtx, renegotiation,
|
||||
serverUsesDelegatedTaskExecutor, clientUsesDelegatedTaskExecutor,
|
||||
autoRead, useChunkedWriteHandler, useCompositeByteBuf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SocketAddress newSocketAddress() {
|
||||
return EpollSocketTestPermutation.newSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.domainSocket();
|
||||
}
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.handler.ssl.SslContext;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.SocketSslGreetingTest;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
public class EpollDomainSocketSslGreetingTest extends SocketSslGreetingTest {
|
||||
|
||||
public EpollDomainSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx) {
|
||||
super(serverCtx, clientCtx);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SocketAddress newSocketAddress() {
|
||||
return EpollSocketTestPermutation.newSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.domainSocket();
|
||||
}
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.handler.ssl.SslContext;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.SocketStartTlsTest;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
public class EpollDomainSocketStartTlsTest extends SocketStartTlsTest {
|
||||
|
||||
public EpollDomainSocketStartTlsTest(SslContext serverCtx, SslContext clientCtx) {
|
||||
super(serverCtx, clientCtx);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SocketAddress newSocketAddress() {
|
||||
return EpollSocketTestPermutation.newSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.domainSocket();
|
||||
}
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.SocketStringEchoTest;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
public class EpollDomainSocketStringEchoTest extends SocketStringEchoTest {
|
||||
@Override
|
||||
protected SocketAddress newSocketAddress() {
|
||||
return EpollSocketTestPermutation.newSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||
return EpollSocketTestPermutation.INSTANCE.domainSocket();
|
||||
}
|
||||
}
|
@ -29,12 +29,15 @@ import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory;
|
||||
import io.netty.testsuite.transport.socket.SocketTestPermutation;
|
||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
class EpollSocketTestPermutation extends SocketTestPermutation {
|
||||
|
||||
static final SocketTestPermutation INSTANCE = new EpollSocketTestPermutation();
|
||||
static final EpollSocketTestPermutation INSTANCE = new EpollSocketTestPermutation();
|
||||
|
||||
static final EventLoopGroup EPOLL_BOSS_GROUP =
|
||||
new EpollEventLoopGroup(BOSSES, new DefaultThreadFactory("testsuite-epoll-boss", true));
|
||||
@ -119,4 +122,44 @@ class EpollSocketTestPermutation extends SocketTestPermutation {
|
||||
);
|
||||
return combo(bfs, bfs);
|
||||
}
|
||||
|
||||
public List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> domainSocket() {
|
||||
|
||||
List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> list =
|
||||
combo(serverDomainSocket(), clientDomainSocket());
|
||||
return list;
|
||||
}
|
||||
|
||||
public List<BootstrapFactory<ServerBootstrap>> serverDomainSocket() {
|
||||
return Collections.<BootstrapFactory<ServerBootstrap>>singletonList(
|
||||
new BootstrapFactory<ServerBootstrap>() {
|
||||
@Override
|
||||
public ServerBootstrap newInstance() {
|
||||
return new ServerBootstrap().group(EPOLL_BOSS_GROUP, EPOLL_WORKER_GROUP)
|
||||
.channel(EpollServerDomainSocketChannel.class);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public List<BootstrapFactory<Bootstrap>> clientDomainSocket() {
|
||||
return Collections.<BootstrapFactory<Bootstrap>>singletonList(
|
||||
new BootstrapFactory<Bootstrap>() {
|
||||
@Override
|
||||
public Bootstrap newInstance() {
|
||||
return new Bootstrap().group(EPOLL_WORKER_GROUP).channel(EpollDomainSocketChannel.class);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public static DomainSocketAddress newSocketAddress() {
|
||||
try {
|
||||
File file = File.createTempFile("netty", "dsocket");
|
||||
file.delete();
|
||||
return new DomainSocketAddress(file);
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user