Remove spliceTo(...) support from native epoll transport (#9825)

Motivation:

At some point we added spliceTo(...) support which was never really used and so we should better take the chance and remove it again now as part of the next major release

Modifications:

Remove spliceTo(...) related code

Result:

Less code to maintain
This commit is contained in:
Norman Maurer 2019-12-01 18:07:37 +01:00 committed by GitHub
parent 29c471ec52
commit 8a9f6415d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 2 additions and 735 deletions

View File

@ -454,25 +454,6 @@ static jint netty_epoll_native_offsetofEpollData(JNIEnv* env, jclass clazz) {
return offsetof(struct epoll_event, data); return offsetof(struct epoll_event, data);
} }
static jint netty_epoll_native_splice0(JNIEnv* env, jclass clazz, jint fd, jlong offIn, jint fdOut, jlong offOut, jlong len) {
ssize_t res;
int err;
loff_t off_in = (loff_t) offIn;
loff_t off_out = (loff_t) offOut;
loff_t* p_off_in = off_in >= 0 ? &off_in : NULL;
loff_t* p_off_out = off_out >= 0 ? &off_out : NULL;
do {
res = splice(fd, p_off_in, fdOut, p_off_out, (size_t) len, SPLICE_F_NONBLOCK | SPLICE_F_MOVE);
// keep on splicing if it was interrupted
} while (res == -1 && ((err = errno) == EINTR));
if (res < 0) {
return -err;
}
return (jint) res;
}
static jint netty_epoll_native_tcpMd5SigMaxKeyLen(JNIEnv* env, jclass clazz) { static jint netty_epoll_native_tcpMd5SigMaxKeyLen(JNIEnv* env, jclass clazz) {
struct tcp_md5sig md5sig; struct tcp_md5sig md5sig;
@ -515,8 +496,7 @@ static const JNINativeMethod fixed_method_table[] = {
{ "epollCtlDel0", "(II)I", (void *) netty_epoll_native_epollCtlDel0 }, { "epollCtlDel0", "(II)I", (void *) netty_epoll_native_epollCtlDel0 },
// "sendmmsg0" has a dynamic signature // "sendmmsg0" has a dynamic signature
{ "sizeofEpollEvent", "()I", (void *) netty_epoll_native_sizeofEpollEvent }, { "sizeofEpollEvent", "()I", (void *) netty_epoll_native_sizeofEpollEvent },
{ "offsetofEpollData", "()I", (void *) netty_epoll_native_offsetofEpollData }, { "offsetofEpollData", "()I", (void *) netty_epoll_native_offsetofEpollData }
{ "splice0", "(IJIJJ)I", (void *) netty_epoll_native_splice0 }
}; };
static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]);

View File

@ -31,11 +31,9 @@ import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.internal.ChannelUtils; import io.netty.channel.internal.ChannelUtils;
import io.netty.channel.socket.DuplexChannel; import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.IovArray; import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.SocketWritableByteChannel; import io.netty.channel.unix.SocketWritableByteChannel;
import io.netty.channel.unix.UnixChannelUtil; import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
@ -44,16 +42,11 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.util.Queue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD; import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL; import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
import static io.netty.channel.unix.FileDescriptor.pipe;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
import static java.util.Objects.requireNonNull;
public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel { public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
@ -67,11 +60,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
((AbstractEpollUnsafe) unsafe()).flush0(); ((AbstractEpollUnsafe) unsafe()).flush0();
}; };
// Lazy init these if we need to splice(...)
private volatile Queue<SpliceInTask> spliceQueue;
private FileDescriptor pipeIn;
private FileDescriptor pipeOut;
private WritableByteChannel byteChannel; private WritableByteChannel byteChannel;
protected AbstractEpollStreamChannel(Channel parent, EventLoop eventLoop, int fd) { protected AbstractEpollStreamChannel(Channel parent, EventLoop eventLoop, int fd) {
@ -114,118 +102,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
return METADATA; return METADATA;
} }
/**
* Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
* The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
* splice until the {@link ChannelFuture} was canceled or it was failed.
*
* Please note:
* <ul>
* <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
* {@link IllegalArgumentException} is thrown. </li>
* <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
* target {@link AbstractEpollStreamChannel}</li>
* </ul>
*
*/
public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
return spliceTo(ch, len, newPromise());
}
/**
* Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
* The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
* splice until the {@link ChannelFuture} was canceled or it was failed.
*
* Please note:
* <ul>
* <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
* {@link IllegalArgumentException} is thrown. </li>
* <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
* target {@link AbstractEpollStreamChannel}</li>
* </ul>
*
*/
public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
final ChannelPromise promise) {
if (ch.eventLoop() != eventLoop()) {
throw new IllegalArgumentException("EventLoops are not the same.");
}
checkPositiveOrZero(len, "len");
if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
|| config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
}
requireNonNull(promise, "promise");
if (!isOpen()) {
promise.tryFailure(new ClosedChannelException());
} else {
addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
failSpliceIfClosed(promise);
}
return promise;
}
/**
* Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
* The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
* number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
* {@link ChannelFuture} was canceled or it was failed.
*
* Please note:
* <ul>
* <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
* {@link AbstractEpollStreamChannel}</li>
* <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
* <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
* </ul>
*/
public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
return spliceTo(ch, offset, len, newPromise());
}
/**
* Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
* The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
* number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
* {@link ChannelFuture} was canceled or it was failed.
*
* Please note:
* <ul>
* <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
* {@link AbstractEpollStreamChannel}</li>
* <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
* <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
* </ul>
*/
public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
final ChannelPromise promise) {
checkPositiveOrZero(len, "len");
checkPositiveOrZero(offset, "offset");
if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
}
requireNonNull(promise, "promise");
if (!isOpen()) {
promise.tryFailure(new ClosedChannelException());
} else {
addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
failSpliceIfClosed(promise);
}
return promise;
}
private void failSpliceIfClosed(ChannelPromise promise) {
if (!isOpen()) {
// Seems like the Channel was closed in the meantime try to fail the promise to prevent any
// cases where a future may not be notified otherwise.
if (promise.tryFailure(new ClosedChannelException())) {
// Call this via the EventLoop as it is a MPSC queue.
eventLoop().execute(this::clearSpliceQueue);
}
}
}
/** /**
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param in the collection which contains objects to write. * @param in the collection which contains objects to write.
@ -464,12 +340,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
return writeDefaultFileRegion(in, (DefaultFileRegion) msg); return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
} else if (msg instanceof FileRegion) { } else if (msg instanceof FileRegion) {
return writeFileRegion(in, (FileRegion) msg); return writeFileRegion(in, (FileRegion) msg);
} else if (msg instanceof SpliceOutTask) {
if (!((SpliceOutTask) msg).spliceOut()) {
return WRITE_STATUS_SNDBUF_FULL;
}
in.remove();
return 1;
} else { } else {
// Should never reach here. // Should never reach here.
throw new Error(); throw new Error();
@ -512,7 +382,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf; return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
} }
if (msg instanceof FileRegion || msg instanceof SpliceOutTask) { if (msg instanceof FileRegion) {
return msg; return msg;
} }
@ -633,47 +503,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
} }
} }
@Override
protected void doClose() throws Exception {
try {
// Calling super.doClose() first so spliceTo(...) will fail on next call.
super.doClose();
} finally {
safeClosePipe(pipeIn);
safeClosePipe(pipeOut);
clearSpliceQueue();
}
}
private void clearSpliceQueue() {
Queue<SpliceInTask> sQueue = spliceQueue;
if (sQueue == null) {
return;
}
ClosedChannelException exception = null;
for (;;) {
SpliceInTask task = sQueue.poll();
if (task == null) {
break;
}
if (exception == null) {
exception = new ClosedChannelException();
}
task.promise.tryFailure(exception);
}
}
private static void safeClosePipe(FileDescriptor fd) {
if (fd != null) {
try {
fd.close();
} catch (IOException e) {
logger.warn("Error while closing a pipe", e);
}
}
}
class EpollStreamUnsafe extends AbstractEpollUnsafe { class EpollStreamUnsafe extends AbstractEpollUnsafe {
// Overridden here just to be able to access this method from AbstractEpollStreamChannel // Overridden here just to be able to access this method from AbstractEpollStreamChannel
@Override @Override
@ -724,24 +553,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
ByteBuf byteBuf = null; ByteBuf byteBuf = null;
boolean close = false; boolean close = false;
try { try {
Queue<SpliceInTask> sQueue = null;
do { do {
if (sQueue != null || (sQueue = spliceQueue) != null) {
SpliceInTask spliceTask = sQueue.peek();
if (spliceTask != null) {
if (spliceTask.spliceIn(allocHandle)) {
// We need to check if it is still active as if not we removed all SpliceTasks in
// doClose(...)
if (isActive()) {
sQueue.remove();
}
continue;
} else {
break;
}
}
}
// we use a direct buffer here as the native implementations only be able // we use a direct buffer here as the native implementations only be able
// to handle direct buffers. // to handle direct buffers.
byteBuf = allocHandle.allocate(allocator); byteBuf = allocHandle.allocate(allocator);
@ -794,209 +606,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
} }
} }
private void addToSpliceQueue(final SpliceInTask task) {
Queue<SpliceInTask> sQueue = spliceQueue;
if (sQueue == null) {
synchronized (this) {
sQueue = spliceQueue;
if (sQueue == null) {
spliceQueue = sQueue = PlatformDependent.newMpscQueue();
}
}
}
sQueue.add(task);
}
protected abstract class SpliceInTask {
final ChannelPromise promise;
int len;
protected SpliceInTask(int len, ChannelPromise promise) {
this.promise = promise;
this.len = len;
}
abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
// calculate the maximum amount of data we are allowed to splice
int length = Math.min(handle.guess(), len);
int splicedIn = 0;
for (;;) {
// Splicing until there is nothing left to splice.
int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
if (localSplicedIn == 0) {
break;
}
splicedIn += localSplicedIn;
length -= localSplicedIn;
}
return splicedIn;
}
}
// Let it directly implement channelFutureListener as well to reduce object creation.
private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
private final AbstractEpollStreamChannel ch;
SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
super(len, promise);
this.ch = ch;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
}
}
@Override
public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
assert ch.eventLoop().inEventLoop();
if (len == 0) {
promise.setSuccess();
return true;
}
try {
// We create the pipe on the target channel as this will allow us to just handle pending writes
// later in a correct fashion without get into any ordering issues when spliceTo(...) is called
// on multiple Channels pointing to one target Channel.
FileDescriptor pipeOut = ch.pipeOut;
if (pipeOut == null) {
// Create a new pipe as non was created before.
FileDescriptor[] pipe = pipe();
ch.pipeIn = pipe[0];
pipeOut = ch.pipeOut = pipe[1];
}
int splicedIn = spliceIn(pipeOut, handle);
if (splicedIn > 0) {
// Integer.MAX_VALUE is a special value which will result in splice forever.
if (len != Integer.MAX_VALUE) {
len -= splicedIn;
}
// Depending on if we are done with splicing inbound data we set the right promise for the
// outbound splicing.
final ChannelPromise splicePromise;
if (len == 0) {
splicePromise = promise;
} else {
splicePromise = ch.newPromise().addListener(this);
}
boolean autoRead = config().isAutoRead();
// Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
// case.
ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
ch.unsafe().flush();
if (autoRead && !splicePromise.isDone()) {
// Write was not done which means the target channel was not writable. In this case we need to
// disable reading until we are done with splicing to the target channel because:
//
// - The user may want to to trigger another splice operation once the splicing was complete.
config().setAutoRead(false);
}
}
return len == 0;
} catch (Throwable cause) {
promise.setFailure(cause);
return true;
}
}
}
private final class SpliceOutTask {
private final AbstractEpollStreamChannel ch;
private final boolean autoRead;
private int len;
SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
this.ch = ch;
this.len = len;
this.autoRead = autoRead;
}
public boolean spliceOut() throws Exception {
assert ch.eventLoop().inEventLoop();
try {
int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
len -= splicedOut;
if (len == 0) {
if (autoRead) {
// AutoRead was used and we spliced everything so start reading again
config().setAutoRead(true);
}
return true;
}
return false;
} catch (IOException e) {
if (autoRead) {
// AutoRead was used and we spliced everything so start reading again
config().setAutoRead(true);
}
throw e;
}
}
}
private final class SpliceFdTask extends SpliceInTask {
private final FileDescriptor fd;
private final ChannelPromise promise;
private int offset;
SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
super(len, promise);
this.fd = fd;
this.promise = promise;
this.offset = offset;
}
@Override
public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
assert eventLoop().inEventLoop();
if (len == 0) {
promise.setSuccess();
return true;
}
try {
FileDescriptor[] pipe = pipe();
FileDescriptor pipeIn = pipe[0];
FileDescriptor pipeOut = pipe[1];
try {
int splicedIn = spliceIn(pipeOut, handle);
if (splicedIn > 0) {
// Integer.MAX_VALUE is a special value which will result in splice forever.
if (len != Integer.MAX_VALUE) {
len -= splicedIn;
}
do {
int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
offset += splicedOut;
splicedIn -= splicedOut;
} while (splicedIn > 0);
if (len == 0) {
promise.setSuccess();
return true;
}
}
return false;
} finally {
safeClosePipe(pipeIn);
safeClosePipe(pipeOut);
}
} catch (Throwable cause) {
promise.setFailure(cause);
return true;
}
}
}
private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel { private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
EpollSocketWritableByteChannel() { EpollSocketWritableByteChannel() {
super(socket); super(socket);

View File

@ -164,17 +164,6 @@ public final class Native {
} }
private static native int epollCtlDel0(int efd, int fd); private static native int epollCtlDel0(int efd, int fd);
// File-descriptor operations
public static int splice(int fd, long offIn, int fdOut, long offOut, long len) throws IOException {
int res = splice0(fd, offIn, fdOut, offOut, len);
if (res >= 0) {
return res;
}
return ioResult("splice", res);
}
private static native int splice0(int fd, long offIn, int fdOut, long offOut, long len);
@Deprecated @Deprecated
public static int sendmmsg(int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, public static int sendmmsg(int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs,
int offset, int len) throws IOException { int offset, int len) throws IOException {

View File

@ -1,311 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.unix.FileDescriptor;
import io.netty.util.NetUtil;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
public class EpollSpliceTest {
private static final int SPLICE_LEN = 32 * 1024;
private static final Random random = new Random();
private static final byte[] data = new byte[1048576];
static {
random.nextBytes(data);
}
@Test
public void spliceToSocket() throws Throwable {
final EchoHandler sh = new EchoHandler();
final EchoHandler ch = new EchoHandler();
EventLoopGroup group = new MultithreadEventLoopGroup(1, EpollHandler.newFactory());
ServerBootstrap bs = new ServerBootstrap();
bs.channel(EpollServerSocketChannel.class);
bs.group(group).childHandler(sh);
final Channel sc = bs.bind(NetUtil.LOCALHOST, 0).syncUninterruptibly().channel();
ServerBootstrap bs2 = new ServerBootstrap();
bs2.channel(EpollServerSocketChannel.class);
bs2.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
bs2.group(group).childHandler(new ChannelHandler() {
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
ctx.channel().config().setAutoRead(false);
Bootstrap bs = new Bootstrap();
bs.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
bs.channel(EpollSocketChannel.class);
bs.group(ctx.channel().eventLoop()).handler(new ChannelHandler() {
@Override
public void channelActive(ChannelHandlerContext context) throws Exception {
final EpollSocketChannel ch = (EpollSocketChannel) ctx.channel();
final EpollSocketChannel ch2 = (EpollSocketChannel) context.channel();
// We are splicing two channels together, at this point we have a tcp proxy which handles all
// the data transfer only in kernel space!
// Integer.MAX_VALUE will splice infinitly.
ch.spliceTo(ch2, Integer.MAX_VALUE).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
future.channel().close();
}
});
// Trigger multiple splices to see if partial splicing works as well.
ch2.spliceTo(ch, SPLICE_LEN).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
future.channel().close();
} else {
ch2.spliceTo(ch, SPLICE_LEN).addListener(this);
}
}
});
ctx.channel().config().setAutoRead(true);
}
@Override
public void channelInactive(ChannelHandlerContext context) throws Exception {
context.close();
}
});
bs.connect(sc.localAddress()).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
ctx.close();
} else {
future.channel().closeFuture().addListener((ChannelFutureListener) future1 -> ctx.close());
}
});
}
});
Channel pc = bs2.bind(NetUtil.LOCALHOST, 0).syncUninterruptibly().channel();
Bootstrap cb = new Bootstrap();
cb.group(group);
cb.channel(EpollSocketChannel.class);
cb.handler(ch);
Channel cc = cb.connect(pc.localAddress()).syncUninterruptibly().channel();
for (int i = 0; i < data.length;) {
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
cc.writeAndFlush(buf);
i += length;
}
while (ch.counter < data.length) {
if (sh.exception.get() != null) {
break;
}
if (ch.exception.get() != null) {
break;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore.
}
}
while (sh.counter < data.length) {
if (sh.exception.get() != null) {
break;
}
if (ch.exception.get() != null) {
break;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore.
}
}
sh.channel.close().sync();
ch.channel.close().sync();
sc.close().sync();
pc.close().sync();
group.shutdownGracefully();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();
}
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
throw ch.exception.get();
}
if (sh.exception.get() != null) {
throw sh.exception.get();
}
if (ch.exception.get() != null) {
throw ch.exception.get();
}
}
@Test(timeout = 10000)
public void spliceToFile() throws Throwable {
EventLoopGroup group = new MultithreadEventLoopGroup(1, EpollHandler.newFactory());
File file = File.createTempFile("netty-splice", null);
file.deleteOnExit();
SpliceHandler sh = new SpliceHandler(file);
ServerBootstrap bs = new ServerBootstrap();
bs.channel(EpollServerSocketChannel.class);
bs.group(group).childHandler(sh);
bs.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
Channel sc = bs.bind(NetUtil.LOCALHOST, 0).syncUninterruptibly().channel();
Bootstrap cb = new Bootstrap();
cb.group(group);
cb.channel(EpollSocketChannel.class);
cb.handler(new ChannelHandler() { });
Channel cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel();
for (int i = 0; i < data.length;) {
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
cc.writeAndFlush(buf);
i += length;
}
while (sh.future2 == null || !sh.future2.isDone() || !sh.future.isDone()) {
if (sh.exception.get() != null) {
break;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore.
}
}
sc.close().sync();
cc.close().sync();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();
}
byte[] written = new byte[data.length];
FileInputStream in = new FileInputStream(file);
try {
Assert.assertEquals(written.length, in.read(written));
Assert.assertArrayEquals(data, written);
} finally {
in.close();
group.shutdownGracefully();
}
}
private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<>();
volatile int counter;
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
channel = ctx.channel();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
byte[] actual = new byte[in.readableBytes()];
in.readBytes(actual);
int lastIdx = counter;
for (int i = 0; i < actual.length; i ++) {
assertEquals(data[i + lastIdx], actual[i]);
}
if (channel.parent() != null) {
channel.write(Unpooled.wrappedBuffer(actual));
}
counter += actual.length;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
if (exception.compareAndSet(null, cause)) {
cause.printStackTrace();
ctx.close();
}
}
}
private static class SpliceHandler implements ChannelHandler {
private final File file;
volatile ChannelFuture future;
volatile ChannelFuture future2;
final AtomicReference<Throwable> exception = new AtomicReference<>();
SpliceHandler(File file) {
this.file = file;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final EpollSocketChannel ch = (EpollSocketChannel) ctx.channel();
final FileDescriptor fd = FileDescriptor.from(file);
// splice two halves separately to test starting offset
future = ch.spliceTo(fd, 0, data.length / 2);
future2 = ch.spliceTo(fd, data.length / 2, data.length / 2);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
if (exception.compareAndSet(null, cause)) {
cause.printStackTrace();
ctx.close();
}
}
}
}