Allow to recv and send file descriptors when using EpollDomainSocketChannel.

Motiviation:

When using domain sockets on linux it is supported to recv and send file descriptors. This can be used to pass around for example sockets.

Modifications:
- Add support for recv and send file descriptors when using EpollDomainSocketChannel.
- Allow to obtain the file descriptor for an Epoll*Channel so it can be send via domain sockets.

Result:
recv and send of file descriptors is supported now.
This commit is contained in:
Norman Maurer 2015-01-15 14:38:14 +01:00
parent f771a97592
commit 3030b4afe3
21 changed files with 661 additions and 112 deletions

View File

@ -1445,47 +1445,88 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_connectDomainSocket(JN
return 0;
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_recvFd(JNIEnv* env, jclass clazz, jint fd) {
jint socketFd;
struct msghdr msg;
struct iovec iov[1];
struct cmsghdr* ctrl_msg = NULL;
char msg_buffer[1];
char elem_buffer[CMSG_SPACE(sizeof(int))];
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_recvFd0(JNIEnv* env, jclass clazz, jint fd) {
int socketFd;
struct msghdr descriptorMessage = { 0 };
struct iovec iov[1] = { 0 };
char control[CMSG_SPACE(sizeof(int))] = { 0 };
char iovecData[1];
/* Fill all with 0 */
memset(&msg, 0, sizeof(struct msghdr));
memset(elem_buffer, 0, CMSG_SPACE(sizeof(int)));
descriptorMessage.msg_control = control;
descriptorMessage.msg_controllen = sizeof(control);
descriptorMessage.msg_iov = iov;
descriptorMessage.msg_iovlen = 1;
iov[0].iov_base = iovecData;
iov[0].iov_len = sizeof(iovecData);
iov[0].iov_base = msg_buffer;
iov[0].iov_len = 1;
ssize_t res;
int err;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_control = elem_buffer;
msg.msg_controllen = CMSG_SPACE(sizeof(int));
for (;;) {
do {
res = recvmsg(fd, &descriptorMessage, 0);
// Keep on reading if we was interrupted
} while (res == -1 && ((err = errno) == EINTR));
if(recvmsg(fd, &msg, MSG_CMSG_CLOEXEC) < 0) {
// All read, return -1
return -1;
}
if (res == 0) {
return 0;
}
if((msg.msg_flags & MSG_CTRUNC) == MSG_CTRUNC) {
// Not enough space ?!?!
return -1;
}
if (res < 0) {
return -err;
}
// skip empty entries
for(ctrl_msg = CMSG_FIRSTHDR(&msg); ctrl_msg != NULL; ctrl_msg = CMSG_NXTHDR(&msg, ctrl_msg)) {
if((ctrl_msg->cmsg_level == SOL_SOCKET) && (ctrl_msg->cmsg_type == SCM_RIGHTS)) {
socketFd = *((int *) CMSG_DATA(ctrl_msg));
struct cmsghdr* cmsg = CMSG_FIRSTHDR(&descriptorMessage);
if (!cmsg) {
return -errno;
}
if ((cmsg->cmsg_len == CMSG_LEN(sizeof(int))) && (cmsg->cmsg_level == SOL_SOCKET) && (cmsg->cmsg_type == SCM_RIGHTS)) {
socketFd = *((int *) CMSG_DATA(cmsg));
// set as non blocking as we want to use it with epoll
if (fcntl(socketFd, F_SETFL, O_NONBLOCK) == -1) {
return -errno;
err = errno;
close(socketFd);
return -err;
}
return socketFd;
}
}
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sendFd0(JNIEnv* env, jclass clazz, jint socketFd, jint fd) {
struct msghdr descriptorMessage = { 0 };
struct iovec iov[1] = { 0 };
char control[CMSG_SPACE(sizeof(int))] = { 0 };
char iovecData[1];
descriptorMessage.msg_control = control;
descriptorMessage.msg_controllen = sizeof(control);
struct cmsghdr* cmsg = CMSG_FIRSTHDR(&descriptorMessage);
if (cmsg) {
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
*((int *)CMSG_DATA(cmsg)) = fd;
descriptorMessage.msg_iov = iov;
descriptorMessage.msg_iovlen = 1;
iov[0].iov_base = iovecData;
iov[0].iov_len = sizeof(iovecData);
size_t res;
int err;
do {
res = sendmsg(socketFd, &descriptorMessage, 0);
// keep on writing if it was interrupted
} while (res == -1 && ((err = errno) == EINTR));
if (res < 0) {
return -err;
}
return (jint) res;
}
return -1;
}

View File

@ -23,6 +23,7 @@ import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.EventLoop;
import io.netty.channel.FileDescriptor;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.OneTimeTask;
@ -33,9 +34,9 @@ import java.nio.channels.UnresolvedAddressException;
abstract class AbstractEpollChannel extends AbstractChannel {
private static final ChannelMetadata DATA = new ChannelMetadata(false);
private final int readFlag;
private volatile FileDescriptor fileDescriptor;
protected int flags;
protected volatile boolean active;
volatile int fd;
int id;
AbstractEpollChannel(int fd, int flag) {
@ -44,14 +45,17 @@ abstract class AbstractEpollChannel extends AbstractChannel {
AbstractEpollChannel(Channel parent, int fd, int flag, boolean active) {
super(parent);
this.fd = fd;
readFlag = flag;
flags |= flag;
this.active = active;
fileDescriptor = new EpollFileDescriptor(fd);
}
protected final int fd() {
return fd;
/**
* Returns the {@link FileDescriptor} that is used by this {@link Channel}.
*/
public final FileDescriptor fd() {
return fileDescriptor;
}
@Override
@ -71,9 +75,9 @@ abstract class AbstractEpollChannel extends AbstractChannel {
// deregister from epoll now
doDeregister();
int fd = this.fd;
this.fd = -1;
Native.close(fd);
FileDescriptor fd = fileDescriptor;
fileDescriptor = FileDescriptor.INVALID;
Native.close(fd.intValue());
}
@Override
@ -88,7 +92,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
@Override
public boolean isOpen() {
return fd != -1;
return fileDescriptor != EpollFileDescriptor.INVALID;
}
@Override
@ -216,10 +220,11 @@ abstract class AbstractEpollChannel extends AbstractChannel {
int writerIndex = byteBuf.writerIndex();
int localReadAmount;
if (byteBuf.hasMemoryAddress()) {
localReadAmount = Native.readAddress(fd, byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
localReadAmount = Native.readAddress(
fileDescriptor.intValue(), byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
} else {
ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
localReadAmount = Native.read(fd, buf, buf.position(), buf.limit());
localReadAmount = Native.read(fileDescriptor.intValue(), buf, buf.position(), buf.limit());
}
if (localReadAmount > 0) {
byteBuf.writerIndex(writerIndex + localReadAmount);
@ -235,7 +240,8 @@ abstract class AbstractEpollChannel extends AbstractChannel {
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
for (;;) {
int localFlushedAmount = Native.writeAddress(fd, memoryAddress, readerIndex, writerIndex);
int localFlushedAmount = Native.writeAddress(
fileDescriptor.intValue(), memoryAddress, readerIndex, writerIndex);
if (localFlushedAmount > 0) {
writtenBytes += localFlushedAmount;
if (writtenBytes == readableBytes) {
@ -258,7 +264,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
for (;;) {
int pos = nioBuf.position();
int limit = nioBuf.limit();
int localFlushedAmount = Native.write(fd, nioBuf, pos, limit);
int localFlushedAmount = Native.write(fileDescriptor.intValue(), nioBuf, pos, limit);
if (localFlushedAmount > 0) {
nioBuf.position(pos + localFlushedAmount);
writtenBytes += localFlushedAmount;

View File

@ -75,7 +75,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
try {
try {
for (;;) {
int socketFd = Native.accept(fd);
int socketFd = Native.accept(fd().intValue());
if (socketFd == -1) {
// this means everything was handled for now
break;

View File

@ -97,7 +97,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
int offset = 0;
int end = offset + cnt;
for (;;) {
long localWrittenBytes = Native.writevAddresses(fd, array.memoryAddress(offset), cnt);
long localWrittenBytes = Native.writevAddresses(fd().intValue(), array.memoryAddress(offset), cnt);
if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
@ -139,7 +139,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
int offset = 0;
int end = offset + nioBufferCnt;
for (;;) {
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, nioBufferCnt);
long localWrittenBytes = Native.writev(fd().intValue(), nioBuffers, offset, nioBufferCnt);
if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
@ -191,7 +191,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
for (;;) {
final long offset = region.transfered();
final long localFlushedAmount = Native.sendfile(fd, region, baseOffset, offset, regionCount - offset);
final long localFlushedAmount =
Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset);
if (localFlushedAmount == 0) {
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
@ -243,7 +244,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
}
}
private boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception {
protected boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception {
// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
@ -345,7 +346,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
try {
Native.shutdown(fd, false, true);
Native.shutdown(fd().intValue(), false, true);
outputShutdown = true;
promise.setSuccess();
} catch (Throwable t) {
@ -367,12 +368,12 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
*/
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
Native.bind(fd, localAddress);
Native.bind(fd().intValue(), localAddress);
}
boolean success = false;
try {
boolean connected = Native.connect(fd, remoteAddress);
boolean connected = Native.connect(fd().intValue(), remoteAddress);
if (!connected) {
setEpollOut();
}
@ -385,7 +386,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
}
}
final class EpollStreamUnsafe extends AbstractEpollUnsafe {
class EpollStreamUnsafe extends AbstractEpollUnsafe {
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
@ -555,7 +556,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
* Finish the connect
*/
private boolean doFinishConnect() throws Exception {
if (Native.finishConnect(fd)) {
if (Native.finishConnect(fd().intValue())) {
clearEpollOut();
return true;
} else {

View File

@ -0,0 +1,29 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
public enum DomainSocketReadMode {
/**
* Read (@link ByteBuf)s from the {@link EpollSocketChannel}.
*/
BYTES,
/**
* Read (@link FileDscriptor)s from the {@link EpollSocketChannel}.
*/
FILE_DESCRIPTORS
}

View File

@ -25,6 +25,8 @@ public final class EpollChannelOption {
public static final ChannelOption<Integer> TCP_KEEPIDLE = ChannelOption.valueOf(T, "TCP_KEEPIDLE");
public static final ChannelOption<Integer> TCP_KEEPINTVL = ChannelOption.valueOf(T, "TCP_KEEPINTVL");
public static final ChannelOption<Integer> TCP_KEEPCNT = ChannelOption.valueOf(T, "TCP_KEEPCNT");
public static final ChannelOption<DomainSocketReadMode> DOMAIN_SOCKET_READ_MODE =
ChannelOption.valueOf(T, "DOMAIN_SOCKET_READ_MODE");
private EpollChannelOption() { }

View File

@ -84,7 +84,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
@Override
@SuppressWarnings("deprecation")
public boolean isActive() {
return fd != -1 &&
return fd() != EpollFileDescriptor.INVALID &&
(config.getOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) && isRegistered()
|| active);
}
@ -262,6 +262,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
protected void doBind(SocketAddress localAddress) throws Exception {
InetSocketAddress addr = (InetSocketAddress) localAddress;
checkResolvable(addr);
int fd = fd().intValue();
Native.bind(fd, addr);
local = Native.localAddress(fd);
active = true;
@ -289,7 +290,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
while (cnt > 0) {
int send = Native.sendmmsg(fd, packets, offset, cnt);
int send = Native.sendmmsg(fd().intValue(), packets, offset, cnt);
if (send == 0) {
// Did not write all messages.
setEpollOut();
@ -357,18 +358,18 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
final int writtenBytes;
if (data.hasMemoryAddress()) {
long memoryAddress = data.memoryAddress();
writtenBytes = Native.sendToAddress(fd, memoryAddress, data.readerIndex(), data.writerIndex(),
writtenBytes = Native.sendToAddress(fd().intValue(), memoryAddress, data.readerIndex(), data.writerIndex(),
remoteAddress.getAddress(), remoteAddress.getPort());
} else if (data instanceof CompositeByteBuf) {
IovArray array = IovArrayThreadLocal.get((CompositeByteBuf) data);
int cnt = array.count();
assert cnt != 0;
writtenBytes = Native.sendToAddresses(fd, array.memoryAddress(0),
writtenBytes = Native.sendToAddresses(fd().intValue(), array.memoryAddress(0),
cnt, remoteAddress.getAddress(), remoteAddress.getPort());
} else {
ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
writtenBytes = Native.sendTo(fd, nioData, nioData.position(), nioData.limit(),
writtenBytes = Native.sendTo(fd().intValue(), nioData, nioData.position(), nioData.limit(),
remoteAddress.getAddress(), remoteAddress.getPort());
}
@ -475,7 +476,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
checkResolvable(remoteAddress);
EpollDatagramChannel.this.remote = remoteAddress;
EpollDatagramChannel.this.local = Native.localAddress(fd);
EpollDatagramChannel.this.local = Native.localAddress(fd().intValue());
success = true;
} finally {
if (!success) {
@ -509,11 +510,11 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
if (data.hasMemoryAddress()) {
// has a memory address so use optimized call
remoteAddress = Native.recvFromAddress(
fd, data.memoryAddress(), writerIndex, data.capacity());
fd().intValue(), data.memoryAddress(), writerIndex, data.capacity());
} else {
ByteBuffer nioData = data.internalNioBuffer(writerIndex, data.writableBytes());
remoteAddress = Native.recvFrom(
fd, nioData, nioData.position(), nioData.limit());
fd().intValue(), nioData, nioData.position(), nioData.limit());
}
if (remoteAddress == null) {

View File

@ -192,56 +192,56 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
@Override
public int getSendBufferSize() {
return Native.getSendBufferSize(datagramChannel.fd);
return Native.getSendBufferSize(datagramChannel.fd().intValue());
}
@Override
public EpollDatagramChannelConfig setSendBufferSize(int sendBufferSize) {
Native.setSendBufferSize(datagramChannel.fd, sendBufferSize);
Native.setSendBufferSize(datagramChannel.fd().intValue(), sendBufferSize);
return this;
}
@Override
public int getReceiveBufferSize() {
return Native.getReceiveBufferSize(datagramChannel.fd);
return Native.getReceiveBufferSize(datagramChannel.fd().intValue());
}
@Override
public EpollDatagramChannelConfig setReceiveBufferSize(int receiveBufferSize) {
Native.setReceiveBufferSize(datagramChannel.fd, receiveBufferSize);
Native.setReceiveBufferSize(datagramChannel.fd().intValue(), receiveBufferSize);
return this;
}
@Override
public int getTrafficClass() {
return Native.getTrafficClass(datagramChannel.fd);
return Native.getTrafficClass(datagramChannel.fd().intValue());
}
@Override
public EpollDatagramChannelConfig setTrafficClass(int trafficClass) {
Native.setTrafficClass(datagramChannel.fd, trafficClass);
Native.setTrafficClass(datagramChannel.fd().intValue(), trafficClass);
return this;
}
@Override
public boolean isReuseAddress() {
return Native.isReuseAddress(datagramChannel.fd) == 1;
return Native.isReuseAddress(datagramChannel.fd().intValue()) == 1;
}
@Override
public EpollDatagramChannelConfig setReuseAddress(boolean reuseAddress) {
Native.setReuseAddress(datagramChannel.fd, reuseAddress ? 1 : 0);
Native.setReuseAddress(datagramChannel.fd().intValue(), reuseAddress ? 1 : 0);
return this;
}
@Override
public boolean isBroadcast() {
return Native.isBroadcast(datagramChannel.fd) == 1;
return Native.isBroadcast(datagramChannel.fd().intValue()) == 1;
}
@Override
public EpollDatagramChannelConfig setBroadcast(boolean broadcast) {
Native.setBroadcast(datagramChannel.fd, broadcast ? 1 : 0);
Native.setBroadcast(datagramChannel.fd().intValue(), broadcast ? 1 : 0);
return this;
}
@ -289,7 +289,7 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
* Returns {@code true} if the SO_REUSEPORT option is set.
*/
public boolean isReusePort() {
return Native.isReusePort(datagramChannel.fd) == 1;
return Native.isReusePort(datagramChannel.fd().intValue()) == 1;
}
/**
@ -300,7 +300,7 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple
* any affect.
*/
public EpollDatagramChannelConfig setReusePort(boolean reusePort) {
Native.setReusePort(datagramChannel.fd, reusePort ? 1 : 0);
Native.setReusePort(datagramChannel.fd().intValue(), reusePort ? 1 : 0);
return this;
}

View File

@ -17,22 +17,33 @@ package io.netty.channel.epoll;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileDescriptor;
import java.net.SocketAddress;
public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
private final ChannelConfig config = new DefaultChannelConfig(this);
private final EpollDomainSocketChannelConfig config = new EpollDomainSocketChannelConfig(this);
private volatile DomainSocketAddress local;
private volatile DomainSocketAddress remote;
public EpollDomainSocketChannel() {
super(Native.socketDomainFd());
}
public EpollDomainSocketChannel(Channel parent, FileDescriptor fd) {
super(parent, fd.intValue());
}
EpollDomainSocketChannel(Channel parent, int fd) {
super(parent, fd);
}
public EpollDomainSocketChannel() {
super(Native.socketDomainFd());
@Override
protected AbstractEpollUnsafe newUnsafe() {
return new EpollDomainUnsafe();
}
@Override
@ -47,12 +58,12 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
Native.bind(fd, localAddress);
Native.bind(fd().intValue(), localAddress);
local = (DomainSocketAddress) localAddress;
}
@Override
public ChannelConfig config() {
public EpollDomainSocketChannelConfig config() {
return config;
}
@ -75,4 +86,82 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
public DomainSocketAddress localAddress() {
return (DomainSocketAddress) super.localAddress();
}
@Override
protected boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception {
Object msg = in.current();
if (msg instanceof FileDescriptor && Native.sendFd(fd().intValue(), ((FileDescriptor) msg).intValue()) > 0) {
// File descriptor was written, so remove it.
in.remove();
return true;
}
return super.doWriteSingle(in);
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof EpollFileDescriptor) {
return msg;
}
return super.filterOutboundMessage(msg);
}
private final class EpollDomainUnsafe extends EpollStreamUnsafe {
@Override
void epollInReady() {
switch (config().getReadMode()) {
case BYTES:
super.epollInReady();
break;
case FILE_DESCRIPTORS:
epollInReadFd();
break;
default:
throw new Error();
}
}
private void epollInReadFd() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
try {
for (;;) {
int socketFd = Native.recvFd(fd().intValue());
if (socketFd == 0) {
break;
}
if (socketFd == -1) {
close(voidPromise());
return;
}
readPending = false;
pipeline.fireChannelRead(new EpollFileDescriptor(socketFd));
}
pipeline.fireChannelReadComplete();
} catch (Throwable t) {
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
// trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket
eventLoop().execute(new Runnable() {
@Override
public void run() {
epollInReady();
}
});
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead() && !readPending) {
clearEpollIn0();
}
}
}
}
}

View File

@ -0,0 +1,145 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import java.util.Map;
public final class EpollDomainSocketChannelConfig extends DefaultChannelConfig {
private volatile DomainSocketReadMode mode =
DomainSocketReadMode.BYTES;
EpollDomainSocketChannelConfig(Channel channel) {
super(channel);
}
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), EpollChannelOption.DOMAIN_SOCKET_READ_MODE);
}
@SuppressWarnings("unchecked")
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == EpollChannelOption.DOMAIN_SOCKET_READ_MODE) {
return (T) getReadMode();
}
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == EpollChannelOption.DOMAIN_SOCKET_READ_MODE) {
setReadMode((DomainSocketReadMode) value);
} else {
return super.setOption(option, value);
}
return true;
}
@Override
public EpollDomainSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
super.setMaxMessagesPerRead(maxMessagesPerRead);
return this;
}
@Override
public EpollDomainSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
}
@Override
public EpollDomainSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
super.setWriteSpinCount(writeSpinCount);
return this;
}
@Override
public EpollDomainSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public EpollDomainSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
super.setAllocator(allocator);
return this;
}
@Override
public EpollDomainSocketChannelConfig setAutoClose(boolean autoClose) {
super.setAutoClose(autoClose);
return this;
}
@Override
public EpollDomainSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
@Override
public EpollDomainSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public EpollDomainSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public EpollDomainSocketChannelConfig setAutoRead(boolean autoRead) {
super.setAutoRead(autoRead);
return this;
}
/**
* Change the {@link DomainSocketReadMode} for the channel. The default is
* {@link DomainSocketReadMode#BYTES} which means bytes will be read from the
* {@link Channel} and passed through the pipeline. If
* {@link DomainSocketReadMode#FILE_DESCRIPTORS} is used
* {@link EpollFileDescriptor}s will be passed through the {@link ChannelPipeline}.
*
* This setting can be modified on the fly if needed.
*/
public EpollDomainSocketChannelConfig setReadMode(DomainSocketReadMode mode) {
if (mode == null) {
throw new NullPointerException("mode");
}
this.mode = mode;
return this;
}
/**
* Return the {@link DomainSocketReadMode} for the channel.
*/
public DomainSocketReadMode getReadMode() {
return mode;
}
}

View File

@ -134,7 +134,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
void add(AbstractEpollChannel ch) {
assert inEventLoop();
int id = nextId();
Native.epollCtlAdd(epollFd, ch.fd, ch.flags, id);
Native.epollCtlAdd(epollFd, ch.fd().intValue(), ch.flags, id);
ch.id = id;
ids.put(id, ch);
}
@ -144,7 +144,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
*/
void modify(AbstractEpollChannel ch) {
assert inEventLoop();
Native.epollCtlMod(epollFd, ch.fd, ch.flags, ch.id);
Native.epollCtlMod(epollFd, ch.fd().intValue(), ch.flags, ch.id);
}
/**
@ -155,7 +155,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
if (ids.remove(ch.id) != null && ch.isOpen()) {
// Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
// removed once the file-descriptor is closed.
Native.epollCtlDel(epollFd, ch.fd);
Native.epollCtlDel(epollFd, ch.fd().intValue());
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.FileDescriptor;
import java.io.IOException;
final class EpollFileDescriptor implements FileDescriptor {
private final int fd;
EpollFileDescriptor(int fd) {
if (fd < 0) {
throw new IllegalArgumentException("fd must be >= 0");
}
this.fd = fd;
}
@Override
public int intValue() {
return fd;
}
@Override
public void close() throws IOException {
Native.close(fd);
}
@Override
public String toString() {
return "FileDescriptor{" +
"fd=" + fd +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof EpollFileDescriptor)) {
return false;
}
return fd == ((EpollFileDescriptor) o).fd;
}
@Override
public int hashCode() {
return fd;
}
}

View File

@ -75,20 +75,20 @@ public class EpollServerChannelConfig extends DefaultChannelConfig {
}
public boolean isReuseAddress() {
return Native.isReuseAddress(channel.fd) == 1;
return Native.isReuseAddress(channel.fd().intValue()) == 1;
}
public EpollServerChannelConfig setReuseAddress(boolean reuseAddress) {
Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0);
Native.setReuseAddress(channel.fd().intValue(), reuseAddress ? 1 : 0);
return this;
}
public int getReceiveBufferSize() {
return Native.getReceiveBufferSize(channel.fd);
return Native.getReceiveBufferSize(channel.fd().intValue());
}
public EpollServerChannelConfig setReceiveBufferSize(int receiveBufferSize) {
Native.setReceiveBufferSize(channel.fd, receiveBufferSize);
Native.setReceiveBufferSize(channel.fd().intValue(), receiveBufferSize);
return this;
}

View File

@ -46,6 +46,7 @@ public final class EpollServerDomainSocketChannel extends AbstractEpollServerCha
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
int fd = fd().intValue();
Native.bind(fd, localAddress);
Native.listen(fd, config.getBacklog());
local = (DomainSocketAddress) localAddress;

View File

@ -45,6 +45,7 @@ public final class EpollServerSocketChannel extends AbstractEpollServerChannel i
protected void doBind(SocketAddress localAddress) throws Exception {
InetSocketAddress addr = (InetSocketAddress) localAddress;
checkResolvable(addr);
int fd = fd().intValue();
Native.bind(fd, addr);
local = Native.localAddress(fd);
Native.listen(fd, config.getBacklog());

View File

@ -143,7 +143,7 @@ public final class EpollServerSocketChannelConfig extends EpollServerChannelConf
* Returns {@code true} if the SO_REUSEPORT option is set.
*/
public boolean isReusePort() {
return Native.isReusePort(channel.fd) == 1;
return Native.isReusePort(channel.fd().intValue()) == 1;
}
/**
@ -154,7 +154,7 @@ public final class EpollServerSocketChannelConfig extends EpollServerChannelConf
* any affect.
*/
public EpollServerSocketChannelConfig setReusePort(boolean reusePort) {
Native.setReusePort(channel.fd, reusePort ? 1 : 0);
Native.setReusePort(channel.fd().intValue(), reusePort ? 1 : 0);
return this;
}
}

View File

@ -61,7 +61,7 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
* See <a href="http://linux.die.net/man/7/tcp">man 7 tcp</a>.
*/
public EpollTcpInfo tcpInfo(EpollTcpInfo info) {
Native.tcpInfo(fd, info);
Native.tcpInfo(fd().intValue(), info);
return info;
}
@ -84,7 +84,7 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
protected SocketAddress remoteAddress0() {
if (remote == null) {
// Remote address not know, try to get it now.
InetSocketAddress address = Native.remoteAddress(fd);
InetSocketAddress address = Native.remoteAddress(fd().intValue());
if (address != null) {
remote = address;
}
@ -96,6 +96,7 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
@Override
protected void doBind(SocketAddress local) throws Exception {
InetSocketAddress localAddress = (InetSocketAddress) local;
int fd = fd().intValue();
Native.bind(fd, localAddress);
this.local = Native.localAddress(fd);
}
@ -137,6 +138,7 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
}
checkResolvable((InetSocketAddress) remoteAddress);
if (super.doConnect(remoteAddress, localAddress)) {
int fd = fd().intValue();
local = Native.localAddress(fd);
remote = Native.remoteAddress(fd);
return true;

View File

@ -132,70 +132,70 @@ public final class EpollSocketChannelConfig extends DefaultChannelConfig impleme
@Override
public int getReceiveBufferSize() {
return Native.getReceiveBufferSize(channel.fd);
return Native.getReceiveBufferSize(channel.fd().intValue());
}
@Override
public int getSendBufferSize() {
return Native.getSendBufferSize(channel.fd);
return Native.getSendBufferSize(channel.fd().intValue());
}
@Override
public int getSoLinger() {
return Native.getSoLinger(channel.fd);
return Native.getSoLinger(channel.fd().intValue());
}
@Override
public int getTrafficClass() {
return Native.getTrafficClass(channel.fd);
return Native.getTrafficClass(channel.fd().intValue());
}
@Override
public boolean isKeepAlive() {
return Native.isKeepAlive(channel.fd) == 1;
return Native.isKeepAlive(channel.fd().intValue()) == 1;
}
@Override
public boolean isReuseAddress() {
return Native.isReuseAddress(channel.fd) == 1;
return Native.isReuseAddress(channel.fd().intValue()) == 1;
}
@Override
public boolean isTcpNoDelay() {
return Native.isTcpNoDelay(channel.fd) == 1;
return Native.isTcpNoDelay(channel.fd().intValue()) == 1;
}
/**
* Get the {@code TCP_CORK} option on the socket. See {@code man 7 tcp} for more details.
*/
public boolean isTcpCork() {
return Native.isTcpCork(channel.fd) == 1;
return Native.isTcpCork(channel.fd().intValue()) == 1;
}
/**
* Get the {@code TCP_KEEPIDLE} option on the socket. See {@code man 7 tcp} for more details.
*/
public int getTcpKeepIdle() {
return Native.getTcpKeepIdle(channel.fd);
return Native.getTcpKeepIdle(channel.fd().intValue());
}
/**
* Get the {@code TCP_KEEPINTVL} option on the socket. See {@code man 7 tcp} for more details.
*/
public int getTcpKeepIntvl() {
return Native.getTcpKeepIntvl(channel.fd);
return Native.getTcpKeepIntvl(channel.fd().intValue());
}
/**
* Get the {@code TCP_KEEPCNT} option on the socket. See {@code man 7 tcp} for more details.
*/
public int getTcpKeepCnt() {
return Native.getTcpKeepCnt(channel.fd);
return Native.getTcpKeepCnt(channel.fd().intValue());
}
@Override
public EpollSocketChannelConfig setKeepAlive(boolean keepAlive) {
Native.setKeepAlive(channel.fd, keepAlive ? 1 : 0);
Native.setKeepAlive(channel.fd().intValue(), keepAlive ? 1 : 0);
return this;
}
@ -207,31 +207,31 @@ public final class EpollSocketChannelConfig extends DefaultChannelConfig impleme
@Override
public EpollSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
Native.setReceiveBufferSize(channel.fd, receiveBufferSize);
Native.setReceiveBufferSize(channel.fd().intValue(), receiveBufferSize);
return this;
}
@Override
public EpollSocketChannelConfig setReuseAddress(boolean reuseAddress) {
Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0);
Native.setReuseAddress(channel.fd().intValue(), reuseAddress ? 1 : 0);
return this;
}
@Override
public EpollSocketChannelConfig setSendBufferSize(int sendBufferSize) {
Native.setSendBufferSize(channel.fd, sendBufferSize);
Native.setSendBufferSize(channel.fd().intValue(), sendBufferSize);
return this;
}
@Override
public EpollSocketChannelConfig setSoLinger(int soLinger) {
Native.setSoLinger(channel.fd, soLinger);
Native.setSoLinger(channel.fd().intValue(), soLinger);
return this;
}
@Override
public EpollSocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {
Native.setTcpNoDelay(channel.fd, tcpNoDelay ? 1 : 0);
Native.setTcpNoDelay(channel.fd().intValue(), tcpNoDelay ? 1 : 0);
return this;
}
@ -239,13 +239,13 @@ public final class EpollSocketChannelConfig extends DefaultChannelConfig impleme
* Set the {@code TCP_CORK} option on the socket. See {@code man 7 tcp} for more details.
*/
public EpollSocketChannelConfig setTcpCork(boolean tcpCork) {
Native.setTcpCork(channel.fd, tcpCork ? 1 : 0);
Native.setTcpCork(channel.fd().intValue(), tcpCork ? 1 : 0);
return this;
}
@Override
public EpollSocketChannelConfig setTrafficClass(int trafficClass) {
Native.setTrafficClass(channel.fd, trafficClass);
Native.setTrafficClass(channel.fd().intValue(), trafficClass);
return this;
}
@ -253,7 +253,7 @@ public final class EpollSocketChannelConfig extends DefaultChannelConfig impleme
* Set the {@code TCP_KEEPIDLE} option on the socket. See {@code man 7 tcp} for more details.
*/
public EpollSocketChannelConfig setTcpKeepIdle(int seconds) {
Native.setTcpKeepIdle(channel.fd, seconds);
Native.setTcpKeepIdle(channel.fd().intValue(), seconds);
return this;
}
@ -261,7 +261,7 @@ public final class EpollSocketChannelConfig extends DefaultChannelConfig impleme
* Set the {@code TCP_KEEPINTVL} option on the socket. See {@code man 7 tcp} for more details.
*/
public EpollSocketChannelConfig setTcpKeepIntvl(int seconds) {
Native.setTcpKeepIntvl(channel.fd, seconds);
Native.setTcpKeepIntvl(channel.fd().intValue(), seconds);
return this;
}
@ -269,7 +269,7 @@ public final class EpollSocketChannelConfig extends DefaultChannelConfig impleme
* Set the {@code TCP_KEEPCNT} option on the socket. See {@code man 7 tcp} for more details.
*/
public EpollSocketChannelConfig setTcpKeepCntl(int probes) {
Native.setTcpKeepCnt(channel.fd, probes);
Native.setTcpKeepCnt(channel.fd().intValue(), probes);
return this;
}

View File

@ -511,6 +511,24 @@ final class Native {
public static int recvFd(int fd) throws IOException {
int res = recvFd0(fd);
if (res > 0) {
return res;
}
if (res == 0) {
return -1;
}
if (res == ERRNO_EAGAIN_NEGATIVE || res == ERRNO_EWOULDBLOCK_NEGATIVE) {
// Everything consumed so just return -1 here.
return 0;
}
throw newIOException("recvFd", res);
}
private static native int recvFd0(int fd);
public static int sendFd(int socketFd, int fd) throws IOException {
int res = sendFd0(socketFd, fd);
if (res >= 0) {
return res;
}
@ -518,10 +536,10 @@ final class Native {
// Everything consumed so just return -1 here.
return -1;
}
throw newIOException("recvFd", res);
throw newIOException("sendFd", res);
}
private static native int recvFd0(int fd);
private static native int sendFd0(int socketFd, int fd);
public static void shutdown(int fd, boolean read, boolean write) throws IOException {
int res = shutdown0(fd, read, write);

View File

@ -0,0 +1,101 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.AbstractSocketTest;
import org.junit.Assert;
import org.junit.Test;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class EpollDomainSocketFdTest extends AbstractSocketTest {
@Override
protected SocketAddress newSocketAddress() {
return EpollSocketTestPermutation.newSocketAddress();
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.domainSocket();
}
@Test(timeout = 30000)
public void testSendRecvFd() throws Throwable {
run();
}
public void testSendRecvFd(ServerBootstrap sb, Bootstrap cb) throws Throwable {
final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(1);
sb.childHandler(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Create new channel and obtain a file descriptor from it.
final EpollDomainSocketChannel ch = new EpollDomainSocketChannel();
ctx.writeAndFlush(ch.fd()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
Throwable cause = future.cause();
queue.offer(cause);
}
}
});
}
});
cb.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
EpollFileDescriptor fd = (EpollFileDescriptor) msg;
queue.offer(fd);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
queue.add(cause);
ctx.close();
}
});
cb.option(EpollChannelOption.DOMAIN_SOCKET_READ_MODE,
DomainSocketReadMode.FILE_DESCRIPTORS);
Channel sc = sb.bind().sync().channel();
Channel cc = cb.connect().sync().channel();
Object received = queue.take();
cc.close().sync();
sc.close().sync();
if (received instanceof EpollFileDescriptor) {
Assert.assertNotSame(EpollFileDescriptor.INVALID, received);
((EpollFileDescriptor) received).close();
Assert.assertNull(queue.poll());
} else {
throw (Throwable) received;
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.io.IOException;
public interface FileDescriptor {
/**
* An invalid file descriptor which was closed before.
*/
FileDescriptor INVALID = new FileDescriptor() {
@Override
public int intValue() {
throw new IllegalStateException("invalid file descriptor");
}
@Override
public void close() {
// NOOP
}
};
/**
* Return the int value of the filedescriptor.
*/
int intValue();
/**
* Close the file descriptor.
*/
void close() throws IOException;
}