Allow to create Epoll*Channel from FileDescriptor
Motivation: Sometimes it's useful to be able to create a Epoll*Channel from an existing file descriptor. This is especially helpful if you integrade some c/jni code. Modifications: - Add extra constructor to Epoll*Channel implementations that take a FileDescriptor as an argument - Make Rename EpollFileDescriptor to NativeFileDescriptor and make it public - Also ensure we obtain the correct remote/local address when create a Channel from a FileDescriptor Result: It's now possible to create a FileDescriptor and instance a Epoll*Channel via it.
This commit is contained in:
parent
daa04cb4f1
commit
865a83c15d
@ -1222,6 +1222,14 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv* en
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSoError(JNIEnv* env, jclass clazz, jint fd) {
|
||||||
|
int optval = 0;
|
||||||
|
if (getOption(env, fd, SOL_SOCKET, SO_ERROR, &optval, sizeof(optval)) == -1) {
|
||||||
|
return optval;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv* env, jclass clazz, jint fd) {
|
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv* env, jclass clazz, jint fd) {
|
||||||
int optval;
|
int optval;
|
||||||
if (getOption(env, fd, IPPROTO_IP, IP_TOS, &optval, sizeof(optval)) == -1) {
|
if (getOption(env, fd, IPPROTO_IP, IP_TOS, &optval, sizeof(optval)) == -1) {
|
||||||
|
@ -99,6 +99,7 @@ jint Java_io_netty_channel_epoll_Native_isBroadcast(JNIEnv* env, jclass clazz, j
|
|||||||
jint Java_io_netty_channel_epoll_Native_getTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd);
|
jint Java_io_netty_channel_epoll_Native_getTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd);
|
||||||
jint Java_io_netty_channel_epoll_Native_getTcpKeepIntvl(JNIEnv* env, jclass clazz, jint fd);
|
jint Java_io_netty_channel_epoll_Native_getTcpKeepIntvl(JNIEnv* env, jclass clazz, jint fd);
|
||||||
jint Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv* env, jclass clazz, jint fd);
|
jint Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv* env, jclass clazz, jint fd);
|
||||||
|
jint Java_io_netty_channel_epoll_Native_getSoError(JNIEnv* env, jclass clazz, jint fd);
|
||||||
|
|
||||||
jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv* env, jclass clazz);
|
jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv* env, jclass clazz);
|
||||||
jint Java_io_netty_channel_epoll_Native_iovMax(JNIEnv* env, jclass clazz);
|
jint Java_io_netty_channel_epoll_Native_iovMax(JNIEnv* env, jclass clazz);
|
||||||
|
@ -44,11 +44,18 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
AbstractEpollChannel(Channel parent, int fd, int flag, boolean active) {
|
AbstractEpollChannel(Channel parent, int fd, int flag, boolean active) {
|
||||||
|
this(parent, new NativeFileDescriptor(fd), flag, active);
|
||||||
|
}
|
||||||
|
|
||||||
|
AbstractEpollChannel(Channel parent, FileDescriptor fd, int flag, boolean active) {
|
||||||
super(parent);
|
super(parent);
|
||||||
|
if (fd == null) {
|
||||||
|
throw new NullPointerException("fd");
|
||||||
|
}
|
||||||
readFlag = flag;
|
readFlag = flag;
|
||||||
flags |= flag;
|
flags |= flag;
|
||||||
this.active = active;
|
this.active = active;
|
||||||
fileDescriptor = new EpollFileDescriptor(fd);
|
fileDescriptor = fd;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setFlag(int flag) {
|
void setFlag(int flag) {
|
||||||
@ -113,7 +120,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isOpen() {
|
public boolean isOpen() {
|
||||||
return fileDescriptor != EpollFileDescriptor.INVALID;
|
return fileDescriptor != FileDescriptor.INVALID;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -20,6 +20,7 @@ import io.netty.channel.ChannelOutboundBuffer;
|
|||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
|
import io.netty.channel.FileDescriptor;
|
||||||
import io.netty.channel.ServerChannel;
|
import io.netty.channel.ServerChannel;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
@ -32,6 +33,10 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
|
|||||||
super(fd, Native.EPOLLIN);
|
super(fd, Native.EPOLLIN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected AbstractEpollServerChannel(FileDescriptor fd) {
|
||||||
|
super(null, fd, Native.EPOLLIN, Native.getSoError(fd.intValue()) == 0);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean isCompatible(EventLoop loop) {
|
protected boolean isCompatible(EventLoop loop) {
|
||||||
return loop instanceof EpollEventLoop;
|
return loop instanceof EpollEventLoop;
|
||||||
|
@ -29,6 +29,7 @@ import io.netty.channel.ChannelPromise;
|
|||||||
import io.netty.channel.ConnectTimeoutException;
|
import io.netty.channel.ConnectTimeoutException;
|
||||||
import io.netty.channel.DefaultFileRegion;
|
import io.netty.channel.DefaultFileRegion;
|
||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
|
import io.netty.channel.FileDescriptor;
|
||||||
import io.netty.channel.RecvByteBufAllocator;
|
import io.netty.channel.RecvByteBufAllocator;
|
||||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
@ -61,6 +62,10 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
flags |= Native.EPOLLRDHUP;
|
flags |= Native.EPOLLRDHUP;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected AbstractEpollStreamChannel(FileDescriptor fd) {
|
||||||
|
super(null, fd, Native.EPOLLIN, Native.getSoError(fd.intValue()) == 0);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected AbstractEpollUnsafe newUnsafe() {
|
protected AbstractEpollUnsafe newUnsafe() {
|
||||||
return new EpollStreamUnsafe();
|
return new EpollStreamUnsafe();
|
||||||
|
@ -67,6 +67,18 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
config = new EpollDatagramChannelConfig(this);
|
config = new EpollDatagramChannelConfig(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new {@link EpollDatagramChannel} from the given {@link FileDescriptor}.
|
||||||
|
*/
|
||||||
|
public EpollDatagramChannel(FileDescriptor fd) {
|
||||||
|
super(null, fd, Native.EPOLLIN, true);
|
||||||
|
config = new EpollDatagramChannelConfig(this);
|
||||||
|
|
||||||
|
// As we create an EpollDatagramChannel from a FileDescriptor we should try to obtain the remote and local
|
||||||
|
// address from it. This is needed as the FileDescriptor may be bound already.
|
||||||
|
local = Native.localAddress(fd.intValue());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InetSocketAddress remoteAddress() {
|
public InetSocketAddress remoteAddress() {
|
||||||
return (InetSocketAddress) super.remoteAddress();
|
return (InetSocketAddress) super.remoteAddress();
|
||||||
|
@ -37,6 +37,13 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
|
|||||||
super(parent, fd.intValue());
|
super(parent, fd.intValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new {@link EpollDomainSocketChannel} from an existing {@link FileDescriptor}
|
||||||
|
*/
|
||||||
|
public EpollDomainSocketChannel(FileDescriptor fd) {
|
||||||
|
super(fd);
|
||||||
|
}
|
||||||
|
|
||||||
EpollDomainSocketChannel(Channel parent, int fd) {
|
EpollDomainSocketChannel(Channel parent, int fd) {
|
||||||
super(parent, fd);
|
super(parent, fd);
|
||||||
}
|
}
|
||||||
@ -100,7 +107,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Object filterOutboundMessage(Object msg) {
|
protected Object filterOutboundMessage(Object msg) {
|
||||||
if (msg instanceof EpollFileDescriptor) {
|
if (msg instanceof NativeFileDescriptor) {
|
||||||
return msg;
|
return msg;
|
||||||
}
|
}
|
||||||
return super.filterOutboundMessage(msg);
|
return super.filterOutboundMessage(msg);
|
||||||
@ -143,7 +150,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
|
|||||||
readPending = false;
|
readPending = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
pipeline.fireChannelRead(new EpollFileDescriptor(socketFd));
|
pipeline.fireChannelRead(new NativeFileDescriptor(socketFd));
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
// keep on reading as we use epoll ET and need to consume everything from the socket
|
// keep on reading as we use epoll ET and need to consume everything from the socket
|
||||||
pipeline.fireChannelReadComplete();
|
pipeline.fireChannelReadComplete();
|
||||||
|
@ -129,7 +129,7 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig {
|
|||||||
* {@link DomainSocketReadMode#BYTES} which means bytes will be read from the
|
* {@link DomainSocketReadMode#BYTES} which means bytes will be read from the
|
||||||
* {@link Channel} and passed through the pipeline. If
|
* {@link Channel} and passed through the pipeline. If
|
||||||
* {@link DomainSocketReadMode#FILE_DESCRIPTORS} is used
|
* {@link DomainSocketReadMode#FILE_DESCRIPTORS} is used
|
||||||
* {@link EpollFileDescriptor}s will be passed through the {@link ChannelPipeline}.
|
* {@link NativeFileDescriptor}s will be passed through the {@link ChannelPipeline}.
|
||||||
*
|
*
|
||||||
* This setting can be modified on the fly if needed.
|
* This setting can be modified on the fly if needed.
|
||||||
*/
|
*/
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
package io.netty.channel.epoll;
|
package io.netty.channel.epoll;
|
||||||
|
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.FileDescriptor;
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
@ -34,6 +35,13 @@ public final class EpollServerDomainSocketChannel extends AbstractEpollServerCha
|
|||||||
super(Native.socketDomainFd());
|
super(Native.socketDomainFd());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new {@link EpollServerDomainSocketChannel} from an existing {@link FileDescriptor}.
|
||||||
|
*/
|
||||||
|
public EpollServerDomainSocketChannel(FileDescriptor fd) {
|
||||||
|
super(fd);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Channel newChildChannel(int fd) throws Exception {
|
protected Channel newChildChannel(int fd) throws Exception {
|
||||||
return new EpollDomainSocketChannel(this, fd);
|
return new EpollDomainSocketChannel(this, fd);
|
||||||
|
@ -17,6 +17,7 @@ package io.netty.channel.epoll;
|
|||||||
|
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
|
import io.netty.channel.FileDescriptor;
|
||||||
import io.netty.channel.socket.ServerSocketChannel;
|
import io.netty.channel.socket.ServerSocketChannel;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
@ -36,6 +37,18 @@ public final class EpollServerSocketChannel extends AbstractEpollServerChannel i
|
|||||||
config = new EpollServerSocketChannelConfig(this);
|
config = new EpollServerSocketChannelConfig(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new {@link EpollServerSocketChannel} from an existing {@link FileDescriptor}.
|
||||||
|
*/
|
||||||
|
public EpollServerSocketChannel(FileDescriptor fd) {
|
||||||
|
super(fd);
|
||||||
|
config = new EpollServerSocketChannelConfig(this);
|
||||||
|
|
||||||
|
// As we create an EpollServerSocketChannel from a FileDescriptor we should try to obtain the remote and local
|
||||||
|
// address from it. This is needed as the FileDescriptor may be bound already.
|
||||||
|
local = Native.localAddress(fd.intValue());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean isCompatible(EventLoop loop) {
|
protected boolean isCompatible(EventLoop loop) {
|
||||||
return loop instanceof EpollEventLoop;
|
return loop instanceof EpollEventLoop;
|
||||||
|
@ -19,6 +19,7 @@ import io.netty.channel.Channel;
|
|||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
|
import io.netty.channel.FileDescriptor;
|
||||||
import io.netty.channel.socket.ServerSocketChannel;
|
import io.netty.channel.socket.ServerSocketChannel;
|
||||||
import io.netty.channel.socket.SocketChannel;
|
import io.netty.channel.socket.SocketChannel;
|
||||||
import io.netty.util.concurrent.GlobalEventExecutor;
|
import io.netty.util.concurrent.GlobalEventExecutor;
|
||||||
@ -53,6 +54,19 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
|
|||||||
config = new EpollSocketChannelConfig(this);
|
config = new EpollSocketChannelConfig(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new {@link EpollSocketChannel} from an existing {@link FileDescriptor}.
|
||||||
|
*/
|
||||||
|
public EpollSocketChannel(FileDescriptor fd) {
|
||||||
|
super(fd);
|
||||||
|
config = new EpollSocketChannelConfig(this);
|
||||||
|
|
||||||
|
// As we create an EpollSocketChannel from a FileDescriptor we should try to obtain the remote and local
|
||||||
|
// address from it. This is needed as the FileDescriptor may be bound/connected already.
|
||||||
|
remote = Native.remoteAddress(fd.intValue());
|
||||||
|
local = Native.localAddress(fd.intValue());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the {@code TCP_INFO} for the current socket. See <a href="http://linux.die.net/man/7/tcp">man 7 tcp</a>.
|
* Returns the {@code TCP_INFO} for the current socket. See <a href="http://linux.die.net/man/7/tcp">man 7 tcp</a>.
|
||||||
*/
|
*/
|
||||||
|
@ -574,6 +574,7 @@ final class Native {
|
|||||||
public static native int getTcpKeepIdle(int fd);
|
public static native int getTcpKeepIdle(int fd);
|
||||||
public static native int getTcpKeepIntvl(int fd);
|
public static native int getTcpKeepIntvl(int fd);
|
||||||
public static native int getTcpKeepCnt(int fd);
|
public static native int getTcpKeepCnt(int fd);
|
||||||
|
public static native int getSoError(int fd);
|
||||||
|
|
||||||
public static native void setKeepAlive(int fd, int keepAlive);
|
public static native void setKeepAlive(int fd, int keepAlive);
|
||||||
public static native void setReceiveBufferSize(int fd, int receiveBufferSize);
|
public static native void setReceiveBufferSize(int fd, int receiveBufferSize);
|
||||||
|
@ -19,11 +19,15 @@ import io.netty.channel.FileDescriptor;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
final class EpollFileDescriptor implements FileDescriptor {
|
/**
|
||||||
|
* Native {@link FileDescriptor} implementation which allows to wrap an {@code int} and provide a
|
||||||
|
* {@link FileDescriptor} for it.
|
||||||
|
*/
|
||||||
|
public final class NativeFileDescriptor implements FileDescriptor {
|
||||||
|
|
||||||
private final int fd;
|
private final int fd;
|
||||||
|
|
||||||
EpollFileDescriptor(int fd) {
|
public NativeFileDescriptor(int fd) {
|
||||||
if (fd < 0) {
|
if (fd < 0) {
|
||||||
throw new IllegalArgumentException("fd must be >= 0");
|
throw new IllegalArgumentException("fd must be >= 0");
|
||||||
}
|
}
|
||||||
@ -52,11 +56,11 @@ final class EpollFileDescriptor implements FileDescriptor {
|
|||||||
if (this == o) {
|
if (this == o) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (!(o instanceof EpollFileDescriptor)) {
|
if (!(o instanceof NativeFileDescriptor)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return fd == ((EpollFileDescriptor) o).fd;
|
return fd == ((NativeFileDescriptor) o).fd;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
@ -71,7 +71,7 @@ public class EpollDomainSocketFdTest extends AbstractSocketTest {
|
|||||||
cb.handler(new ChannelInboundHandlerAdapter() {
|
cb.handler(new ChannelInboundHandlerAdapter() {
|
||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
EpollFileDescriptor fd = (EpollFileDescriptor) msg;
|
NativeFileDescriptor fd = (NativeFileDescriptor) msg;
|
||||||
queue.offer(fd);
|
queue.offer(fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -90,9 +90,9 @@ public class EpollDomainSocketFdTest extends AbstractSocketTest {
|
|||||||
cc.close().sync();
|
cc.close().sync();
|
||||||
sc.close().sync();
|
sc.close().sync();
|
||||||
|
|
||||||
if (received instanceof EpollFileDescriptor) {
|
if (received instanceof NativeFileDescriptor) {
|
||||||
Assert.assertNotSame(EpollFileDescriptor.INVALID, received);
|
Assert.assertNotSame(NativeFileDescriptor.INVALID, received);
|
||||||
((EpollFileDescriptor) received).close();
|
((NativeFileDescriptor) received).close();
|
||||||
Assert.assertNull(queue.poll());
|
Assert.assertNull(queue.poll());
|
||||||
} else {
|
} else {
|
||||||
throw (Throwable) received;
|
throw (Throwable) received;
|
||||||
|
Loading…
Reference in New Issue
Block a user