diff --git a/transport-native-epoll/src/main/c/netty_epoll_native.c b/transport-native-epoll/src/main/c/netty_epoll_native.c index 22635489b6..06c923d970 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_native.c +++ b/transport-native-epoll/src/main/c/netty_epoll_native.c @@ -454,25 +454,6 @@ static jint netty_epoll_native_offsetofEpollData(JNIEnv* env, jclass clazz) { return offsetof(struct epoll_event, data); } -static jint netty_epoll_native_splice0(JNIEnv* env, jclass clazz, jint fd, jlong offIn, jint fdOut, jlong offOut, jlong len) { - ssize_t res; - int err; - loff_t off_in = (loff_t) offIn; - loff_t off_out = (loff_t) offOut; - - loff_t* p_off_in = off_in >= 0 ? &off_in : NULL; - loff_t* p_off_out = off_out >= 0 ? &off_out : NULL; - - do { - res = splice(fd, p_off_in, fdOut, p_off_out, (size_t) len, SPLICE_F_NONBLOCK | SPLICE_F_MOVE); - // keep on splicing if it was interrupted - } while (res == -1 && ((err = errno) == EINTR)); - - if (res < 0) { - return -err; - } - return (jint) res; -} static jint netty_epoll_native_tcpMd5SigMaxKeyLen(JNIEnv* env, jclass clazz) { struct tcp_md5sig md5sig; @@ -515,8 +496,7 @@ static const JNINativeMethod fixed_method_table[] = { { "epollCtlDel0", "(II)I", (void *) netty_epoll_native_epollCtlDel0 }, // "sendmmsg0" has a dynamic signature { "sizeofEpollEvent", "()I", (void *) netty_epoll_native_sizeofEpollEvent }, - { "offsetofEpollData", "()I", (void *) netty_epoll_native_offsetofEpollData }, - { "splice0", "(IJIJJ)I", (void *) netty_epoll_native_splice0 } + { "offsetofEpollData", "()I", (void *) netty_epoll_native_offsetofEpollData } }; static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index f86f5c49f0..b0b742ec4c 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -31,11 +31,9 @@ import io.netty.channel.FileRegion; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.internal.ChannelUtils; import io.netty.channel.socket.DuplexChannel; -import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.IovArray; import io.netty.channel.unix.SocketWritableByteChannel; import io.netty.channel.unix.UnixChannelUtil; -import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; @@ -44,16 +42,11 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; import java.nio.channels.WritableByteChannel; -import java.util.Queue; import java.util.concurrent.Executor; import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD; import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL; -import static io.netty.channel.unix.FileDescriptor.pipe; -import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; -import static java.util.Objects.requireNonNull; public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); @@ -67,11 +60,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im ((AbstractEpollUnsafe) unsafe()).flush0(); }; - // Lazy init these if we need to splice(...) - private volatile Queue spliceQueue; - private FileDescriptor pipeIn; - private FileDescriptor pipeOut; - private WritableByteChannel byteChannel; protected AbstractEpollStreamChannel(Channel parent, EventLoop eventLoop, int fd) { @@ -114,118 +102,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im return METADATA; } - /** - * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}. - * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will - * splice until the {@link ChannelFuture} was canceled or it was failed. - * - * Please note: - * - * - */ - public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) { - return spliceTo(ch, len, newPromise()); - } - - /** - * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}. - * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will - * splice until the {@link ChannelFuture} was canceled or it was failed. - * - * Please note: - * - * - */ - public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len, - final ChannelPromise promise) { - if (ch.eventLoop() != eventLoop()) { - throw new IllegalArgumentException("EventLoops are not the same."); - } - checkPositiveOrZero(len, "len"); - if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED - || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) { - throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED); - } - requireNonNull(promise, "promise"); - if (!isOpen()) { - promise.tryFailure(new ClosedChannelException()); - } else { - addToSpliceQueue(new SpliceInChannelTask(ch, len, promise)); - failSpliceIfClosed(promise); - } - return promise; - } - - /** - * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}. - * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the - * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the - * {@link ChannelFuture} was canceled or it was failed. - * - * Please note: - * - */ - public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) { - return spliceTo(ch, offset, len, newPromise()); - } - - /** - * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}. - * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the - * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the - * {@link ChannelFuture} was canceled or it was failed. - * - * Please note: - * - */ - public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len, - final ChannelPromise promise) { - checkPositiveOrZero(len, "len"); - checkPositiveOrZero(offset, "offset"); - if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) { - throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED); - } - requireNonNull(promise, "promise"); - if (!isOpen()) { - promise.tryFailure(new ClosedChannelException()); - } else { - addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise)); - failSpliceIfClosed(promise); - } - return promise; - } - - private void failSpliceIfClosed(ChannelPromise promise) { - if (!isOpen()) { - // Seems like the Channel was closed in the meantime try to fail the promise to prevent any - // cases where a future may not be notified otherwise. - if (promise.tryFailure(new ClosedChannelException())) { - // Call this via the EventLoop as it is a MPSC queue. - eventLoop().execute(this::clearSpliceQueue); - } - } - } - /** * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. * @param in the collection which contains objects to write. @@ -464,12 +340,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im return writeDefaultFileRegion(in, (DefaultFileRegion) msg); } else if (msg instanceof FileRegion) { return writeFileRegion(in, (FileRegion) msg); - } else if (msg instanceof SpliceOutTask) { - if (!((SpliceOutTask) msg).spliceOut()) { - return WRITE_STATUS_SNDBUF_FULL; - } - in.remove(); - return 1; } else { // Should never reach here. throw new Error(); @@ -512,7 +382,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf; } - if (msg instanceof FileRegion || msg instanceof SpliceOutTask) { + if (msg instanceof FileRegion) { return msg; } @@ -633,47 +503,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im } } - @Override - protected void doClose() throws Exception { - try { - // Calling super.doClose() first so spliceTo(...) will fail on next call. - super.doClose(); - } finally { - safeClosePipe(pipeIn); - safeClosePipe(pipeOut); - clearSpliceQueue(); - } - } - - private void clearSpliceQueue() { - Queue sQueue = spliceQueue; - if (sQueue == null) { - return; - } - ClosedChannelException exception = null; - - for (;;) { - SpliceInTask task = sQueue.poll(); - if (task == null) { - break; - } - if (exception == null) { - exception = new ClosedChannelException(); - } - task.promise.tryFailure(exception); - } - } - - private static void safeClosePipe(FileDescriptor fd) { - if (fd != null) { - try { - fd.close(); - } catch (IOException e) { - logger.warn("Error while closing a pipe", e); - } - } - } - class EpollStreamUnsafe extends AbstractEpollUnsafe { // Overridden here just to be able to access this method from AbstractEpollStreamChannel @Override @@ -724,24 +553,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im ByteBuf byteBuf = null; boolean close = false; try { - Queue sQueue = null; do { - if (sQueue != null || (sQueue = spliceQueue) != null) { - SpliceInTask spliceTask = sQueue.peek(); - if (spliceTask != null) { - if (spliceTask.spliceIn(allocHandle)) { - // We need to check if it is still active as if not we removed all SpliceTasks in - // doClose(...) - if (isActive()) { - sQueue.remove(); - } - continue; - } else { - break; - } - } - } - // we use a direct buffer here as the native implementations only be able // to handle direct buffers. byteBuf = allocHandle.allocate(allocator); @@ -794,209 +606,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im } } - private void addToSpliceQueue(final SpliceInTask task) { - Queue sQueue = spliceQueue; - if (sQueue == null) { - synchronized (this) { - sQueue = spliceQueue; - if (sQueue == null) { - spliceQueue = sQueue = PlatformDependent.newMpscQueue(); - } - } - } - sQueue.add(task); - } - - protected abstract class SpliceInTask { - final ChannelPromise promise; - int len; - - protected SpliceInTask(int len, ChannelPromise promise) { - this.promise = promise; - this.len = len; - } - - abstract boolean spliceIn(RecvByteBufAllocator.Handle handle); - - protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException { - // calculate the maximum amount of data we are allowed to splice - int length = Math.min(handle.guess(), len); - int splicedIn = 0; - for (;;) { - // Splicing until there is nothing left to splice. - int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length); - if (localSplicedIn == 0) { - break; - } - splicedIn += localSplicedIn; - length -= localSplicedIn; - } - - return splicedIn; - } - } - - // Let it directly implement channelFutureListener as well to reduce object creation. - private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener { - private final AbstractEpollStreamChannel ch; - - SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) { - super(len, promise); - this.ch = ch; - } - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - promise.setFailure(future.cause()); - } - } - - @Override - public boolean spliceIn(RecvByteBufAllocator.Handle handle) { - assert ch.eventLoop().inEventLoop(); - if (len == 0) { - promise.setSuccess(); - return true; - } - try { - // We create the pipe on the target channel as this will allow us to just handle pending writes - // later in a correct fashion without get into any ordering issues when spliceTo(...) is called - // on multiple Channels pointing to one target Channel. - FileDescriptor pipeOut = ch.pipeOut; - if (pipeOut == null) { - // Create a new pipe as non was created before. - FileDescriptor[] pipe = pipe(); - ch.pipeIn = pipe[0]; - pipeOut = ch.pipeOut = pipe[1]; - } - - int splicedIn = spliceIn(pipeOut, handle); - if (splicedIn > 0) { - // Integer.MAX_VALUE is a special value which will result in splice forever. - if (len != Integer.MAX_VALUE) { - len -= splicedIn; - } - - // Depending on if we are done with splicing inbound data we set the right promise for the - // outbound splicing. - final ChannelPromise splicePromise; - if (len == 0) { - splicePromise = promise; - } else { - splicePromise = ch.newPromise().addListener(this); - } - - boolean autoRead = config().isAutoRead(); - - // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this - // case. - ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise); - ch.unsafe().flush(); - if (autoRead && !splicePromise.isDone()) { - // Write was not done which means the target channel was not writable. In this case we need to - // disable reading until we are done with splicing to the target channel because: - // - // - The user may want to to trigger another splice operation once the splicing was complete. - config().setAutoRead(false); - } - } - - return len == 0; - } catch (Throwable cause) { - promise.setFailure(cause); - return true; - } - } - } - - private final class SpliceOutTask { - private final AbstractEpollStreamChannel ch; - private final boolean autoRead; - private int len; - - SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) { - this.ch = ch; - this.len = len; - this.autoRead = autoRead; - } - - public boolean spliceOut() throws Exception { - assert ch.eventLoop().inEventLoop(); - try { - int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len); - len -= splicedOut; - if (len == 0) { - if (autoRead) { - // AutoRead was used and we spliced everything so start reading again - config().setAutoRead(true); - } - return true; - } - return false; - } catch (IOException e) { - if (autoRead) { - // AutoRead was used and we spliced everything so start reading again - config().setAutoRead(true); - } - throw e; - } - } - } - - private final class SpliceFdTask extends SpliceInTask { - private final FileDescriptor fd; - private final ChannelPromise promise; - private int offset; - - SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) { - super(len, promise); - this.fd = fd; - this.promise = promise; - this.offset = offset; - } - - @Override - public boolean spliceIn(RecvByteBufAllocator.Handle handle) { - assert eventLoop().inEventLoop(); - if (len == 0) { - promise.setSuccess(); - return true; - } - - try { - FileDescriptor[] pipe = pipe(); - FileDescriptor pipeIn = pipe[0]; - FileDescriptor pipeOut = pipe[1]; - try { - int splicedIn = spliceIn(pipeOut, handle); - if (splicedIn > 0) { - // Integer.MAX_VALUE is a special value which will result in splice forever. - if (len != Integer.MAX_VALUE) { - len -= splicedIn; - } - do { - int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn); - offset += splicedOut; - splicedIn -= splicedOut; - } while (splicedIn > 0); - if (len == 0) { - promise.setSuccess(); - return true; - } - } - return false; - } finally { - safeClosePipe(pipeIn); - safeClosePipe(pipeOut); - } - } catch (Throwable cause) { - promise.setFailure(cause); - return true; - } - } - } - private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel { EpollSocketWritableByteChannel() { super(socket); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index 32c9ba7f5b..ea850e706f 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -164,17 +164,6 @@ public final class Native { } private static native int epollCtlDel0(int efd, int fd); - // File-descriptor operations - public static int splice(int fd, long offIn, int fdOut, long offOut, long len) throws IOException { - int res = splice0(fd, offIn, fdOut, offOut, len); - if (res >= 0) { - return res; - } - return ioResult("splice", res); - } - - private static native int splice0(int fd, long offIn, int fdOut, long offOut, long len); - @Deprecated public static int sendmmsg(int fd, NativeDatagramPacketArray.NativeDatagramPacket[] msgs, int offset, int len) throws IOException { diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSpliceTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSpliceTest.java deleted file mode 100644 index a1ee7a8213..0000000000 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSpliceTest.java +++ /dev/null @@ -1,311 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.epoll; - -import io.netty.bootstrap.Bootstrap; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.MultithreadEventLoopGroup; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.unix.FileDescriptor; -import io.netty.util.NetUtil; -import org.junit.Assert; -import org.junit.Test; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Random; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.assertEquals; - -public class EpollSpliceTest { - - private static final int SPLICE_LEN = 32 * 1024; - private static final Random random = new Random(); - private static final byte[] data = new byte[1048576]; - - static { - random.nextBytes(data); - } - - @Test - public void spliceToSocket() throws Throwable { - final EchoHandler sh = new EchoHandler(); - final EchoHandler ch = new EchoHandler(); - - EventLoopGroup group = new MultithreadEventLoopGroup(1, EpollHandler.newFactory()); - ServerBootstrap bs = new ServerBootstrap(); - bs.channel(EpollServerSocketChannel.class); - bs.group(group).childHandler(sh); - final Channel sc = bs.bind(NetUtil.LOCALHOST, 0).syncUninterruptibly().channel(); - - ServerBootstrap bs2 = new ServerBootstrap(); - bs2.channel(EpollServerSocketChannel.class); - bs2.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); - bs2.group(group).childHandler(new ChannelHandler() { - @Override - public void channelActive(final ChannelHandlerContext ctx) throws Exception { - ctx.channel().config().setAutoRead(false); - Bootstrap bs = new Bootstrap(); - bs.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); - - bs.channel(EpollSocketChannel.class); - bs.group(ctx.channel().eventLoop()).handler(new ChannelHandler() { - @Override - public void channelActive(ChannelHandlerContext context) throws Exception { - final EpollSocketChannel ch = (EpollSocketChannel) ctx.channel(); - final EpollSocketChannel ch2 = (EpollSocketChannel) context.channel(); - // We are splicing two channels together, at this point we have a tcp proxy which handles all - // the data transfer only in kernel space! - - // Integer.MAX_VALUE will splice infinitly. - ch.spliceTo(ch2, Integer.MAX_VALUE).addListener((ChannelFutureListener) future -> { - if (!future.isSuccess()) { - future.channel().close(); - } - }); - // Trigger multiple splices to see if partial splicing works as well. - ch2.spliceTo(ch, SPLICE_LEN).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - future.channel().close(); - } else { - ch2.spliceTo(ch, SPLICE_LEN).addListener(this); - } - } - }); - ctx.channel().config().setAutoRead(true); - } - - @Override - public void channelInactive(ChannelHandlerContext context) throws Exception { - context.close(); - } - }); - bs.connect(sc.localAddress()).addListener((ChannelFutureListener) future -> { - if (!future.isSuccess()) { - ctx.close(); - } else { - future.channel().closeFuture().addListener((ChannelFutureListener) future1 -> ctx.close()); - } - }); - } - }); - Channel pc = bs2.bind(NetUtil.LOCALHOST, 0).syncUninterruptibly().channel(); - - Bootstrap cb = new Bootstrap(); - cb.group(group); - cb.channel(EpollSocketChannel.class); - cb.handler(ch); - Channel cc = cb.connect(pc.localAddress()).syncUninterruptibly().channel(); - - for (int i = 0; i < data.length;) { - int length = Math.min(random.nextInt(1024 * 64), data.length - i); - ByteBuf buf = Unpooled.wrappedBuffer(data, i, length); - cc.writeAndFlush(buf); - i += length; - } - - while (ch.counter < data.length) { - if (sh.exception.get() != null) { - break; - } - if (ch.exception.get() != null) { - break; - } - - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore. - } - } - - while (sh.counter < data.length) { - if (sh.exception.get() != null) { - break; - } - if (ch.exception.get() != null) { - break; - } - - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore. - } - } - - sh.channel.close().sync(); - ch.channel.close().sync(); - sc.close().sync(); - pc.close().sync(); - group.shutdownGracefully(); - - if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { - throw sh.exception.get(); - } - if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { - throw ch.exception.get(); - } - if (sh.exception.get() != null) { - throw sh.exception.get(); - } - if (ch.exception.get() != null) { - throw ch.exception.get(); - } - } - - @Test(timeout = 10000) - public void spliceToFile() throws Throwable { - EventLoopGroup group = new MultithreadEventLoopGroup(1, EpollHandler.newFactory()); - File file = File.createTempFile("netty-splice", null); - file.deleteOnExit(); - - SpliceHandler sh = new SpliceHandler(file); - ServerBootstrap bs = new ServerBootstrap(); - bs.channel(EpollServerSocketChannel.class); - bs.group(group).childHandler(sh); - bs.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); - Channel sc = bs.bind(NetUtil.LOCALHOST, 0).syncUninterruptibly().channel(); - - Bootstrap cb = new Bootstrap(); - cb.group(group); - cb.channel(EpollSocketChannel.class); - cb.handler(new ChannelHandler() { }); - Channel cc = cb.connect(sc.localAddress()).syncUninterruptibly().channel(); - - for (int i = 0; i < data.length;) { - int length = Math.min(random.nextInt(1024 * 64), data.length - i); - ByteBuf buf = Unpooled.wrappedBuffer(data, i, length); - cc.writeAndFlush(buf); - i += length; - } - - while (sh.future2 == null || !sh.future2.isDone() || !sh.future.isDone()) { - if (sh.exception.get() != null) { - break; - } - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore. - } - } - - sc.close().sync(); - cc.close().sync(); - - if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { - throw sh.exception.get(); - } - - byte[] written = new byte[data.length]; - FileInputStream in = new FileInputStream(file); - - try { - Assert.assertEquals(written.length, in.read(written)); - Assert.assertArrayEquals(data, written); - } finally { - in.close(); - group.shutdownGracefully(); - } - } - - private static class EchoHandler extends SimpleChannelInboundHandler { - volatile Channel channel; - final AtomicReference exception = new AtomicReference<>(); - volatile int counter; - - @Override - public void channelActive(ChannelHandlerContext ctx) - throws Exception { - channel = ctx.channel(); - } - - @Override - public void messageReceived(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - byte[] actual = new byte[in.readableBytes()]; - in.readBytes(actual); - - int lastIdx = counter; - for (int i = 0; i < actual.length; i ++) { - assertEquals(data[i + lastIdx], actual[i]); - } - - if (channel.parent() != null) { - channel.write(Unpooled.wrappedBuffer(actual)); - } - - counter += actual.length; - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - ctx.flush(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, - Throwable cause) throws Exception { - if (exception.compareAndSet(null, cause)) { - cause.printStackTrace(); - ctx.close(); - } - } - } - - private static class SpliceHandler implements ChannelHandler { - private final File file; - - volatile ChannelFuture future; - volatile ChannelFuture future2; - final AtomicReference exception = new AtomicReference<>(); - - SpliceHandler(File file) { - this.file = file; - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - final EpollSocketChannel ch = (EpollSocketChannel) ctx.channel(); - final FileDescriptor fd = FileDescriptor.from(file); - - // splice two halves separately to test starting offset - future = ch.spliceTo(fd, 0, data.length / 2); - future2 = ch.spliceTo(fd, data.length / 2, data.length / 2); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, - Throwable cause) throws Exception { - if (exception.compareAndSet(null, cause)) { - cause.printStackTrace(); - ctx.close(); - } - } - } -}