Allow to create Epoll*Channel from FileDescriptor
Motivation: Sometimes it's useful to be able to create a Epoll*Channel from an existing file descriptor. This is especially helpful if you integrade some c/jni code. Modifications: - Add extra constructor to Epoll*Channel implementations that take a FileDescriptor as an argument - Make Rename EpollFileDescriptor to NativeFileDescriptor and make it public - Also ensure we obtain the correct remote/local address when create a Channel from a FileDescriptor Result: It's now possible to create a FileDescriptor and instance a Epoll*Channel via it.
This commit is contained in:
parent
5fca3de098
commit
9d697d7a9a
@ -1222,6 +1222,14 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv* en
|
||||
}
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSoError(JNIEnv* env, jclass clazz, jint fd) {
|
||||
int optval = 0;
|
||||
if (getOption(env, fd, SOL_SOCKET, SO_ERROR, &optval, sizeof(optval)) == -1) {
|
||||
return optval;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv* env, jclass clazz, jint fd) {
|
||||
int optval;
|
||||
if (getOption(env, fd, IPPROTO_IP, IP_TOS, &optval, sizeof(optval)) == -1) {
|
||||
|
@ -99,6 +99,7 @@ jint Java_io_netty_channel_epoll_Native_isBroadcast(JNIEnv* env, jclass clazz, j
|
||||
jint Java_io_netty_channel_epoll_Native_getTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd);
|
||||
jint Java_io_netty_channel_epoll_Native_getTcpKeepIntvl(JNIEnv* env, jclass clazz, jint fd);
|
||||
jint Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv* env, jclass clazz, jint fd);
|
||||
jint Java_io_netty_channel_epoll_Native_getSoError(JNIEnv* env, jclass clazz, jint fd);
|
||||
|
||||
jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv* env, jclass clazz);
|
||||
jint Java_io_netty_channel_epoll_Native_iovMax(JNIEnv* env, jclass clazz);
|
||||
|
@ -44,11 +44,18 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
||||
}
|
||||
|
||||
AbstractEpollChannel(Channel parent, int fd, int flag, boolean active) {
|
||||
this(parent, new NativeFileDescriptor(fd), flag, active);
|
||||
}
|
||||
|
||||
AbstractEpollChannel(Channel parent, FileDescriptor fd, int flag, boolean active) {
|
||||
super(parent);
|
||||
if (fd == null) {
|
||||
throw new NullPointerException("fd");
|
||||
}
|
||||
readFlag = flag;
|
||||
flags |= flag;
|
||||
this.active = active;
|
||||
fileDescriptor = new EpollFileDescriptor(fd);
|
||||
fileDescriptor = fd;
|
||||
}
|
||||
|
||||
void setFlag(int flag) {
|
||||
@ -113,7 +120,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return fileDescriptor != EpollFileDescriptor.INVALID;
|
||||
return fileDescriptor != FileDescriptor.INVALID;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -20,6 +20,7 @@ import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.FileDescriptor;
|
||||
import io.netty.channel.ServerChannel;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
@ -32,6 +33,10 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
|
||||
super(fd, Native.EPOLLIN);
|
||||
}
|
||||
|
||||
protected AbstractEpollServerChannel(FileDescriptor fd) {
|
||||
super(null, fd, Native.EPOLLIN, Native.getSoError(fd.intValue()) == 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isCompatible(EventLoop loop) {
|
||||
return loop instanceof EpollEventLoop;
|
||||
|
@ -29,6 +29,7 @@ import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.ConnectTimeoutException;
|
||||
import io.netty.channel.DefaultFileRegion;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.FileDescriptor;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
@ -61,6 +62,10 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
flags |= Native.EPOLLRDHUP;
|
||||
}
|
||||
|
||||
protected AbstractEpollStreamChannel(FileDescriptor fd) {
|
||||
super(null, fd, Native.EPOLLIN, Native.getSoError(fd.intValue()) == 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractEpollUnsafe newUnsafe() {
|
||||
return new EpollStreamUnsafe();
|
||||
|
@ -67,6 +67,18 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
||||
config = new EpollDatagramChannelConfig(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link EpollDatagramChannel} from the given {@link FileDescriptor}.
|
||||
*/
|
||||
public EpollDatagramChannel(FileDescriptor fd) {
|
||||
super(null, fd, Native.EPOLLIN, true);
|
||||
config = new EpollDatagramChannelConfig(this);
|
||||
|
||||
// As we create an EpollDatagramChannel from a FileDescriptor we should try to obtain the remote and local
|
||||
// address from it. This is needed as the FileDescriptor may be bound already.
|
||||
local = Native.localAddress(fd.intValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress remoteAddress() {
|
||||
return (InetSocketAddress) super.remoteAddress();
|
||||
|
@ -37,6 +37,13 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
|
||||
super(parent, fd.intValue());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link EpollDomainSocketChannel} from an existing {@link FileDescriptor}
|
||||
*/
|
||||
public EpollDomainSocketChannel(FileDescriptor fd) {
|
||||
super(fd);
|
||||
}
|
||||
|
||||
EpollDomainSocketChannel(Channel parent, int fd) {
|
||||
super(parent, fd);
|
||||
}
|
||||
@ -100,7 +107,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
|
||||
|
||||
@Override
|
||||
protected Object filterOutboundMessage(Object msg) {
|
||||
if (msg instanceof EpollFileDescriptor) {
|
||||
if (msg instanceof NativeFileDescriptor) {
|
||||
return msg;
|
||||
}
|
||||
return super.filterOutboundMessage(msg);
|
||||
@ -143,7 +150,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
|
||||
readPending = false;
|
||||
|
||||
try {
|
||||
pipeline.fireChannelRead(new EpollFileDescriptor(socketFd));
|
||||
pipeline.fireChannelRead(new NativeFileDescriptor(socketFd));
|
||||
} catch (Throwable t) {
|
||||
// keep on reading as we use epoll ET and need to consume everything from the socket
|
||||
pipeline.fireChannelReadComplete();
|
||||
|
@ -129,7 +129,7 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig {
|
||||
* {@link DomainSocketReadMode#BYTES} which means bytes will be read from the
|
||||
* {@link Channel} and passed through the pipeline. If
|
||||
* {@link DomainSocketReadMode#FILE_DESCRIPTORS} is used
|
||||
* {@link EpollFileDescriptor}s will be passed through the {@link ChannelPipeline}.
|
||||
* {@link NativeFileDescriptor}s will be passed through the {@link ChannelPipeline}.
|
||||
*
|
||||
* This setting can be modified on the fly if needed.
|
||||
*/
|
||||
|
@ -16,6 +16,7 @@
|
||||
package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.FileDescriptor;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
@ -34,6 +35,13 @@ public final class EpollServerDomainSocketChannel extends AbstractEpollServerCha
|
||||
super(Native.socketDomainFd());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link EpollServerDomainSocketChannel} from an existing {@link FileDescriptor}.
|
||||
*/
|
||||
public EpollServerDomainSocketChannel(FileDescriptor fd) {
|
||||
super(fd);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Channel newChildChannel(int fd) throws Exception {
|
||||
return new EpollDomainSocketChannel(this, fd);
|
||||
|
@ -17,6 +17,7 @@ package io.netty.channel.epoll;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.FileDescriptor;
|
||||
import io.netty.channel.socket.ServerSocketChannel;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
@ -36,6 +37,18 @@ public final class EpollServerSocketChannel extends AbstractEpollServerChannel i
|
||||
config = new EpollServerSocketChannelConfig(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link EpollServerSocketChannel} from an existing {@link FileDescriptor}.
|
||||
*/
|
||||
public EpollServerSocketChannel(FileDescriptor fd) {
|
||||
super(fd);
|
||||
config = new EpollServerSocketChannelConfig(this);
|
||||
|
||||
// As we create an EpollServerSocketChannel from a FileDescriptor we should try to obtain the remote and local
|
||||
// address from it. This is needed as the FileDescriptor may be bound already.
|
||||
local = Native.localAddress(fd.intValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isCompatible(EventLoop loop) {
|
||||
return loop instanceof EpollEventLoop;
|
||||
|
@ -19,6 +19,7 @@ import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.FileDescriptor;
|
||||
import io.netty.channel.socket.ServerSocketChannel;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.util.concurrent.GlobalEventExecutor;
|
||||
@ -53,6 +54,19 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
|
||||
config = new EpollSocketChannelConfig(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link EpollSocketChannel} from an existing {@link FileDescriptor}.
|
||||
*/
|
||||
public EpollSocketChannel(FileDescriptor fd) {
|
||||
super(fd);
|
||||
config = new EpollSocketChannelConfig(this);
|
||||
|
||||
// As we create an EpollSocketChannel from a FileDescriptor we should try to obtain the remote and local
|
||||
// address from it. This is needed as the FileDescriptor may be bound/connected already.
|
||||
remote = Native.remoteAddress(fd.intValue());
|
||||
local = Native.localAddress(fd.intValue());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@code TCP_INFO} for the current socket. See <a href="http://linux.die.net/man/7/tcp">man 7 tcp</a>.
|
||||
*/
|
||||
|
@ -574,6 +574,7 @@ final class Native {
|
||||
public static native int getTcpKeepIdle(int fd);
|
||||
public static native int getTcpKeepIntvl(int fd);
|
||||
public static native int getTcpKeepCnt(int fd);
|
||||
public static native int getSoError(int fd);
|
||||
|
||||
public static native void setKeepAlive(int fd, int keepAlive);
|
||||
public static native void setReceiveBufferSize(int fd, int receiveBufferSize);
|
||||
|
@ -19,11 +19,15 @@ import io.netty.channel.FileDescriptor;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
final class EpollFileDescriptor implements FileDescriptor {
|
||||
/**
|
||||
* Native {@link FileDescriptor} implementation which allows to wrap an {@code int} and provide a
|
||||
* {@link FileDescriptor} for it.
|
||||
*/
|
||||
public final class NativeFileDescriptor implements FileDescriptor {
|
||||
|
||||
private final int fd;
|
||||
|
||||
EpollFileDescriptor(int fd) {
|
||||
public NativeFileDescriptor(int fd) {
|
||||
if (fd < 0) {
|
||||
throw new IllegalArgumentException("fd must be >= 0");
|
||||
}
|
||||
@ -52,11 +56,11 @@ final class EpollFileDescriptor implements FileDescriptor {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof EpollFileDescriptor)) {
|
||||
if (!(o instanceof NativeFileDescriptor)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return fd == ((EpollFileDescriptor) o).fd;
|
||||
return fd == ((NativeFileDescriptor) o).fd;
|
||||
}
|
||||
|
||||
@Override
|
@ -71,7 +71,7 @@ public class EpollDomainSocketFdTest extends AbstractSocketTest {
|
||||
cb.handler(new ChannelInboundHandlerAdapter() {
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
EpollFileDescriptor fd = (EpollFileDescriptor) msg;
|
||||
NativeFileDescriptor fd = (NativeFileDescriptor) msg;
|
||||
queue.offer(fd);
|
||||
}
|
||||
|
||||
@ -90,9 +90,9 @@ public class EpollDomainSocketFdTest extends AbstractSocketTest {
|
||||
cc.close().sync();
|
||||
sc.close().sync();
|
||||
|
||||
if (received instanceof EpollFileDescriptor) {
|
||||
Assert.assertNotSame(EpollFileDescriptor.INVALID, received);
|
||||
((EpollFileDescriptor) received).close();
|
||||
if (received instanceof NativeFileDescriptor) {
|
||||
Assert.assertNotSame(NativeFileDescriptor.INVALID, received);
|
||||
((NativeFileDescriptor) received).close();
|
||||
Assert.assertNull(queue.poll());
|
||||
} else {
|
||||
throw (Throwable) received;
|
||||
|
Loading…
x
Reference in New Issue
Block a user