From 8a9f6415d15af8706ec8a547298217d76467aa5e Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sun, 1 Dec 2019 18:07:37 +0100 Subject: [PATCH] Remove spliceTo(...) support from native epoll transport (#9825) Motivation: At some point we added spliceTo(...) support which was never really used and so we should better take the chance and remove it again now as part of the next major release Modifications: Remove spliceTo(...) related code Result: Less code to maintain --- .../src/main/c/netty_epoll_native.c | 22 +- .../epoll/AbstractEpollStreamChannel.java | 393 +----------------- .../java/io/netty/channel/epoll/Native.java | 11 - .../netty/channel/epoll/EpollSpliceTest.java | 311 -------------- 4 files changed, 2 insertions(+), 735 deletions(-) delete mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSpliceTest.java diff --git a/transport-native-epoll/src/main/c/netty_epoll_native.c b/transport-native-epoll/src/main/c/netty_epoll_native.c index 22635489b6..06c923d970 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_native.c +++ b/transport-native-epoll/src/main/c/netty_epoll_native.c @@ -454,25 +454,6 @@ static jint netty_epoll_native_offsetofEpollData(JNIEnv* env, jclass clazz) { return offsetof(struct epoll_event, data); } -static jint netty_epoll_native_splice0(JNIEnv* env, jclass clazz, jint fd, jlong offIn, jint fdOut, jlong offOut, jlong len) { - ssize_t res; - int err; - loff_t off_in = (loff_t) offIn; - loff_t off_out = (loff_t) offOut; - - loff_t* p_off_in = off_in >= 0 ? &off_in : NULL; - loff_t* p_off_out = off_out >= 0 ? &off_out : NULL; - - do { - res = splice(fd, p_off_in, fdOut, p_off_out, (size_t) len, SPLICE_F_NONBLOCK | SPLICE_F_MOVE); - // keep on splicing if it was interrupted - } while (res == -1 && ((err = errno) == EINTR)); - - if (res < 0) { - return -err; - } - return (jint) res; -} static jint netty_epoll_native_tcpMd5SigMaxKeyLen(JNIEnv* env, jclass clazz) { struct tcp_md5sig md5sig; @@ -515,8 +496,7 @@ static const JNINativeMethod fixed_method_table[] = { { "epollCtlDel0", "(II)I", (void *) netty_epoll_native_epollCtlDel0 }, // "sendmmsg0" has a dynamic signature { "sizeofEpollEvent", "()I", (void *) netty_epoll_native_sizeofEpollEvent }, - { "offsetofEpollData", "()I", (void *) netty_epoll_native_offsetofEpollData }, - { "splice0", "(IJIJJ)I", (void *) netty_epoll_native_splice0 } + { "offsetofEpollData", "()I", (void *) netty_epoll_native_offsetofEpollData } }; static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index f86f5c49f0..b0b742ec4c 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -31,11 +31,9 @@ import io.netty.channel.FileRegion; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.internal.ChannelUtils; import io.netty.channel.socket.DuplexChannel; -import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.IovArray; import io.netty.channel.unix.SocketWritableByteChannel; import io.netty.channel.unix.UnixChannelUtil; -import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; @@ -44,16 +42,11 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; import java.nio.channels.WritableByteChannel; -import java.util.Queue; import java.util.concurrent.Executor; import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD; import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL; -import static io.netty.channel.unix.FileDescriptor.pipe; -import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; -import static java.util.Objects.requireNonNull; public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); @@ -67,11 +60,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im ((AbstractEpollUnsafe) unsafe()).flush0(); }; - // Lazy init these if we need to splice(...) - private volatile Queue spliceQueue; - private FileDescriptor pipeIn; - private FileDescriptor pipeOut; - private WritableByteChannel byteChannel; protected AbstractEpollStreamChannel(Channel parent, EventLoop eventLoop, int fd) { @@ -114,118 +102,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im return METADATA; } - /** - * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}. - * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will - * splice until the {@link ChannelFuture} was canceled or it was failed. - * - * Please note: - * - * - */ - public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) { - return spliceTo(ch, len, newPromise()); - } - - /** - * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}. - * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will - * splice until the {@link ChannelFuture} was canceled or it was failed. - * - * Please note: - * - * - */ - public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len, - final ChannelPromise promise) { - if (ch.eventLoop() != eventLoop()) { - throw new IllegalArgumentException("EventLoops are not the same."); - } - checkPositiveOrZero(len, "len"); - if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED - || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) { - throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED); - } - requireNonNull(promise, "promise"); - if (!isOpen()) { - promise.tryFailure(new ClosedChannelException()); - } else { - addToSpliceQueue(new SpliceInChannelTask(ch, len, promise)); - failSpliceIfClosed(promise); - } - return promise; - } - - /** - * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}. - * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the - * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the - * {@link ChannelFuture} was canceled or it was failed. - * - * Please note: - * - */ - public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) { - return spliceTo(ch, offset, len, newPromise()); - } - - /** - * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}. - * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the - * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the - * {@link ChannelFuture} was canceled or it was failed. - * - * Please note: - * - */ - public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len, - final ChannelPromise promise) { - checkPositiveOrZero(len, "len"); - checkPositiveOrZero(offset, "offset"); - if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) { - throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED); - } - requireNonNull(promise, "promise"); - if (!isOpen()) { - promise.tryFailure(new ClosedChannelException()); - } else { - addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise)); - failSpliceIfClosed(promise); - } - return promise; - } - - private void failSpliceIfClosed(ChannelPromise promise) { - if (!isOpen()) { - // Seems like the Channel was closed in the meantime try to fail the promise to prevent any - // cases where a future may not be notified otherwise. - if (promise.tryFailure(new ClosedChannelException())) { - // Call this via the EventLoop as it is a MPSC queue. - eventLoop().execute(this::clearSpliceQueue); - } - } - } - /** * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. * @param in the collection which contains objects to write. @@ -464,12 +340,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im return writeDefaultFileRegion(in, (DefaultFileRegion) msg); } else if (msg instanceof FileRegion) { return writeFileRegion(in, (FileRegion) msg); - } else if (msg instanceof SpliceOutTask) { - if (!((SpliceOutTask) msg).spliceOut()) { - return WRITE_STATUS_SNDBUF_FULL; - } - in.remove(); - return 1; } else { // Should never reach here. throw new Error(); @@ -512,7 +382,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf; } - if (msg instanceof FileRegion || msg instanceof SpliceOutTask) { + if (msg instanceof FileRegion) { return msg; } @@ -633,47 +503,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im } } - @Override - protected void doClose() throws Exception { - try { - // Calling super.doClose() first so spliceTo(...) will fail on next call. - super.doClose(); - } finally { - safeClosePipe(pipeIn); - safeClosePipe(pipeOut); - clearSpliceQueue(); - } - } - - private void clearSpliceQueue() { - Queue sQueue = spliceQueue; - if (sQueue == null) { - return; - } - ClosedChannelException exception = null; - - for (;;) { - SpliceInTask task = sQueue.poll(); - if (task == null) { - break; - } - if (exception == null) { - exception = new ClosedChannelException(); - } - task.promise.tryFailure(exception); - } - } - - private static void safeClosePipe(FileDescriptor fd) { - if (fd != null) { - try { - fd.close(); - } catch (IOException e) { - logger.warn("Error while closing a pipe", e); - } - } - } - class EpollStreamUnsafe extends AbstractEpollUnsafe { // Overridden here just to be able to access this method from AbstractEpollStreamChannel @Override @@ -724,24 +553,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im ByteBuf byteBuf = null; boolean close = false; try { - Queue sQueue = null; do { - if (sQueue != null || (sQueue = spliceQueue) != null) { - SpliceInTask spliceTask = sQueue.peek(); - if (spliceTask != null) { - if (spliceTask.spliceIn(allocHandle)) { - // We need to check if it is still active as if not we removed all SpliceTasks in - // doClose(...) - if (isActive()) { - sQueue.remove(); - } - continue; - } else { - break; - } - } - } - // we use a direct buffer here as the native implementations only be able // to handle direct buffers. byteBuf = allocHandle.allocate(allocator); @@ -794,209 +606,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im } } - private void addToSpliceQueue(final SpliceInTask task) { - Queue sQueue = spliceQueue; - if (sQueue == null) { - synchronized (this) { - sQueue = spliceQueue; - if (sQueue == null) { - spliceQueue = sQueue = PlatformDependent.newMpscQueue(); - } - } - } - sQueue.add(task); - } - - protected abstract class SpliceInTask { - final ChannelPromise promise; - int len; - - protected SpliceInTask(int len, ChannelPromise promise) { - this.promise = promise; - this.len = len; - } - - abstract boolean spliceIn(RecvByteBufAllocator.Handle handle); - - protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException { - // calculate the maximum amount of data we are allowed to splice - int length = Math.min(handle.guess(), len); - int splicedIn = 0; - for (;;) { - // Splicing until there is nothing left to splice. - int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length); - if (localSplicedIn == 0) { - break; - } - splicedIn += localSplicedIn; - length -= localSplicedIn; - } - - return splicedIn; - } - } - - // Let it directly implement channelFutureListener as well to reduce object creation. - private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener { - private final AbstractEpollStreamChannel ch; - - SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) { - super(len, promise); - this.ch = ch; - } - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - promise.setFailure(future.cause()); - } - } - - @Override - public boolean spliceIn(RecvByteBufAllocator.Handle handle) { - assert ch.eventLoop().inEventLoop(); - if (len == 0) { - promise.setSuccess(); - return true; - } - try { - // We create the pipe on the target channel as this will allow us to just handle pending writes - // later in a correct fashion without get into any ordering issues when spliceTo(...) is called - // on multiple Channels pointing to one target Channel. - FileDescriptor pipeOut = ch.pipeOut; - if (pipeOut == null) { - // Create a new pipe as non was created before. - FileDescriptor[] pipe = pipe(); - ch.pipeIn = pipe[0]; - pipeOut = ch.pipeOut = pipe[1]; - } - - int splicedIn = spliceIn(pipeOut, handle); - if (splicedIn > 0) { - // Integer.MAX_VALUE is a special value which will result in splice forever. - if (len != Integer.MAX_VALUE) { - len -= splicedIn; - } - - // Depending on if we are done with splicing inbound data we set the right promise for the - // outbound splicing. - final ChannelPromise splicePromise; - if (len == 0) { - splicePromise = promise; - } else { - splicePromise = ch.newPromise().addListener(this); - } - - boolean autoRead = config().isAutoRead(); - - // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this - // case. - ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise); - ch.unsafe().flush(); - if (autoRead && !splicePromise.isDone()) { - // Write was not done which means the target channel was not writable. In this case we need to - // disable reading until we are done with splicing to the target channel because: - // - // - The user may want to to trigger another splice operation once the splicing was complete. - config().setAutoRead(false); - } - } - - return len == 0; - } catch (Throwable cause) { - promise.setFailure(cause); - return true; - } - } - } - - private final class SpliceOutTask { - private final AbstractEpollStreamChannel ch; - private final boolean autoRead; - private int len; - - SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) { - this.ch = ch; - this.len = len; - this.autoRead = autoRead; - } - - public boolean spliceOut() throws Exception { - assert ch.eventLoop().inEventLoop(); - try { - int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len); - len -= splicedOut; - if (len == 0) { - if (autoRead) { - // AutoRead was used and we spliced everything so start reading again - config().setAutoRead(true); - } - return true; - } - return false; - } catch (IOException e) { - if (autoRead) { - // AutoRead was used and we spliced everything so start reading again - config().setAutoRead(true); - } - throw e; - } - } - } - - private final class SpliceFdTask extends SpliceInTask { - private final FileDescriptor fd; - private final ChannelPromise promise; - private int offset; - - SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) { - super(len, promise); - this.fd = fd; - this.promise = promise; - this.offset = offset; - } - - @Override - public boolean spliceIn(RecvByteBufAllocator.Handle handle) { - assert eventLoop().inEventLoop(); - if (len == 0) { - promise.setSuccess(); - return true; - } - - try { - FileDescriptor[] pipe = pipe(); - FileDescriptor pipeIn = pipe[0]; - FileDescriptor pipeOut = pipe[1]; - try { - int splicedIn = spliceIn(pipeOut, handle); - if (splicedIn > 0) { - // Integer.MAX_VALUE is a special value which will result in splice forever. - if (len != Integer.MAX_VALUE) { - len -= splicedIn; - } - do { - int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn); - offset += splicedOut; - splicedIn -= splicedOut; - } while (splicedIn > 0); - if (len == 0) { - promise.setSuccess(); - return true; - } - } - return false; - } finally { - safeClosePipe(pipeIn); - safeClosePipe(pipeOut); - } - } catch (Throwable cause) { - promise.setFailure(cause); - return true; - } - } - } - private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel { EpollSocketWritableByteChannel() { super(socket); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index 32c9ba7f5b..ea850e706f 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -164,17 +164,6 @@ public final class Native { } private static native int epollCtlDel0(int efd, int fd); - // File-descriptor operations - public static int splice(int fd, long offIn, int fdOut, long offOut, long len) throws IOException { - int res = splice0(fd, offIn, fdOut, offOut, len); - if (res >= 0) { - return res; - } - return ioResult("splice", res); - } - - private static native int splice0(int fd, long offIn, int fdOut, long offOut, long len); - @Deprecated public static int sendmmsg(int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len) throws IOException { diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSpliceTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSpliceTest.java deleted file mode 100644 index a1ee7a8213..0000000000 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSpliceTest.java +++ /dev/null @@ -1,311 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.epoll; - -import io.netty.bootstrap.Bootstrap; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.MultithreadEventLoopGroup; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.unix.FileDescriptor; -import io.netty.util.NetUtil; -import org.junit.Assert; -import org.junit.Test; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Random; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.assertEquals; - -public class EpollSpliceTest { - - private static final int SPLICE_LEN = 32 * 1024; - private static final Random random = new Random(); - private static final byte[] data = new byte[1048576]; - - static { - random.nextBytes(data); - } - - @Test - public void spliceToSocket() throws Throwable { - final EchoHandler sh = new EchoHandler(); - final EchoHandler ch = new EchoHandler(); - - EventLoopGroup group = new MultithreadEventLoopGroup(1, EpollHandler.newFactory()); - ServerBootstrap bs = new ServerBootstrap(); - bs.channel(EpollServerSocketChannel.class); - bs.group(group).childHandler(sh); - final Channel sc = bs.bind(NetUtil.LOCALHOST, 0).syncUninterruptibly().channel(); - - ServerBootstrap bs2 = new ServerBootstrap(); - bs2.channel(EpollServerSocketChannel.class); - bs2.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); - bs2.group(group).childHandler(new ChannelHandler() { - @Override - public void channelActive(final ChannelHandlerContext ctx) throws Exception { - ctx.channel().config().setAutoRead(false); - Bootstrap bs = new Bootstrap(); - bs.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); - - bs.channel(EpollSocketChannel.class); - bs.group(ctx.channel().eventLoop()).handler(new ChannelHandler() { - @Override - public void channelActive(ChannelHandlerContext context) throws Exception { - final EpollSocketChannel ch = (EpollSocketChannel) ctx.channel(); - final EpollSocketChannel ch2 = (EpollSocketChannel) context.channel(); - // We are splicing two channels together, at this point we have a tcp proxy which handles all - // the data transfer only in kernel space! - - // Integer.MAX_VALUE will splice infinitly. - ch.spliceTo(ch2, Integer.MAX_VALUE).addListener((ChannelFutureListener) future -> { - if (!future.isSuccess()) { - future.channel().close(); - } - }); - // Trigger multiple splices to see if partial splicing works as well. - ch2.spliceTo(ch, SPLICE_LEN).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - future.channel().close(); - } else { - ch2.spliceTo(ch, SPLICE_LEN).addListener(this); - } - } - }); - ctx.channel().config().setAutoRead(true); - } - - @Override - public void channelInactive(ChannelHandlerContext context) throws Exception { - context.close(); - } - }); - bs.connect(sc.localAddress()).addListener((ChannelFutureListener) future -> { - if (!future.isSuccess()) { - ctx.close(); - } else { - future.channel().closeFuture().addListener((ChannelFutureListener) future1 -> ctx.close()); - } - }); - } - }); - Channel pc = bs2.bind(NetUtil.LOCALHOST, 0).syncUninterruptibly().channel(); - - Bootstrap cb = new Bootstrap(); - cb.group(group); - cb.channel(EpollSocketChannel.class); - cb.handler(ch); - Channel cc = cb.connect(pc.localAddress()).syncUninterruptibly().channel(); - - for (int i = 0; i < data.length;) { - int length = Math.min(random.nextInt(1024 * 64), data.length - i); - ByteBuf buf = Unpooled.wrappedBuffer(data, i, length); - cc.writeAndFlush(buf); - i += length; - } - - while (ch.counter < data.length) { - if (sh.exception.get() != null) { - break; - } - if (ch.exception.get() != null) { - break; - } - - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore. - } - } - - while (sh.counter < data.length) { - if (sh.exception.get() != null) { - break; - } - if (ch.exception.get() != null) { - break; - } - - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore. - } - } - - sh.channel.close().sync(); - ch.channel.close().sync(); - sc.close().sync(); - pc.close().sync(); - group.shutdownGracefully(); - - if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { - throw sh.exception.get(); - } - if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { - throw ch.exception.get(); - } - if (sh.exception.get() != null) { - throw sh.exception.get(); - } - if (ch.exception.get() != null) { - throw ch.exception.get(); - } - } - - @Test(timeout = 10000) - public void spliceToFile() throws Throwable { - EventLoopGroup group = new MultithreadEventLoopGroup(1, EpollHandler.newFactory()); - File file = File.createTempFile("netty-splice", null); - file.deleteOnExit(); - - SpliceHandler sh = new SpliceHandler(file); - ServerBootstrap bs = new ServerBootstrap(); - bs.channel(EpollServerSocketChannel.class); - bs.group(group).childHandler(sh); - bs.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); - Channel sc = bs.bind(NetUtil.LOCALHOST, 0).syncUninterruptibly().channel(); - - Bootstrap cb = new Bootstrap(); - cb.group(group); - cb.channel(EpollSocketChannel.class); - cb.handler(new ChannelHandler() { }); - Channel cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel(); - - for (int i = 0; i < data.length;) { - int length = Math.min(random.nextInt(1024 * 64), data.length - i); - ByteBuf buf = Unpooled.wrappedBuffer(data, i, length); - cc.writeAndFlush(buf); - i += length; - } - - while (sh.future2 == null || !sh.future2.isDone() || !sh.future.isDone()) { - if (sh.exception.get() != null) { - break; - } - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore. - } - } - - sc.close().sync(); - cc.close().sync(); - - if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { - throw sh.exception.get(); - } - - byte[] written = new byte[data.length]; - FileInputStream in = new FileInputStream(file); - - try { - Assert.assertEquals(written.length, in.read(written)); - Assert.assertArrayEquals(data, written); - } finally { - in.close(); - group.shutdownGracefully(); - } - } - - private static class EchoHandler extends SimpleChannelInboundHandler { - volatile Channel channel; - final AtomicReference exception = new AtomicReference<>(); - volatile int counter; - - @Override - public void channelActive(ChannelHandlerContext ctx) - throws Exception { - channel = ctx.channel(); - } - - @Override - public void messageReceived(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - byte[] actual = new byte[in.readableBytes()]; - in.readBytes(actual); - - int lastIdx = counter; - for (int i = 0; i < actual.length; i ++) { - assertEquals(data[i + lastIdx], actual[i]); - } - - if (channel.parent() != null) { - channel.write(Unpooled.wrappedBuffer(actual)); - } - - counter += actual.length; - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - ctx.flush(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, - Throwable cause) throws Exception { - if (exception.compareAndSet(null, cause)) { - cause.printStackTrace(); - ctx.close(); - } - } - } - - private static class SpliceHandler implements ChannelHandler { - private final File file; - - volatile ChannelFuture future; - volatile ChannelFuture future2; - final AtomicReference exception = new AtomicReference<>(); - - SpliceHandler(File file) { - this.file = file; - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - final EpollSocketChannel ch = (EpollSocketChannel) ctx.channel(); - final FileDescriptor fd = FileDescriptor.from(file); - - // splice two halves separately to test starting offset - future = ch.spliceTo(fd, 0, data.length / 2); - future2 = ch.spliceTo(fd, data.length / 2, data.length / 2); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, - Throwable cause) throws Exception { - if (exception.compareAndSet(null, cause)) { - cause.printStackTrace(); - ctx.close(); - } - } - } -}