Add support for splice(...)
Motivation: Linux supports splice(...) to transfer data from one filedescriptor to another without pass data through the user-space. This allows to write high-performant proxy code or to stream stuff from the socket directly the the filesystem. Modification: Add AbstractEpollStreamChannel.spliceTo(...) method to support splice(...) system call Result: Splice is now supported when using the native linux transport. Conflicts: transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java
This commit is contained in:
parent
694eab76eb
commit
ca250c8a4b
@ -46,6 +46,7 @@
|
|||||||
// optional
|
// optional
|
||||||
extern int accept4(int sockFd, struct sockaddr* addr, socklen_t* addrlen, int flags) __attribute__((weak));
|
extern int accept4(int sockFd, struct sockaddr* addr, socklen_t* addrlen, int flags) __attribute__((weak));
|
||||||
extern int epoll_create1(int flags) __attribute__((weak));
|
extern int epoll_create1(int flags) __attribute__((weak));
|
||||||
|
extern int pipe2(int pipefd[2], int flags) __attribute__((weak));
|
||||||
|
|
||||||
#ifdef IO_NETTY_SENDMMSG_NOT_FOUND
|
#ifdef IO_NETTY_SENDMMSG_NOT_FOUND
|
||||||
extern int sendmmsg(int sockfd, struct mmsghdr* msgvec, unsigned int vlen, unsigned int flags) __attribute__((weak));
|
extern int sendmmsg(int sockfd, struct mmsghdr* msgvec, unsigned int vlen, unsigned int flags) __attribute__((weak));
|
||||||
@ -1578,3 +1579,50 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_sizeofEpollEvent(JNIEn
|
|||||||
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_offsetofEpollData(JNIEnv* env, jclass clazz) {
|
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_offsetofEpollData(JNIEnv* env, jclass clazz) {
|
||||||
return offsetof(struct epoll_event, data);
|
return offsetof(struct epoll_event, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_pipe0(JNIEnv* env, jclass clazz) {
|
||||||
|
int fd[2];
|
||||||
|
if (pipe2) {
|
||||||
|
// we can just use pipe2 and so save extra syscalls;
|
||||||
|
if (pipe2(fd, O_NONBLOCK) != 0) {
|
||||||
|
return -errno;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pipe(fd) == 0) {
|
||||||
|
if (fcntl(fd[0], F_SETFD, O_NONBLOCK) < 0) {
|
||||||
|
int err = errno;
|
||||||
|
close(fd[0]);
|
||||||
|
close(fd[1]);
|
||||||
|
return -err;
|
||||||
|
}
|
||||||
|
if (fcntl(fd[1], F_SETFD, O_NONBLOCK) < 0) {
|
||||||
|
int err = errno;
|
||||||
|
close(fd[0]);
|
||||||
|
close(fd[1]);
|
||||||
|
return -err;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return -errno;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// encode the fds into a long
|
||||||
|
return (((long) fd[0]) << 32) | (fd[1] & 0xffffffffL);
|
||||||
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_splice0(JNIEnv* env, jclass clazz, jint fd, jint offIn, jint fdOut, jint offOut, jint len) {
|
||||||
|
ssize_t res;
|
||||||
|
int err;
|
||||||
|
loff_t off_in = offIn >= 0 ? (loff_t) offIn : NULL;
|
||||||
|
loff_t off_out = offOut >= 0 ? (loff_t) offOut : NULL;
|
||||||
|
|
||||||
|
do {
|
||||||
|
res = splice(fd, off_in, fdOut, off_out, (size_t) len, SPLICE_F_NONBLOCK | SPLICE_F_MOVE);
|
||||||
|
// keep on splicing if it was interrupted
|
||||||
|
} while (res == -1 && ((err = errno) == EINTR));
|
||||||
|
|
||||||
|
if (res < 0) {
|
||||||
|
return -err;
|
||||||
|
}
|
||||||
|
return (jint) res;
|
||||||
|
}
|
@ -120,3 +120,6 @@ jint Java_io_netty_channel_epoll_Native_epollrdhup(JNIEnv* env, jclass clazz);
|
|||||||
jint Java_io_netty_channel_epoll_Native_epollet(JNIEnv* env, jclass clazz);
|
jint Java_io_netty_channel_epoll_Native_epollet(JNIEnv* env, jclass clazz);
|
||||||
jint Java_io_netty_channel_epoll_Native_sizeofEpollEvent(JNIEnv* env, jclass clazz);
|
jint Java_io_netty_channel_epoll_Native_sizeofEpollEvent(JNIEnv* env, jclass clazz);
|
||||||
jint Java_io_netty_channel_epoll_Native_offsetofEpollData(JNIEnv* env, jclass clazz);
|
jint Java_io_netty_channel_epoll_Native_offsetofEpollData(JNIEnv* env, jclass clazz);
|
||||||
|
|
||||||
|
jlong Java_io_netty_channel_epoll_Native_pipe0(JNIEnv* env, jclass clazz);
|
||||||
|
jint Java_io_netty_channel_epoll_Native_splice0(JNIEnv* env, jclass clazz, jint fd, jint offIn, jint fdOut, jint offOut, jint len);
|
@ -16,6 +16,7 @@
|
|||||||
#include <jni.h>
|
#include <jni.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
|
#include <fcntl.h>
|
||||||
#include "io_netty_channel_unix_FileDescriptor.h"
|
#include "io_netty_channel_unix_FileDescriptor.h"
|
||||||
|
|
||||||
JNIEXPORT int JNICALL Java_io_netty_channel_unix_FileDescriptor_close(JNIEnv* env, jclass clazz, jint fd) {
|
JNIEXPORT int JNICALL Java_io_netty_channel_unix_FileDescriptor_close(JNIEnv* env, jclass clazz, jint fd) {
|
||||||
@ -24,3 +25,16 @@ JNIEXPORT int JNICALL Java_io_netty_channel_unix_FileDescriptor_close(JNIEnv* en
|
|||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JNIEXPORT int JNICALL Java_io_netty_channel_unix_FileDescriptor_open(JNIEnv* env, jclass clazz, jstring path) {
|
||||||
|
|
||||||
|
const char* f_path = (*env)->GetStringUTFChars(env, path, 0);
|
||||||
|
|
||||||
|
int res = open(f_path, O_WRONLY | O_CREAT | O_TRUNC, 0666);
|
||||||
|
(*env)->ReleaseStringUTFChars(env, path, f_path);
|
||||||
|
|
||||||
|
if (res < 0) {
|
||||||
|
return -errno;
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
@ -15,4 +15,5 @@
|
|||||||
*/
|
*/
|
||||||
#include <jni.h>
|
#include <jni.h>
|
||||||
|
|
||||||
int Java_io_netty_channel_unix_FileDescriptor_close(JNIEnv* env, jclass clazz, jint fd);
|
int Java_io_netty_channel_unix_FileDescriptor_close(JNIEnv* env, jclass clazz, jint fd);
|
||||||
|
int Java_io_netty_channel_unix_FileDescriptor_open(JNIEnv* env, jclass clazz, jstring path);
|
||||||
|
@ -28,26 +28,35 @@ import io.netty.channel.ChannelPipeline;
|
|||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.ConnectTimeoutException;
|
import io.netty.channel.ConnectTimeoutException;
|
||||||
import io.netty.channel.DefaultFileRegion;
|
import io.netty.channel.DefaultFileRegion;
|
||||||
|
import io.netty.channel.EventLoop;
|
||||||
import io.netty.channel.RecvByteBufAllocator;
|
import io.netty.channel.RecvByteBufAllocator;
|
||||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||||
import io.netty.channel.unix.FileDescriptor;
|
import io.netty.channel.unix.FileDescriptor;
|
||||||
import io.netty.util.internal.EmptyArrays;
|
import io.netty.util.internal.EmptyArrays;
|
||||||
|
import io.netty.util.internal.MpscLinkedQueueNode;
|
||||||
|
import io.netty.util.internal.OneTimeTask;
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
|
import java.util.Queue;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||||
|
|
||||||
public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||||
|
|
||||||
private static final String EXPECTED_TYPES =
|
private static final String EXPECTED_TYPES =
|
||||||
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
|
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
|
||||||
StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
|
StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
|
||||||
private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
|
||||||
|
static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
|
||||||
|
|
||||||
static {
|
static {
|
||||||
CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
|
CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
|
||||||
@ -60,10 +69,15 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
private ChannelPromise connectPromise;
|
private ChannelPromise connectPromise;
|
||||||
private ScheduledFuture<?> connectTimeoutFuture;
|
private ScheduledFuture<?> connectTimeoutFuture;
|
||||||
private SocketAddress requestedRemoteAddress;
|
private SocketAddress requestedRemoteAddress;
|
||||||
|
private final Queue<SpliceInTask> spliceQueue = PlatformDependent.newMpscQueue();
|
||||||
|
|
||||||
private volatile boolean inputShutdown;
|
private volatile boolean inputShutdown;
|
||||||
private volatile boolean outputShutdown;
|
private volatile boolean outputShutdown;
|
||||||
|
|
||||||
|
// Lazy init these if we need to splice(...)
|
||||||
|
private int pipeIn = -1;
|
||||||
|
private int pipeOut = -1;
|
||||||
|
|
||||||
protected AbstractEpollStreamChannel(Channel parent, int fd) {
|
protected AbstractEpollStreamChannel(Channel parent, int fd) {
|
||||||
super(parent, fd, Native.EPOLLIN, true);
|
super(parent, fd, Native.EPOLLIN, true);
|
||||||
// Add EPOLLRDHUP so we are notified once the remote peer close the connection.
|
// Add EPOLLRDHUP so we are notified once the remote peer close the connection.
|
||||||
@ -85,6 +99,129 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
return new EpollStreamUnsafe();
|
return new EpollStreamUnsafe();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
|
||||||
|
* The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
|
||||||
|
* splice until the {@link ChannelFuture} was canceled or it was failed.
|
||||||
|
*
|
||||||
|
* Please note:
|
||||||
|
* <ul>
|
||||||
|
* <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
|
||||||
|
* {@link IllegalArgumentException} is thrown. </li>
|
||||||
|
* <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
|
||||||
|
* target {@link AbstractEpollStreamChannel}</li>
|
||||||
|
* </ul>
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
|
||||||
|
return spliceTo(ch, len, newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
|
||||||
|
* The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
|
||||||
|
* splice until the {@link ChannelFuture} was canceled or it was failed.
|
||||||
|
*
|
||||||
|
* Please note:
|
||||||
|
* <ul>
|
||||||
|
* <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
|
||||||
|
* {@link IllegalArgumentException} is thrown. </li>
|
||||||
|
* <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
|
||||||
|
* target {@link AbstractEpollStreamChannel}</li>
|
||||||
|
* </ul>
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
|
||||||
|
final ChannelPromise promise) {
|
||||||
|
if (ch.eventLoop().unwrap() != eventLoop().unwrap()) {
|
||||||
|
throw new IllegalArgumentException("EventLoops are not the same.");
|
||||||
|
}
|
||||||
|
if (len < 0) {
|
||||||
|
throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
|
||||||
|
}
|
||||||
|
if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
|
||||||
|
|| config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
|
||||||
|
throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
|
||||||
|
}
|
||||||
|
checkNotNull(promise, "promise");
|
||||||
|
if (!isOpen()) {
|
||||||
|
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
|
||||||
|
} else {
|
||||||
|
SpliceInTask task = new SpliceInChannelTask(ch, len, checkNotNull(promise, "promise"));
|
||||||
|
spliceQueue.add(task);
|
||||||
|
failSpliceIfClosed(promise);
|
||||||
|
}
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
|
||||||
|
* The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
|
||||||
|
* number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
|
||||||
|
* {@link ChannelFuture} was canceled or it was failed.
|
||||||
|
*
|
||||||
|
* Please note:
|
||||||
|
* <ul>
|
||||||
|
* <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
|
||||||
|
* {@link AbstractEpollStreamChannel}</li>
|
||||||
|
* <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
|
||||||
|
return spliceTo(ch, offset, len, newPromise());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
|
||||||
|
* The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
|
||||||
|
* number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
|
||||||
|
* {@link ChannelFuture} was canceled or it was failed.
|
||||||
|
*
|
||||||
|
* Please note:
|
||||||
|
* <ul>
|
||||||
|
* <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
|
||||||
|
* {@link AbstractEpollStreamChannel}</li>
|
||||||
|
* <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
|
||||||
|
final ChannelPromise promise) {
|
||||||
|
if (len < 0) {
|
||||||
|
throw new IllegalArgumentException("len: " + len + " (expected: >= 0)");
|
||||||
|
}
|
||||||
|
if (offset < 0) {
|
||||||
|
throw new IllegalArgumentException("offset must be >= 0 but was " + offset);
|
||||||
|
}
|
||||||
|
if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
|
||||||
|
throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
|
||||||
|
}
|
||||||
|
checkNotNull(promise, "promise");
|
||||||
|
if (!isOpen()) {
|
||||||
|
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
|
||||||
|
} else {
|
||||||
|
SpliceInTask task = new SpliceFdTask(ch, offset, len, checkNotNull(promise, "promise"));
|
||||||
|
spliceQueue.add(task);
|
||||||
|
failSpliceIfClosed(promise);
|
||||||
|
}
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void failSpliceIfClosed(ChannelPromise promise) {
|
||||||
|
if (!isOpen()) {
|
||||||
|
// Seems like the Channel was closed in the meantime try to fail the promise to prevent any
|
||||||
|
// cases where a future may not be notified otherwise.
|
||||||
|
if (promise.tryFailure(CLOSED_CHANNEL_EXCEPTION)) {
|
||||||
|
eventLoop().execute(new OneTimeTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
// Call this via the EventLoop as it is a MPSC queue.
|
||||||
|
clearSpliceQueue();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
|
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
|
||||||
* @param buf the {@link ByteBuf} from which the bytes should be written
|
* @param buf the {@link ByteBuf} from which the bytes should be written
|
||||||
@ -289,6 +426,11 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
// the network stack can handle more writes.
|
// the network stack can handle more writes.
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
} else if (msg instanceof SpliceOutTask) {
|
||||||
|
if (!((SpliceOutTask) msg).spliceOut()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
in.remove();
|
||||||
} else {
|
} else {
|
||||||
// Should never reach here.
|
// Should never reach here.
|
||||||
throw new Error();
|
throw new Error();
|
||||||
@ -354,7 +496,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msg instanceof DefaultFileRegion) {
|
if (msg instanceof DefaultFileRegion || msg instanceof SpliceOutTask) {
|
||||||
return msg;
|
return msg;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -380,6 +522,40 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doClose() throws Exception {
|
||||||
|
try {
|
||||||
|
ChannelPromise promise = connectPromise;
|
||||||
|
if (promise != null) {
|
||||||
|
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
|
||||||
|
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
|
||||||
|
connectPromise = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
ScheduledFuture<?> future = connectTimeoutFuture;
|
||||||
|
if (future != null) {
|
||||||
|
future.cancel(false);
|
||||||
|
connectTimeoutFuture = null;
|
||||||
|
}
|
||||||
|
// Calling super.doClose() first so splceTo(...) will fail on next call.
|
||||||
|
super.doClose();
|
||||||
|
} finally {
|
||||||
|
safeClosePipe(pipeIn);
|
||||||
|
safeClosePipe(pipeOut);
|
||||||
|
clearSpliceQueue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void clearSpliceQueue() {
|
||||||
|
for (;;) {
|
||||||
|
SpliceInTask task = spliceQueue.poll();
|
||||||
|
if (task == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
task.promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connect to the remote peer
|
* Connect to the remote peer
|
||||||
*/
|
*/
|
||||||
@ -403,21 +579,16 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private void safeClosePipe(int pipe) {
|
||||||
protected void doClose() throws Exception {
|
if (pipe != -1) {
|
||||||
ChannelPromise promise = connectPromise;
|
try {
|
||||||
if (promise != null) {
|
Native.close(pipe);
|
||||||
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
|
} catch (IOException e) {
|
||||||
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
|
if (logger.isWarnEnabled()) {
|
||||||
connectPromise = null;
|
logger.warn("Error while closing a pipe", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ScheduledFuture<?> future = connectTimeoutFuture;
|
|
||||||
if (future != null) {
|
|
||||||
future.cancel(false);
|
|
||||||
connectTimeoutFuture = null;
|
|
||||||
}
|
|
||||||
super.doClose();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class EpollStreamUnsafe extends AbstractEpollUnsafe {
|
class EpollStreamUnsafe extends AbstractEpollUnsafe {
|
||||||
@ -628,6 +799,20 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
int messages = 0;
|
int messages = 0;
|
||||||
int totalReadAmount = 0;
|
int totalReadAmount = 0;
|
||||||
do {
|
do {
|
||||||
|
SpliceInTask spliceTask = spliceQueue.peek();
|
||||||
|
if (spliceTask != null) {
|
||||||
|
if (spliceTask.spliceIn(allocHandle)) {
|
||||||
|
// We need to check if it is still active as if not we removed all SpliceTasks in
|
||||||
|
// doClose(...)
|
||||||
|
if (isActive()) {
|
||||||
|
spliceQueue.remove();
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// we use a direct buffer here as the native implementations only be able
|
// we use a direct buffer here as the native implementations only be able
|
||||||
// to handle direct buffers.
|
// to handle direct buffers.
|
||||||
byteBuf = allocHandle.allocate(allocator);
|
byteBuf = allocHandle.allocate(allocator);
|
||||||
@ -697,4 +882,206 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected abstract class SpliceInTask extends MpscLinkedQueueNode<SpliceInTask> {
|
||||||
|
final ChannelPromise promise;
|
||||||
|
int len;
|
||||||
|
|
||||||
|
protected SpliceInTask(int len, ChannelPromise promise) {
|
||||||
|
this.promise = promise;
|
||||||
|
this.len = len;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SpliceInTask value() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract boolean spliceIn(RecvByteBufAllocator.Handle handle) throws IOException;
|
||||||
|
|
||||||
|
protected final int spliceIn(int pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
|
||||||
|
// calculate the maximum amount of data we are allowed to splice
|
||||||
|
int length = Math.min(handle.guess(), len);
|
||||||
|
int splicedIn = 0;
|
||||||
|
for (;;) {
|
||||||
|
// Splicing until there is nothing left to splice.
|
||||||
|
int localSplicedIn = Native.splice(fd().intValue(), -1, pipeOut, -1, length);
|
||||||
|
if (localSplicedIn == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
splicedIn += localSplicedIn;
|
||||||
|
length -= localSplicedIn;
|
||||||
|
}
|
||||||
|
|
||||||
|
// record the number of bytes we spliced before
|
||||||
|
handle.record(splicedIn);
|
||||||
|
return splicedIn;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let it directly implement channelFutureListener as well to reduce object creation.
|
||||||
|
private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
|
||||||
|
private final AbstractEpollStreamChannel ch;
|
||||||
|
|
||||||
|
SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
|
||||||
|
super(len, promise);
|
||||||
|
this.ch = ch;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
promise.setFailure(future.cause());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean spliceIn(RecvByteBufAllocator.Handle handle) throws IOException {
|
||||||
|
assert ch.eventLoop().inEventLoop();
|
||||||
|
if (len == 0) {
|
||||||
|
promise.setSuccess();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
// We create the pipe on the target channel as this will allow us to just handle pending writes
|
||||||
|
// later in a correct fashion without get into any ordering issues when spliceTo(...) is called
|
||||||
|
// on multiple Channels pointing to one target Channel.
|
||||||
|
int pipeOut = ch.pipeOut;
|
||||||
|
if (pipeOut == -1) {
|
||||||
|
// Create a new pipe as non was created before.
|
||||||
|
long fds = Native.pipe();
|
||||||
|
ch.pipeIn = (int) (fds >> 32);
|
||||||
|
pipeOut = ch.pipeOut = (int) fds;
|
||||||
|
}
|
||||||
|
|
||||||
|
int splicedIn = spliceIn(pipeOut, handle);
|
||||||
|
if (splicedIn > 0) {
|
||||||
|
// Integer.MAX_VALUE is a special value which will result in splice forever.
|
||||||
|
if (len != Integer.MAX_VALUE) {
|
||||||
|
len -= splicedIn;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Depending on if we are done with splicing inbound data we set the right promise for the
|
||||||
|
// outbound splicing.
|
||||||
|
final ChannelPromise splicePromise;
|
||||||
|
if (len == 0) {
|
||||||
|
splicePromise = promise;
|
||||||
|
} else {
|
||||||
|
splicePromise = ch.newPromise().addListener(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean autoRead = config().isAutoRead();
|
||||||
|
|
||||||
|
// Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
|
||||||
|
// case.
|
||||||
|
ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
|
||||||
|
ch.unsafe().flush();
|
||||||
|
if (autoRead && !splicePromise.isDone()) {
|
||||||
|
// Write was not done which means the target channel was not writable. In this case we need to
|
||||||
|
// disable reading until we are done with splicing to the target channel because:
|
||||||
|
//
|
||||||
|
// - The user may want to to trigger another splice operation once the splicing was complete.
|
||||||
|
config().setAutoRead(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return len == 0;
|
||||||
|
} catch (Throwable cause) {
|
||||||
|
promise.setFailure(cause);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class SpliceOutTask {
|
||||||
|
private final AbstractEpollStreamChannel ch;
|
||||||
|
private final boolean autoRead;
|
||||||
|
private int len;
|
||||||
|
|
||||||
|
SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
|
||||||
|
this.ch = ch;
|
||||||
|
this.len = len;
|
||||||
|
this.autoRead = autoRead;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean spliceOut() throws Exception {
|
||||||
|
assert ch.eventLoop().inEventLoop();
|
||||||
|
try {
|
||||||
|
int splicedOut = Native.splice(ch.pipeIn, -1, ch.fd().intValue(), -1, len);
|
||||||
|
len -= splicedOut;
|
||||||
|
if (len == 0) {
|
||||||
|
if (autoRead) {
|
||||||
|
// AutoRead was used and we spliced everything so start reading again
|
||||||
|
config().setAutoRead(true);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
} catch (IOException e) {
|
||||||
|
if (autoRead) {
|
||||||
|
// AutoRead was used and we spliced everything so start reading again
|
||||||
|
config().setAutoRead(true);
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class SpliceFdTask extends SpliceInTask {
|
||||||
|
private final FileDescriptor fd;
|
||||||
|
private final ChannelPromise promise;
|
||||||
|
private int offset;
|
||||||
|
|
||||||
|
SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
|
||||||
|
super(len, promise);
|
||||||
|
this.fd = fd;
|
||||||
|
this.promise = promise;
|
||||||
|
this.offset = offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SpliceFdTask value() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean spliceIn(RecvByteBufAllocator.Handle handle) throws IOException {
|
||||||
|
assert eventLoop().inEventLoop();
|
||||||
|
if (len == 0) {
|
||||||
|
promise.setSuccess();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int pipeIn = -1;
|
||||||
|
int pipeOut = -1;
|
||||||
|
try {
|
||||||
|
long fds = Native.pipe();
|
||||||
|
pipeIn = (int) (fds >> 32);
|
||||||
|
pipeOut = (int) fds;
|
||||||
|
|
||||||
|
int splicedIn = spliceIn(pipeOut, handle);
|
||||||
|
if (splicedIn > 0) {
|
||||||
|
// Integer.MAX_VALUE is a special value which will result in splice forever.
|
||||||
|
if (len != Integer.MAX_VALUE) {
|
||||||
|
len -= splicedIn;
|
||||||
|
}
|
||||||
|
do {
|
||||||
|
int splicedOut = Native.splice(pipeIn, -1, fd.intValue(), offset, splicedIn);
|
||||||
|
splicedIn -= splicedOut;
|
||||||
|
} while (splicedIn > 0);
|
||||||
|
if (len == 0) {
|
||||||
|
promise.setSuccess();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
} catch (Throwable cause) {
|
||||||
|
promise.setFailure(cause);
|
||||||
|
return true;
|
||||||
|
} finally {
|
||||||
|
safeClosePipe(pipeIn);
|
||||||
|
safeClosePipe(pipeOut);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,7 @@ import java.util.Locale;
|
|||||||
*
|
*
|
||||||
* <strong>Internal usage only!</strong>
|
* <strong>Internal usage only!</strong>
|
||||||
*/
|
*/
|
||||||
final class Native {
|
public final class Native {
|
||||||
|
|
||||||
static {
|
static {
|
||||||
String name = SystemPropertyUtil.get("os.name").toLowerCase(Locale.UK).trim();
|
String name = SystemPropertyUtil.get("os.name").toLowerCase(Locale.UK).trim();
|
||||||
@ -89,6 +89,7 @@ final class Native {
|
|||||||
private static final IOException CONNECTION_RESET_EXCEPTION_SENDTO;
|
private static final IOException CONNECTION_RESET_EXCEPTION_SENDTO;
|
||||||
private static final IOException CONNECTION_RESET_EXCEPTION_SENDMSG;
|
private static final IOException CONNECTION_RESET_EXCEPTION_SENDMSG;
|
||||||
private static final IOException CONNECTION_RESET_EXCEPTION_SENDMMSG;
|
private static final IOException CONNECTION_RESET_EXCEPTION_SENDMMSG;
|
||||||
|
private static final IOException CONNECTION_RESET_EXCEPTION_SPLICE;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
for (int i = 0; i < ERRORS.length; i++) {
|
for (int i = 0; i < ERRORS.length; i++) {
|
||||||
@ -110,6 +111,8 @@ final class Native {
|
|||||||
ERRNO_EPIPE_NEGATIVE);
|
ERRNO_EPIPE_NEGATIVE);
|
||||||
CONNECTION_RESET_EXCEPTION_SENDMMSG = newConnectionResetException("syscall:sendmmsg(...)",
|
CONNECTION_RESET_EXCEPTION_SENDMMSG = newConnectionResetException("syscall:sendmmsg(...)",
|
||||||
ERRNO_EPIPE_NEGATIVE);
|
ERRNO_EPIPE_NEGATIVE);
|
||||||
|
CONNECTION_RESET_EXCEPTION_SPLICE = newConnectionResetException("syscall:splice(...)",
|
||||||
|
ERRNO_EPIPE_NEGATIVE);
|
||||||
CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
|
CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
|
||||||
CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
|
CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
|
||||||
}
|
}
|
||||||
@ -120,7 +123,7 @@ final class Native {
|
|||||||
return exception;
|
return exception;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static IOException newIOException(String method, int err) {
|
public static IOException newIOException(String method, int err) {
|
||||||
return new IOException(method + "() failed: " + ERRORS[-err]);
|
return new IOException(method + "() failed: " + ERRORS[-err]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -177,6 +180,26 @@ final class Native {
|
|||||||
|
|
||||||
private static native int close0(int fd);
|
private static native int close0(int fd);
|
||||||
|
|
||||||
|
public static int splice(int fd, int offIn, int fdOut, int offOut, int len) throws IOException {
|
||||||
|
int res = splice0(fd, offIn, fdOut, offOut, len);
|
||||||
|
if (res >= 0) {
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
return ioResult("splice", res, CONNECTION_RESET_EXCEPTION_SPLICE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static native int splice0(int fd, int offIn, int fdOut, int offOut, int len);
|
||||||
|
|
||||||
|
public static long pipe() throws IOException {
|
||||||
|
long res = pipe0();
|
||||||
|
if (res >= 0) {
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
throw newIOException("pipe", (int) res);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static native long pipe0();
|
||||||
|
|
||||||
public static int write(int fd, ByteBuffer buf, int pos, int limit) throws IOException {
|
public static int write(int fd, ByteBuffer buf, int pos, int limit) throws IOException {
|
||||||
int res = write0(fd, buf, pos, limit);
|
int res = write0(fd, buf, pos, limit);
|
||||||
if (res >= 0) {
|
if (res >= 0) {
|
||||||
|
@ -15,8 +15,13 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.channel.unix;
|
package io.netty.channel.unix;
|
||||||
|
|
||||||
|
import io.netty.channel.epoll.Native;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Native {@link FileDescriptor} implementation which allows to wrap an {@code int} and provide a
|
* Native {@link FileDescriptor} implementation which allows to wrap an {@code int} and provide a
|
||||||
* {@link FileDescriptor} for it.
|
* {@link FileDescriptor} for it.
|
||||||
@ -80,4 +85,25 @@ public class FileDescriptor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static native int close(int fd);
|
private static native int close(int fd);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Open a new {@link FileDescriptor} for the given path.
|
||||||
|
*/
|
||||||
|
public static FileDescriptor from(String path) throws IOException {
|
||||||
|
checkNotNull(path, "path");
|
||||||
|
int res = open(path);
|
||||||
|
if (res < 0) {
|
||||||
|
throw Native.newIOException("open", res);
|
||||||
|
}
|
||||||
|
return new FileDescriptor(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Open a new {@link FileDescriptor} for the given {@link File}.
|
||||||
|
*/
|
||||||
|
public static FileDescriptor from(File file) throws IOException {
|
||||||
|
return from(checkNotNull(file, "file").getPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static native int open(String path);
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,321 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2015 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.channel.epoll;
|
||||||
|
|
||||||
|
import io.netty.bootstrap.Bootstrap;
|
||||||
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelFutureListener;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
|
import io.netty.channel.EventLoopGroup;
|
||||||
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
|
import io.netty.channel.unix.FileDescriptor;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
public class EpollSpliceTest {
|
||||||
|
|
||||||
|
private static final int SPLICE_LEN = 32 * 1024;
|
||||||
|
private static final Random random = new Random();
|
||||||
|
private static final byte[] data = new byte[1048576];
|
||||||
|
|
||||||
|
static {
|
||||||
|
random.nextBytes(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void spliceToSocket() throws Throwable {
|
||||||
|
final EchoHandler sh = new EchoHandler();
|
||||||
|
final EchoHandler ch = new EchoHandler();
|
||||||
|
|
||||||
|
EventLoopGroup group = new EpollEventLoopGroup(1);
|
||||||
|
ServerBootstrap bs = new ServerBootstrap();
|
||||||
|
bs.channel(EpollServerSocketChannel.class);
|
||||||
|
bs.group(group).childHandler(sh);
|
||||||
|
final Channel sc = bs.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
|
||||||
|
|
||||||
|
ServerBootstrap bs2 = new ServerBootstrap();
|
||||||
|
bs2.channel(EpollServerSocketChannel.class);
|
||||||
|
bs2.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
|
||||||
|
bs2.group(group).childHandler(new ChannelInboundHandlerAdapter() {
|
||||||
|
@Override
|
||||||
|
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
|
||||||
|
ctx.channel().config().setAutoRead(false);
|
||||||
|
Bootstrap bs = new Bootstrap();
|
||||||
|
bs.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
|
||||||
|
|
||||||
|
bs.channel(EpollSocketChannel.class);
|
||||||
|
bs.group(ctx.channel().eventLoop()).handler(new ChannelInboundHandlerAdapter() {
|
||||||
|
@Override
|
||||||
|
public void channelActive(ChannelHandlerContext context) throws Exception {
|
||||||
|
final EpollSocketChannel ch = (EpollSocketChannel) ctx.channel();
|
||||||
|
final EpollSocketChannel ch2 = (EpollSocketChannel) context.channel();
|
||||||
|
// We are splicing two channels together, at this point we have a tcp proxy which handles all
|
||||||
|
// the data transfer only in kernel space!
|
||||||
|
|
||||||
|
// Integer.MAX_VALUE will splice infinitly.
|
||||||
|
ch.spliceTo(ch2, Integer.MAX_VALUE).addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
future.channel().close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Trigger multiple splices to see if partial splicing works as well.
|
||||||
|
ch2.spliceTo(ch, SPLICE_LEN).addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
future.channel().close();
|
||||||
|
} else {
|
||||||
|
ch2.spliceTo(ch, SPLICE_LEN).addListener(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
ctx.channel().config().setAutoRead(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext context) throws Exception {
|
||||||
|
context.close();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
bs.connect(sc.localAddress()).addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
if (!future.isSuccess()) {
|
||||||
|
ctx.close();
|
||||||
|
} else {
|
||||||
|
future.channel().closeFuture().addListener(new ChannelFutureListener() {
|
||||||
|
@Override
|
||||||
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
|
ctx.close();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Channel pc = bs2.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
|
||||||
|
|
||||||
|
Bootstrap cb = new Bootstrap();
|
||||||
|
cb.group(group);
|
||||||
|
cb.channel(EpollSocketChannel.class);
|
||||||
|
cb.handler(ch);
|
||||||
|
Channel cc = cb.connect(pc.localAddress()).syncUninterruptibly().channel();
|
||||||
|
|
||||||
|
for (int i = 0; i < data.length;) {
|
||||||
|
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
|
||||||
|
ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
|
||||||
|
cc.writeAndFlush(buf);
|
||||||
|
i += length;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (ch.counter < data.length) {
|
||||||
|
if (sh.exception.get() != null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (ch.exception.get() != null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Thread.sleep(50);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// Ignore.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
while (sh.counter < data.length) {
|
||||||
|
if (sh.exception.get() != null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (ch.exception.get() != null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Thread.sleep(50);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// Ignore.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sh.channel.close().sync();
|
||||||
|
ch.channel.close().sync();
|
||||||
|
sc.close().sync();
|
||||||
|
pc.close().sync();
|
||||||
|
group.shutdownGracefully();
|
||||||
|
|
||||||
|
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
|
||||||
|
throw sh.exception.get();
|
||||||
|
}
|
||||||
|
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
|
||||||
|
throw ch.exception.get();
|
||||||
|
}
|
||||||
|
if (sh.exception.get() != null) {
|
||||||
|
throw sh.exception.get();
|
||||||
|
}
|
||||||
|
if (ch.exception.get() != null) {
|
||||||
|
throw ch.exception.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void spliceToFile() throws Throwable {
|
||||||
|
EventLoopGroup group = new EpollEventLoopGroup(1);
|
||||||
|
File file = File.createTempFile("netty-splice", null);
|
||||||
|
file.deleteOnExit();
|
||||||
|
|
||||||
|
SpliceHandler sh = new SpliceHandler(file);
|
||||||
|
ServerBootstrap bs = new ServerBootstrap();
|
||||||
|
bs.channel(EpollServerSocketChannel.class);
|
||||||
|
bs.group(group).childHandler(sh);
|
||||||
|
bs.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
|
||||||
|
Channel sc = bs.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
|
||||||
|
|
||||||
|
Bootstrap cb = new Bootstrap();
|
||||||
|
cb.group(group);
|
||||||
|
cb.channel(EpollSocketChannel.class);
|
||||||
|
cb.handler(new ChannelInboundHandlerAdapter());
|
||||||
|
Channel cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel();
|
||||||
|
|
||||||
|
for (int i = 0; i < data.length;) {
|
||||||
|
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
|
||||||
|
ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
|
||||||
|
cc.writeAndFlush(buf);
|
||||||
|
i += length;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (sh.future == null || !sh.future.isDone()) {
|
||||||
|
if (sh.exception.get() != null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(50);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// Ignore.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sc.close().sync();
|
||||||
|
cc.close().sync();
|
||||||
|
|
||||||
|
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
|
||||||
|
throw sh.exception.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] written = new byte[data.length];
|
||||||
|
FileInputStream in = new FileInputStream(file);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Assert.assertEquals(written.length, in.read(written));
|
||||||
|
Assert.assertArrayEquals(data, written);
|
||||||
|
} finally {
|
||||||
|
in.close();
|
||||||
|
group.shutdownGracefully();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||||
|
volatile Channel channel;
|
||||||
|
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||||
|
volatile int counter;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelActive(ChannelHandlerContext ctx)
|
||||||
|
throws Exception {
|
||||||
|
channel = ctx.channel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void messageReceived(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||||
|
byte[] actual = new byte[in.readableBytes()];
|
||||||
|
in.readBytes(actual);
|
||||||
|
|
||||||
|
int lastIdx = counter;
|
||||||
|
for (int i = 0; i < actual.length; i ++) {
|
||||||
|
assertEquals(data[i + lastIdx], actual[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (channel.parent() != null) {
|
||||||
|
channel.write(Unpooled.wrappedBuffer(actual));
|
||||||
|
}
|
||||||
|
|
||||||
|
counter += actual.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
ctx.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx,
|
||||||
|
Throwable cause) throws Exception {
|
||||||
|
if (exception.compareAndSet(null, cause)) {
|
||||||
|
cause.printStackTrace();
|
||||||
|
ctx.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class SpliceHandler extends ChannelInboundHandlerAdapter {
|
||||||
|
private final File file;
|
||||||
|
|
||||||
|
volatile Channel channel;
|
||||||
|
volatile ChannelFuture future;
|
||||||
|
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||||
|
|
||||||
|
public SpliceHandler(File file) {
|
||||||
|
this.file = file;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelActive(ChannelHandlerContext ctx)
|
||||||
|
throws Exception {
|
||||||
|
channel = ctx.channel();
|
||||||
|
final EpollSocketChannel ch = (EpollSocketChannel) ctx.channel();
|
||||||
|
final FileDescriptor fd = FileDescriptor.from(file);
|
||||||
|
|
||||||
|
future = ch.spliceTo(fd, 0, data.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx,
|
||||||
|
Throwable cause) throws Exception {
|
||||||
|
if (exception.compareAndSet(null, cause)) {
|
||||||
|
cause.printStackTrace();
|
||||||
|
ctx.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user