From 699ef0784ef7527b8dc51026033d33fc86bc0230 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 15 May 2013 15:10:41 +0200 Subject: [PATCH] [#1317] Allow to use VoidPromise for flush(...), write(...) and sendFile(...) * This also move rename Channel.Unsafe.voidFuture() to ChannelPropertyAccess.voidPromise() --- .../transport/socket/SocketEchoTest.java | 36 +++- .../socket/SocketFileRegionTest.java | 25 ++- .../io/netty/channel/AbstractChannel.java | 166 ++++++++++++------ .../main/java/io/netty/channel/Channel.java | 6 - .../java/io/netty/channel/ChannelFuture.java | 8 - .../io/netty/channel/ChannelHandlerUtil.java | 21 ++- .../netty/channel/ChannelPropertyAccess.java | 18 ++ .../channel/DefaultChannelHandlerContext.java | 30 ++-- .../channel/ThreadPerChannelEventLoop.java | 2 +- .../io/netty/channel/VoidChannelPromise.java | 26 +-- .../netty/channel/aio/AbstractAioChannel.java | 2 +- .../io/netty/channel/aio/AioEventLoop.java | 2 +- .../io/netty/channel/local/LocalChannel.java | 10 +- .../channel/local/LocalServerChannel.java | 2 +- .../channel/nio/AbstractNioByteChannel.java | 45 ++--- .../netty/channel/nio/AbstractNioChannel.java | 2 +- .../nio/AbstractNioMessageChannel.java | 2 +- .../io/netty/channel/nio/NioEventLoop.java | 8 +- .../channel/oio/AbstractOioByteChannel.java | 4 +- .../oio/AbstractOioMessageChannel.java | 4 +- .../channel/oio/OioByteStreamChannel.java | 22 +-- .../channel/socket/aio/AioSocketChannel.java | 41 ++--- 22 files changed, 290 insertions(+), 192 deletions(-) diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java index 7a6ad9fa7c..35b4ec84bd 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java @@ -64,7 +64,7 @@ public class SocketEchoTest extends AbstractSocketTest { } public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, Integer.MAX_VALUE, false); + testSimpleEcho0(sb, cb, Integer.MAX_VALUE, false, false); } @Test(timeout = 30000) @@ -73,7 +73,7 @@ public class SocketEchoTest extends AbstractSocketTest { } public void testSimpleEchoWithBridge(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, Integer.MAX_VALUE, true); + testSimpleEcho0(sb, cb, Integer.MAX_VALUE, true, false); } @Test(timeout = 30000) @@ -82,7 +82,7 @@ public class SocketEchoTest extends AbstractSocketTest { } public void testSimpleEchoWithBoundedBuffer(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, 32, false); + testSimpleEcho0(sb, cb, 32, false, false); } @Test(timeout = 30000) @@ -91,11 +91,30 @@ public class SocketEchoTest extends AbstractSocketTest { } public void testSimpleEchoWithBridgedBoundedBuffer(ServerBootstrap sb, Bootstrap cb) throws Throwable { - testSimpleEcho0(sb, cb, 32, true); + testSimpleEcho0(sb, cb, 32, true, false); + } + + @Test(timeout = 30000) + public void testSimpleEchoWithVoidPromise() throws Throwable { + run(); + } + + public void testSimpleEchoWithVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable { + testSimpleEcho0(sb, cb, Integer.MAX_VALUE, false, true); + } + + @Test(timeout = 30000) + public void testSimpleEchoWithBridgeAndVoidPromise() throws Throwable { + run(); + } + + public void testSimpleEchoWithBridgeAndVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable { + testSimpleEcho0(sb, cb, Integer.MAX_VALUE, true, true); } private static void testSimpleEcho0( - ServerBootstrap sb, Bootstrap cb, int maxInboundBufferSize, boolean bridge) throws Throwable { + ServerBootstrap sb, Bootstrap cb, int maxInboundBufferSize, boolean bridge, boolean voidPromise) + throws Throwable { final EchoHandler sh = new EchoHandler(maxInboundBufferSize); final EchoHandler ch = new EchoHandler(maxInboundBufferSize); @@ -123,7 +142,12 @@ public class SocketEchoTest extends AbstractSocketTest { for (int i = 0; i < data.length;) { int length = Math.min(random.nextInt(1024 * 64), data.length - i); - cc.write(Unpooled.wrappedBuffer(data, i, length)); + ByteBuf buf = Unpooled.wrappedBuffer(data, i, length); + if (voidPromise) { + assertEquals(cc.voidPromise(), cc.write(buf, cc.voidPromise())); + } else { + assertNotEquals(cc.voidPromise(), cc.write(buf)); + } i += length; } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java index 26289544e1..0a002c30ba 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java @@ -19,10 +19,10 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundByteHandlerAdapter; import io.netty.channel.DefaultFileRegion; +import io.netty.channel.FileRegion; import org.junit.Test; import java.io.File; @@ -48,7 +48,20 @@ public class SocketFileRegionTest extends AbstractSocketTest { run(); } + @Test + public void testFileRegionVoidPromise() throws Throwable { + run(); + } + public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable { + testFileRegion0(sb, cb, false); + } + + public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable { + testFileRegion0(sb, cb, true); + } + + private void testFileRegion0(ServerBootstrap sb, Bootstrap cb, boolean voidPromise) throws Throwable { File file = File.createTempFile("netty-", ".tmp"); file.deleteOnExit(); @@ -75,9 +88,13 @@ public class SocketFileRegionTest extends AbstractSocketTest { Channel sc = sb.bind().sync().channel(); Channel cc = cb.connect().sync().channel(); - ChannelFuture future = cc.sendFile(new DefaultFileRegion(new FileInputStream(file).getChannel(), - 0L, file.length())).syncUninterruptibly(); - assertTrue(future.isSuccess()); + FileRegion region = new DefaultFileRegion(new FileInputStream(file).getChannel(), + 0L, file.length()); + if (voidPromise) { + assertEquals(cc.voidPromise(), cc.sendFile(region, cc.voidPromise())); + } else { + assertNotEquals(cc.voidPromise(), cc.sendFile(region)); + } while (sh.counter < data.length) { if (sh.exception.get() != null) { break; diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index c4d36516ad..f6cb4c4e87 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -92,6 +92,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private ClosedChannelException closedChannelException; private boolean inFlushNow; private boolean flushNowPending; + private FlushTask flushTaskInProgress; /** Cache for the string representation of this channel */ private boolean strValActive; @@ -436,56 +437,124 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return strVal; } + @Override + public final ChannelPromise voidPromise() { + return voidPromise; + } + /** - * {@link Unsafe} implementation which sub-classes must extend and use. + * Task which will flush a {@link FileRegion} */ - protected abstract class AbstractUnsafe implements Unsafe { + protected final class FlushTask { + private final FileRegion region; + private final ChannelPromise promise; + private FlushTask next; + private final AbstractUnsafe unsafe; - private final class FlushTask { - final FileRegion region; - final ChannelPromise promise; - FlushTask next; + FlushTask(AbstractUnsafe unsafe, FileRegion region, ChannelPromise promise) { + this.region = region; + this.promise = promise; + this.unsafe = unsafe; + } - FlushTask(FileRegion region, ChannelPromise promise) { - this.region = region; - this.promise = promise; - promise.addListener(new ChannelFutureListener() { + /** + * Mark the task as success. Multiple calls if this will throw a {@link IllegalStateException}. + * + * This also will call {@link FileRegion#release()}. + */ + public void setSuccess() { + if (eventLoop().inEventLoop()) { + promise.setSuccess(); + complete(); + } else { + eventLoop().execute(new Runnable() { @Override - public void operationComplete(ChannelFuture future) throws Exception { - flushTaskInProgress = next; - if (next != null) { - try { - FileRegion region = next.region; - if (region == null) { - // no region present means the next flush task was to directly flush - // the outbound buffer - flushNotifierAndFlush(next.promise); - } else { - // flush the region now - doFlushFileRegion(region, next.promise); - } - } catch (Throwable cause) { - next.promise.setFailure(cause); - } - } else { - // notify the flush futures - flushFutureNotifier.notifyFlushFutures(); - } + public void run() { + setSuccess(); } }); } } + /** + * Notify the task of progress in transfer of the {@link FileRegion}. + */ + public void setProgress(long progress) { + if (promise instanceof ChannelProgressivePromise) { + ((ChannelProgressivePromise) promise).setProgress(progress, region.count()); + } + } + + /** + * Mark the task as failure. Multiple calls if this will throw a {@link IllegalStateException}. + * + * This also will call {@link FileRegion#release()}. + */ + public void setFailure(final Throwable cause) { + if (eventLoop().inEventLoop()) { + promise.setFailure(cause); + complete(); + } else { + eventLoop().execute(new Runnable() { + @Override + public void run() { + setFailure(cause); + } + }); + } + } + + /** + * Return the {@link FileRegion} which should be flushed + */ + public FileRegion region() { + return region; + } + + private void complete() { + region.release(); + flushTaskInProgress = next; + if (next != null) { + try { + FileRegion region = next.region; + if (region == null) { + // no region present means the next flush task was to directly flush + // the outbound buffer + unsafe.flushNotifierAndFlush(next.promise); + } else { + // flush the region now + doFlushFileRegion(next); + } + } catch (Throwable cause) { + next.promise.setFailure(cause); + } + } else { + // notify the flush futures + flushFutureNotifier.notifyFlushFutures(); + } + } + } + + /** + * {@link Unsafe} implementation which sub-classes must extend and use. + */ + protected abstract class AbstractUnsafe implements Unsafe { + + private final Runnable beginReadTask = new Runnable() { + @Override + public void run() { + beginRead(); + } + }; + private final Runnable flushLaterTask = new Runnable() { @Override public void run() { flushNowPending = false; - flush(voidFuture()); + flush(voidPromise()); } }; - private FlushTask flushTaskInProgress; - @Override public final void sendFile(final FileRegion region, final ChannelPromise promise) { if (outboundBufSize() > 0) { @@ -504,10 +573,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private void sendFile0(FileRegion region, ChannelPromise promise) { FlushTask task = flushTaskInProgress; if (task == null) { - flushTaskInProgress = new FlushTask(region, promise); + flushTaskInProgress = task = new FlushTask(this, region, promise); try { // the first FileRegion to flush so trigger it now! - doFlushFileRegion(region, promise); + doFlushFileRegion(task); } catch (Throwable cause) { region.release(); promise.setFailure(cause); @@ -523,7 +592,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha task = next; } // there is something that needs to get flushed first so add it as next in the chain - task.next = new FlushTask(region, promise); + task.next = new FlushTask(this, region, promise); } @Override @@ -531,11 +600,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return pipeline.head; } - @Override - public final ChannelPromise voidFuture() { - return voidPromise; - } - @Override public final SocketAddress localAddress() { return localAddress0(); @@ -606,7 +670,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (!promise.tryFailure(t)) { logger.warn( "Tried to fail the registration promise, but it is complete already. " + - "Swallowing the cause of the registration failure:", t); + "Swallowing the cause of the registration failure:", t); } closeFuture.setClosed(); } @@ -691,7 +755,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha }); } - deregister(voidFuture()); + deregister(voidPromise()); } else { // Closed already. promise.setSuccess(); @@ -757,7 +821,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha pipeline.fireExceptionCaught(e); } }); - close(unsafe().voidFuture()); + close(voidPromise()); } } @@ -775,7 +839,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } task = t.next; } - task.next = new FlushTask(null, promise); + task.next = new FlushTask(this, null, promise); } } @@ -815,7 +879,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } catch (Throwable t) { flushFutureNotifier.notifyFlushFutures(t); if (t instanceof IOException) { - close(voidFuture()); + close(voidPromise()); } } } else { @@ -864,7 +928,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } else { flushFutureNotifier.notifyFlushFutures(cause); if (cause instanceof IOException) { - close(voidFuture()); + close(voidPromise()); } } } finally { @@ -886,7 +950,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (isOpen()) { return; } - close(voidFuture()); + close(voidPromise()); } private void invokeLater(Runnable task) { @@ -987,11 +1051,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } /** - * Flush the content of the given {@link FileRegion} to the remote peer. + * Flush the content of the given {@link FlushTask} to the remote peer. * * Sub-classes may override this as this implementation will just thrown an {@link UnsupportedOperationException} */ - protected void doFlushFileRegion(FileRegion region, ChannelPromise promise) throws Exception { + protected void doFlushFileRegion(FlushTask task) throws Exception { throw new UnsupportedOperationException(); } @@ -1008,7 +1072,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha */ protected abstract boolean isFlushPending(); - private final class CloseFuture extends DefaultChannelPromise implements ChannelFuture.Unsafe { + final class CloseFuture extends DefaultChannelPromise { CloseFuture(AbstractChannel ch) { super(ch); diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 81572f7c37..fe34b38791 100755 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -173,7 +173,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr * following methods: *