Add support for splice(...)

Motivation:

Linux supports splice(...) to transfer data from one filedescriptor to another without
pass data through the user-space. This allows to write high-performant proxy code or to stream
stuff from the socket directly the the filesystem.

Modification:

Add AbstractEpollStreamChannel.spliceTo(...) method to support splice(...) system call

Result:

Splice is now supported when using the native linux transport.

Conflicts:

	transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java
This commit is contained in:
Norman Maurer 2015-04-14 06:54:20 +02:00
parent 55fbf007f0
commit 2c2b89d30f
8 changed files with 842 additions and 19 deletions

View File

@ -46,6 +46,7 @@
// optional
extern int accept4(int sockFd, struct sockaddr* addr, socklen_t* addrlen, int flags) __attribute__((weak));
extern int epoll_create1(int flags) __attribute__((weak));
extern int pipe2(int pipefd[2], int flags) __attribute__((weak));
#ifdef IO_NETTY_SENDMMSG_NOT_FOUND
extern int sendmmsg(int sockfd, struct mmsghdr* msgvec, unsigned int vlen, unsigned int flags) __attribute__((weak));
@ -1578,3 +1579,50 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sizeofEpollEvent(JNIEn
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_offsetofEpollData(JNIEnv* env, jclass clazz) {
return offsetof(struct epoll_event, data);
}
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_pipe0(JNIEnv* env, jclass clazz) {
int fd[2];
if (pipe2) {
// we can just use pipe2 and so save extra syscalls;
if (pipe2(fd, O_NONBLOCK) != 0) {
return -errno;
}
} else {
if (pipe(fd) == 0) {
if (fcntl(fd[0], F_SETFD, O_NONBLOCK) < 0) {
int err = errno;
close(fd[0]);
close(fd[1]);
return -err;
}
if (fcntl(fd[1], F_SETFD, O_NONBLOCK) < 0) {
int err = errno;
close(fd[0]);
close(fd[1]);
return -err;
}
} else {
return -errno;
}
}
// encode the fds into a long
return (((long) fd[0]) << 32) | (fd[1] & 0xffffffffL);
}
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_splice0(JNIEnv* env, jclass clazz, jint fd, jint offIn, jint fdOut, jint offOut, jint len) {
ssize_t res;
int err;
loff_t off_in = offIn >= 0 ? (loff_t) offIn : NULL;
loff_t off_out = offOut >= 0 ? (loff_t) offOut : NULL;
do {
res = splice(fd, off_in, fdOut, off_out, (size_t) len, SPLICE_F_NONBLOCK | SPLICE_F_MOVE);
// keep on splicing if it was interrupted
} while (res == -1 && ((err = errno) == EINTR));
if (res < 0) {
return -err;
}
return (jint) res;
}

View File

@ -120,3 +120,6 @@ jint Java_io_netty_channel_epoll_Native_epollrdhup(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_epollet(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_sizeofEpollEvent(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_offsetofEpollData(JNIEnv* env, jclass clazz);
jlong Java_io_netty_channel_epoll_Native_pipe0(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_splice0(JNIEnv* env, jclass clazz, jint fd, jint offIn, jint fdOut, jint offOut, jint len);

View File

@ -16,6 +16,7 @@
#include <jni.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include "io_netty_channel_unix_FileDescriptor.h"
JNIEXPORT int JNICALL Java_io_netty_channel_unix_FileDescriptor_close(JNIEnv* env, jclass clazz, jint fd) {
@ -24,3 +25,16 @@ JNIEXPORT int JNICALL Java_io_netty_channel_unix_FileDescriptor_close(JNIEnv* en
}
return 0;
}
JNIEXPORT int JNICALL Java_io_netty_channel_unix_FileDescriptor_open(JNIEnv* env, jclass clazz, jstring path) {
const char* f_path = (*env)->GetStringUTFChars(env, path, 0);
int res = open(f_path, O_WRONLY | O_CREAT | O_TRUNC, 0666);
(*env)->ReleaseStringUTFChars(env, path, f_path);
if (res < 0) {
return -errno;
}
return res;
}

View File

@ -16,3 +16,4 @@
#include <jni.h>
int Java_io_netty_channel_unix_FileDescriptor_close(JNIEnv* env, jclass clazz, jint fd);
int Java_io_netty_channel_unix_FileDescriptor_open(JNIEnv* env, jclass clazz, jstring path);

View File

@ -28,26 +28,35 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.unix.FileDescriptor;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.MpscLinkedQueueNode;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Queue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
static {
CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
@ -60,10 +69,15 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
private final Queue<SpliceInTask> spliceQueue = PlatformDependent.newMpscQueue();
private volatile boolean inputShutdown;
private volatile boolean outputShutdown;
// Lazy init these if we need to splice(...)
private int pipeIn = -1;
private int pipeOut = -1;
protected AbstractEpollStreamChannel(Channel parent, int fd) {
super(parent, fd, Native.EPOLLIN, true);
// Add EPOLLRDHUP so we are notified once the remote peer close the connection.
@ -85,6 +99,129 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
return new EpollStreamUnsafe();
}
/**
* Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
* The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
* splice until the {@link ChannelFuture} was canceled or it was failed.
*
* Please note:
* <ul>
* <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
* {@link IllegalArgumentException} is thrown. </li>
* <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
* target {@link AbstractEpollStreamChannel}</li>
* </ul>
*
*/
public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
return spliceTo(ch, len, newPromise());
}
/**
* Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
* The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
* splice until the {@link ChannelFuture} was canceled or it was failed.
*
* Please note:
* <ul>
* <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
* {@link IllegalArgumentException} is thrown. </li>
* <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
* target {@link AbstractEpollStreamChannel}</li>
* </ul>
*
*/
public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
final ChannelPromise promise) {
if (ch.eventLoop() != eventLoop()) {
throw new IllegalArgumentException("EventLoops are not the same.");
}
if (len < 0) {
throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
}
if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
|| config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
}
checkNotNull(promise, "promise");
if (!isOpen()) {
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
} else {
SpliceInTask task = new SpliceInChannelTask(ch, len, checkNotNull(promise, "promise"));
spliceQueue.add(task);
failSpliceIfClosed(promise);
}
return promise;
}
/**
* Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
* The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
* number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
* {@link ChannelFuture} was canceled or it was failed.
*
* Please note:
* <ul>
* <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
* {@link AbstractEpollStreamChannel}</li>
* <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
* </ul>
*/
public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
return spliceTo(ch, offset, len, newPromise());
}
/**
* Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
* The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
* number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
* {@link ChannelFuture} was canceled or it was failed.
*
* Please note:
* <ul>
* <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
* {@link AbstractEpollStreamChannel}</li>
* <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
* </ul>
*/
public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
final ChannelPromise promise) {
if (len < 0) {
throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
}
if (offset < 0) {
throw new IllegalArgumentException("offset must be >= 0 but was " + offset);
}
if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
}
checkNotNull(promise, "promise");
if (!isOpen()) {
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
} else {
SpliceInTask task = new SpliceFdTask(ch, offset, len, checkNotNull(promise, "promise"));
spliceQueue.add(task);
failSpliceIfClosed(promise);
}
return promise;
}
private void failSpliceIfClosed(ChannelPromise promise) {
if (!isOpen()) {
// Seems like the Channel was closed in the meantime try to fail the promise to prevent any
// cases where a future may not be notified otherwise.
if (promise.tryFailure(CLOSED_CHANNEL_EXCEPTION)) {
eventLoop().execute(new OneTimeTask() {
@Override
public void run() {
// Call this via the EventLoop as it is a MPSC queue.
clearSpliceQueue();
}
});
}
}
}
/**
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param buf the {@link ByteBuf} from which the bytes should be written
@ -289,6 +426,11 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
// the network stack can handle more writes.
return false;
}
} else if (msg instanceof SpliceOutTask) {
if (!((SpliceOutTask) msg).spliceOut()) {
return false;
}
in.remove();
} else {
// Should never reach here.
throw new Error();
@ -354,7 +496,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
return buf;
}
if (msg instanceof DefaultFileRegion) {
if (msg instanceof DefaultFileRegion || msg instanceof SpliceOutTask) {
return msg;
}
@ -380,6 +522,40 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
}
}
@Override
protected void doClose() throws Exception {
try {
ChannelPromise promise = connectPromise;
if (promise != null) {
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
connectPromise = null;
}
ScheduledFuture<?> future = connectTimeoutFuture;
if (future != null) {
future.cancel(false);
connectTimeoutFuture = null;
}
// Calling super.doClose() first so splceTo(...) will fail on next call.
super.doClose();
} finally {
safeClosePipe(pipeIn);
safeClosePipe(pipeOut);
clearSpliceQueue();
}
}
private void clearSpliceQueue() {
for (;;) {
SpliceInTask task = spliceQueue.poll();
if (task == null) {
break;
}
task.promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
}
}
/**
* Connect to the remote peer
*/
@ -403,21 +579,16 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
}
}
@Override
protected void doClose() throws Exception {
ChannelPromise promise = connectPromise;
if (promise != null) {
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
connectPromise = null;
private void safeClosePipe(int pipe) {
if (pipe != -1) {
try {
Native.close(pipe);
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn("Error while closing a pipe", e);
}
}
}
ScheduledFuture<?> future = connectTimeoutFuture;
if (future != null) {
future.cancel(false);
connectTimeoutFuture = null;
}
super.doClose();
}
class EpollStreamUnsafe extends AbstractEpollUnsafe {
@ -628,6 +799,20 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
int messages = 0;
int totalReadAmount = 0;
do {
SpliceInTask spliceTask = spliceQueue.peek();
if (spliceTask != null) {
if (spliceTask.spliceIn(allocHandle)) {
// We need to check if it is still active as if not we removed all SpliceTasks in
// doClose(...)
if (isActive()) {
spliceQueue.remove();
}
continue;
} else {
break;
}
}
// we use a direct buffer here as the native implementations only be able
// to handle direct buffers.
byteBuf = allocHandle.allocate(allocator);
@ -697,4 +882,206 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
}
}
}
protected abstract class SpliceInTask extends MpscLinkedQueueNode<SpliceInTask> {
final ChannelPromise promise;
int len;
protected SpliceInTask(int len, ChannelPromise promise) {
this.promise = promise;
this.len = len;
}
@Override
public SpliceInTask value() {
return this;
}
abstract boolean spliceIn(RecvByteBufAllocator.Handle handle) throws IOException;
protected final int spliceIn(int pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
// calculate the maximum amount of data we are allowed to splice
int length = Math.min(handle.guess(), len);
int splicedIn = 0;
for (;;) {
// Splicing until there is nothing left to splice.
int localSplicedIn = Native.splice(fd().intValue(), -1, pipeOut, -1, length);
if (localSplicedIn == 0) {
break;
}
splicedIn += localSplicedIn;
length -= localSplicedIn;
}
// record the number of bytes we spliced before
handle.record(splicedIn);
return splicedIn;
}
}
// Let it directly implement channelFutureListener as well to reduce object creation.
private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
private final AbstractEpollStreamChannel ch;
SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
super(len, promise);
this.ch = ch;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
}
}
@Override
public boolean spliceIn(RecvByteBufAllocator.Handle handle) throws IOException {
assert ch.eventLoop().inEventLoop();
if (len == 0) {
promise.setSuccess();
return true;
}
try {
// We create the pipe on the target channel as this will allow us to just handle pending writes
// later in a correct fashion without get into any ordering issues when spliceTo(...) is called
// on multiple Channels pointing to one target Channel.
int pipeOut = ch.pipeOut;
if (pipeOut == -1) {
// Create a new pipe as non was created before.
long fds = Native.pipe();
ch.pipeIn = (int) (fds >> 32);
pipeOut = ch.pipeOut = (int) fds;
}
int splicedIn = spliceIn(pipeOut, handle);
if (splicedIn > 0) {
// Integer.MAX_VALUE is a special value which will result in splice forever.
if (len != Integer.MAX_VALUE) {
len -= splicedIn;
}
// Depending on if we are done with splicing inbound data we set the right promise for the
// outbound splicing.
final ChannelPromise splicePromise;
if (len == 0) {
splicePromise = promise;
} else {
splicePromise = ch.newPromise().addListener(this);
}
boolean autoRead = config().isAutoRead();
// Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
// case.
ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
ch.unsafe().flush();
if (autoRead && !splicePromise.isDone()) {
// Write was not done which means the target channel was not writable. In this case we need to
// disable reading until we are done with splicing to the target channel because:
//
// - The user may want to to trigger another splice operation once the splicing was complete.
config().setAutoRead(false);
}
}
return len == 0;
} catch (Throwable cause) {
promise.setFailure(cause);
return true;
}
}
}
private final class SpliceOutTask {
private final AbstractEpollStreamChannel ch;
private final boolean autoRead;
private int len;
SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
this.ch = ch;
this.len = len;
this.autoRead = autoRead;
}
public boolean spliceOut() throws Exception {
assert ch.eventLoop().inEventLoop();
try {
int splicedOut = Native.splice(ch.pipeIn, -1, ch.fd().intValue(), -1, len);
len -= splicedOut;
if (len == 0) {
if (autoRead) {
// AutoRead was used and we spliced everything so start reading again
config().setAutoRead(true);
}
return true;
}
return false;
} catch (IOException e) {
if (autoRead) {
// AutoRead was used and we spliced everything so start reading again
config().setAutoRead(true);
}
throw e;
}
}
}
private final class SpliceFdTask extends SpliceInTask {
private final FileDescriptor fd;
private final ChannelPromise promise;
private int offset;
SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
super(len, promise);
this.fd = fd;
this.promise = promise;
this.offset = offset;
}
@Override
public SpliceFdTask value() {
return this;
}
@Override
public boolean spliceIn(RecvByteBufAllocator.Handle handle) throws IOException {
assert eventLoop().inEventLoop();
if (len == 0) {
promise.setSuccess();
return true;
}
int pipeIn = -1;
int pipeOut = -1;
try {
long fds = Native.pipe();
pipeIn = (int) (fds >> 32);
pipeOut = (int) fds;
int splicedIn = spliceIn(pipeOut, handle);
if (splicedIn > 0) {
// Integer.MAX_VALUE is a special value which will result in splice forever.
if (len != Integer.MAX_VALUE) {
len -= splicedIn;
}
do {
int splicedOut = Native.splice(pipeIn, -1, fd.intValue(), offset, splicedIn);
splicedIn -= splicedOut;
} while (splicedIn > 0);
if (len == 0) {
promise.setSuccess();
return true;
}
}
return false;
} catch (Throwable cause) {
promise.setFailure(cause);
return true;
} finally {
safeClosePipe(pipeIn);
safeClosePipe(pipeOut);
}
}
}
}

View File

@ -39,7 +39,7 @@ import java.util.Locale;
*
* <strong>Internal usage only!</strong>
*/
final class Native {
public final class Native {
static {
String name = SystemPropertyUtil.get("os.name").toLowerCase(Locale.UK).trim();
@ -89,6 +89,7 @@ final class Native {
private static final IOException CONNECTION_RESET_EXCEPTION_SENDTO;
private static final IOException CONNECTION_RESET_EXCEPTION_SENDMSG;
private static final IOException CONNECTION_RESET_EXCEPTION_SENDMMSG;
private static final IOException CONNECTION_RESET_EXCEPTION_SPLICE;
static {
for (int i = 0; i < ERRORS.length; i++) {
@ -110,6 +111,8 @@ final class Native {
ERRNO_EPIPE_NEGATIVE);
CONNECTION_RESET_EXCEPTION_SENDMMSG = newConnectionResetException("syscall:sendmmsg(...)",
ERRNO_EPIPE_NEGATIVE);
CONNECTION_RESET_EXCEPTION_SPLICE = newConnectionResetException("syscall:splice(...)",
ERRNO_EPIPE_NEGATIVE);
CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
}
@ -120,7 +123,7 @@ final class Native {
return exception;
}
private static IOException newIOException(String method, int err) {
public static IOException newIOException(String method, int err) {
return new IOException(method + "() failed: " + ERRORS[-err]);
}
@ -177,6 +180,26 @@ final class Native {
private static native int close0(int fd);
public static int splice(int fd, int offIn, int fdOut, int offOut, int len) throws IOException {
int res = splice0(fd, offIn, fdOut, offOut, len);
if (res >= 0) {
return res;
}
return ioResult("splice", res, CONNECTION_RESET_EXCEPTION_SPLICE);
}
private static native int splice0(int fd, int offIn, int fdOut, int offOut, int len);
public static long pipe() throws IOException {
long res = pipe0();
if (res >= 0) {
return res;
}
throw newIOException("pipe", (int) res);
}
private static native long pipe0();
public static int write(int fd, ByteBuffer buf, int pos, int limit) throws IOException {
int res = write0(fd, buf, pos, limit);
if (res >= 0) {

View File

@ -15,8 +15,13 @@
*/
package io.netty.channel.unix;
import io.netty.channel.epoll.Native;
import java.io.File;
import java.io.IOException;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/**
* Native {@link FileDescriptor} implementation which allows to wrap an {@code int} and provide a
* {@link FileDescriptor} for it.
@ -80,4 +85,25 @@ public class FileDescriptor {
}
private static native int close(int fd);
/**
* Open a new {@link FileDescriptor} for the given path.
*/
public static FileDescriptor from(String path) throws IOException {
checkNotNull(path, "path");
int res = open(path);
if (res < 0) {
throw Native.newIOException("open", res);
}
return new FileDescriptor(res);
}
/**
* Open a new {@link FileDescriptor} for the given {@link File}.
*/
public static FileDescriptor from(File file) throws IOException {
return from(checkNotNull(file, "file").getPath());
}
private static native int open(String path);
}

View File

@ -0,0 +1,321 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.unix.FileDescriptor;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
public class EpollSpliceTest {
private static final int SPLICE_LEN = 32 * 1024;
private static final Random random = new Random();
private static final byte[] data = new byte[1048576];
static {
random.nextBytes(data);
}
@Test
public void spliceToSocket() throws Throwable {
final EchoHandler sh = new EchoHandler();
final EchoHandler ch = new EchoHandler();
EventLoopGroup group = new EpollEventLoopGroup(1);
ServerBootstrap bs = new ServerBootstrap();
bs.channel(EpollServerSocketChannel.class);
bs.group(group).childHandler(sh);
final Channel sc = bs.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
ServerBootstrap bs2 = new ServerBootstrap();
bs2.channel(EpollServerSocketChannel.class);
bs2.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
bs2.group(group).childHandler(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
ctx.channel().config().setAutoRead(false);
Bootstrap bs = new Bootstrap();
bs.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
bs.channel(EpollSocketChannel.class);
bs.group(ctx.channel().eventLoop()).handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext context) throws Exception {
final EpollSocketChannel ch = (EpollSocketChannel) ctx.channel();
final EpollSocketChannel ch2 = (EpollSocketChannel) context.channel();
// We are splicing two channels together, at this point we have a tcp proxy which handles all
// the data transfer only in kernel space!
// Integer.MAX_VALUE will splice infinitly.
ch.spliceTo(ch2, Integer.MAX_VALUE).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
future.channel().close();
}
}
});
// Trigger multiple splices to see if partial splicing works as well.
ch2.spliceTo(ch, SPLICE_LEN).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
future.channel().close();
} else {
ch2.spliceTo(ch, SPLICE_LEN).addListener(this);
}
}
});
ctx.channel().config().setAutoRead(true);
}
@Override
public void channelInactive(ChannelHandlerContext context) throws Exception {
context.close();
}
});
bs.connect(sc.localAddress()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
ctx.close();
} else {
future.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
ctx.close();
}
});
}
}
});
}
});
Channel pc = bs2.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
Bootstrap cb = new Bootstrap();
cb.group(group);
cb.channel(EpollSocketChannel.class);
cb.handler(ch);
Channel cc = cb.connect(pc.localAddress()).syncUninterruptibly().channel();
for (int i = 0; i < data.length;) {
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
cc.writeAndFlush(buf);
i += length;
}
while (ch.counter < data.length) {
if (sh.exception.get() != null) {
break;
}
if (ch.exception.get() != null) {
break;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore.
}
}
while (sh.counter < data.length) {
if (sh.exception.get() != null) {
break;
}
if (ch.exception.get() != null) {
break;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore.
}
}
sh.channel.close().sync();
ch.channel.close().sync();
sc.close().sync();
pc.close().sync();
group.shutdownGracefully();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();
}
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
throw ch.exception.get();
}
if (sh.exception.get() != null) {
throw sh.exception.get();
}
if (ch.exception.get() != null) {
throw ch.exception.get();
}
}
@Test
public void spliceToFile() throws Throwable {
EventLoopGroup group = new EpollEventLoopGroup(1);
File file = File.createTempFile("netty-splice", null);
file.deleteOnExit();
SpliceHandler sh = new SpliceHandler(file);
ServerBootstrap bs = new ServerBootstrap();
bs.channel(EpollServerSocketChannel.class);
bs.group(group).childHandler(sh);
bs.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
Channel sc = bs.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
Bootstrap cb = new Bootstrap();
cb.group(group);
cb.channel(EpollSocketChannel.class);
cb.handler(new ChannelInboundHandlerAdapter());
Channel cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel();
for (int i = 0; i < data.length;) {
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
cc.writeAndFlush(buf);
i += length;
}
while (sh.future == null || !sh.future.isDone()) {
if (sh.exception.get() != null) {
break;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore.
}
}
sc.close().sync();
cc.close().sync();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();
}
byte[] written = new byte[data.length];
FileInputStream in = new FileInputStream(file);
try {
Assert.assertEquals(written.length, in.read(written));
Assert.assertArrayEquals(data, written);
} finally {
in.close();
group.shutdownGracefully();
}
}
private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter;
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
channel = ctx.channel();
}
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
byte[] actual = new byte[in.readableBytes()];
in.readBytes(actual);
int lastIdx = counter;
for (int i = 0; i < actual.length; i ++) {
assertEquals(data[i + lastIdx], actual[i]);
}
if (channel.parent() != null) {
channel.write(Unpooled.wrappedBuffer(actual));
}
counter += actual.length;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
if (exception.compareAndSet(null, cause)) {
cause.printStackTrace();
ctx.close();
}
}
}
private static class SpliceHandler extends ChannelInboundHandlerAdapter {
private final File file;
volatile Channel channel;
volatile ChannelFuture future;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
public SpliceHandler(File file) {
this.file = file;
}
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
channel = ctx.channel();
final EpollSocketChannel ch = (EpollSocketChannel) ctx.channel();
final FileDescriptor fd = FileDescriptor.from(file);
future = ch.spliceTo(fd, 0, data.length);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
if (exception.compareAndSet(null, cause)) {
cause.printStackTrace();
ctx.close();
}
}
}
}