diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java
index 7a6ad9fa7c..35b4ec84bd 100644
--- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java
+++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketEchoTest.java
@@ -64,7 +64,7 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
- testSimpleEcho0(sb, cb, Integer.MAX_VALUE, false);
+ testSimpleEcho0(sb, cb, Integer.MAX_VALUE, false, false);
}
@Test(timeout = 30000)
@@ -73,7 +73,7 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEchoWithBridge(ServerBootstrap sb, Bootstrap cb) throws Throwable {
- testSimpleEcho0(sb, cb, Integer.MAX_VALUE, true);
+ testSimpleEcho0(sb, cb, Integer.MAX_VALUE, true, false);
}
@Test(timeout = 30000)
@@ -82,7 +82,7 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEchoWithBoundedBuffer(ServerBootstrap sb, Bootstrap cb) throws Throwable {
- testSimpleEcho0(sb, cb, 32, false);
+ testSimpleEcho0(sb, cb, 32, false, false);
}
@Test(timeout = 30000)
@@ -91,11 +91,30 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEchoWithBridgedBoundedBuffer(ServerBootstrap sb, Bootstrap cb) throws Throwable {
- testSimpleEcho0(sb, cb, 32, true);
+ testSimpleEcho0(sb, cb, 32, true, false);
+ }
+
+ @Test(timeout = 30000)
+ public void testSimpleEchoWithVoidPromise() throws Throwable {
+ run();
+ }
+
+ public void testSimpleEchoWithVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ testSimpleEcho0(sb, cb, Integer.MAX_VALUE, false, true);
+ }
+
+ @Test(timeout = 30000)
+ public void testSimpleEchoWithBridgeAndVoidPromise() throws Throwable {
+ run();
+ }
+
+ public void testSimpleEchoWithBridgeAndVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ testSimpleEcho0(sb, cb, Integer.MAX_VALUE, true, true);
}
private static void testSimpleEcho0(
- ServerBootstrap sb, Bootstrap cb, int maxInboundBufferSize, boolean bridge) throws Throwable {
+ ServerBootstrap sb, Bootstrap cb, int maxInboundBufferSize, boolean bridge, boolean voidPromise)
+ throws Throwable {
final EchoHandler sh = new EchoHandler(maxInboundBufferSize);
final EchoHandler ch = new EchoHandler(maxInboundBufferSize);
@@ -123,7 +142,12 @@ public class SocketEchoTest extends AbstractSocketTest {
for (int i = 0; i < data.length;) {
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
- cc.write(Unpooled.wrappedBuffer(data, i, length));
+ ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
+ if (voidPromise) {
+ assertEquals(cc.voidPromise(), cc.write(buf, cc.voidPromise()));
+ } else {
+ assertNotEquals(cc.voidPromise(), cc.write(buf));
+ }
i += length;
}
diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java
index 26289544e1..0a002c30ba 100644
--- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java
+++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFileRegionTest.java
@@ -19,10 +19,10 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.FileRegion;
import org.junit.Test;
import java.io.File;
@@ -48,7 +48,20 @@ public class SocketFileRegionTest extends AbstractSocketTest {
run();
}
+ @Test
+ public void testFileRegionVoidPromise() throws Throwable {
+ run();
+ }
+
public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ testFileRegion0(sb, cb, false);
+ }
+
+ public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
+ testFileRegion0(sb, cb, true);
+ }
+
+ private void testFileRegion0(ServerBootstrap sb, Bootstrap cb, boolean voidPromise) throws Throwable {
File file = File.createTempFile("netty-", ".tmp");
file.deleteOnExit();
@@ -75,9 +88,13 @@ public class SocketFileRegionTest extends AbstractSocketTest {
Channel sc = sb.bind().sync().channel();
Channel cc = cb.connect().sync().channel();
- ChannelFuture future = cc.sendFile(new DefaultFileRegion(new FileInputStream(file).getChannel(),
- 0L, file.length())).syncUninterruptibly();
- assertTrue(future.isSuccess());
+ FileRegion region = new DefaultFileRegion(new FileInputStream(file).getChannel(),
+ 0L, file.length());
+ if (voidPromise) {
+ assertEquals(cc.voidPromise(), cc.sendFile(region, cc.voidPromise()));
+ } else {
+ assertNotEquals(cc.voidPromise(), cc.sendFile(region));
+ }
while (sh.counter < data.length) {
if (sh.exception.get() != null) {
break;
diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java
index c4d36516ad..f6cb4c4e87 100644
--- a/transport/src/main/java/io/netty/channel/AbstractChannel.java
+++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java
@@ -92,6 +92,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private ClosedChannelException closedChannelException;
private boolean inFlushNow;
private boolean flushNowPending;
+ private FlushTask flushTaskInProgress;
/** Cache for the string representation of this channel */
private boolean strValActive;
@@ -436,56 +437,124 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return strVal;
}
+ @Override
+ public final ChannelPromise voidPromise() {
+ return voidPromise;
+ }
+
/**
- * {@link Unsafe} implementation which sub-classes must extend and use.
+ * Task which will flush a {@link FileRegion}
*/
- protected abstract class AbstractUnsafe implements Unsafe {
+ protected final class FlushTask {
+ private final FileRegion region;
+ private final ChannelPromise promise;
+ private FlushTask next;
+ private final AbstractUnsafe unsafe;
- private final class FlushTask {
- final FileRegion region;
- final ChannelPromise promise;
- FlushTask next;
+ FlushTask(AbstractUnsafe unsafe, FileRegion region, ChannelPromise promise) {
+ this.region = region;
+ this.promise = promise;
+ this.unsafe = unsafe;
+ }
- FlushTask(FileRegion region, ChannelPromise promise) {
- this.region = region;
- this.promise = promise;
- promise.addListener(new ChannelFutureListener() {
+ /**
+ * Mark the task as success. Multiple calls if this will throw a {@link IllegalStateException}.
+ *
+ * This also will call {@link FileRegion#release()}.
+ */
+ public void setSuccess() {
+ if (eventLoop().inEventLoop()) {
+ promise.setSuccess();
+ complete();
+ } else {
+ eventLoop().execute(new Runnable() {
@Override
- public void operationComplete(ChannelFuture future) throws Exception {
- flushTaskInProgress = next;
- if (next != null) {
- try {
- FileRegion region = next.region;
- if (region == null) {
- // no region present means the next flush task was to directly flush
- // the outbound buffer
- flushNotifierAndFlush(next.promise);
- } else {
- // flush the region now
- doFlushFileRegion(region, next.promise);
- }
- } catch (Throwable cause) {
- next.promise.setFailure(cause);
- }
- } else {
- // notify the flush futures
- flushFutureNotifier.notifyFlushFutures();
- }
+ public void run() {
+ setSuccess();
}
});
}
}
+ /**
+ * Notify the task of progress in transfer of the {@link FileRegion}.
+ */
+ public void setProgress(long progress) {
+ if (promise instanceof ChannelProgressivePromise) {
+ ((ChannelProgressivePromise) promise).setProgress(progress, region.count());
+ }
+ }
+
+ /**
+ * Mark the task as failure. Multiple calls if this will throw a {@link IllegalStateException}.
+ *
+ * This also will call {@link FileRegion#release()}.
+ */
+ public void setFailure(final Throwable cause) {
+ if (eventLoop().inEventLoop()) {
+ promise.setFailure(cause);
+ complete();
+ } else {
+ eventLoop().execute(new Runnable() {
+ @Override
+ public void run() {
+ setFailure(cause);
+ }
+ });
+ }
+ }
+
+ /**
+ * Return the {@link FileRegion} which should be flushed
+ */
+ public FileRegion region() {
+ return region;
+ }
+
+ private void complete() {
+ region.release();
+ flushTaskInProgress = next;
+ if (next != null) {
+ try {
+ FileRegion region = next.region;
+ if (region == null) {
+ // no region present means the next flush task was to directly flush
+ // the outbound buffer
+ unsafe.flushNotifierAndFlush(next.promise);
+ } else {
+ // flush the region now
+ doFlushFileRegion(next);
+ }
+ } catch (Throwable cause) {
+ next.promise.setFailure(cause);
+ }
+ } else {
+ // notify the flush futures
+ flushFutureNotifier.notifyFlushFutures();
+ }
+ }
+ }
+
+ /**
+ * {@link Unsafe} implementation which sub-classes must extend and use.
+ */
+ protected abstract class AbstractUnsafe implements Unsafe {
+
+ private final Runnable beginReadTask = new Runnable() {
+ @Override
+ public void run() {
+ beginRead();
+ }
+ };
+
private final Runnable flushLaterTask = new Runnable() {
@Override
public void run() {
flushNowPending = false;
- flush(voidFuture());
+ flush(voidPromise());
}
};
- private FlushTask flushTaskInProgress;
-
@Override
public final void sendFile(final FileRegion region, final ChannelPromise promise) {
if (outboundBufSize() > 0) {
@@ -504,10 +573,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private void sendFile0(FileRegion region, ChannelPromise promise) {
FlushTask task = flushTaskInProgress;
if (task == null) {
- flushTaskInProgress = new FlushTask(region, promise);
+ flushTaskInProgress = task = new FlushTask(this, region, promise);
try {
// the first FileRegion to flush so trigger it now!
- doFlushFileRegion(region, promise);
+ doFlushFileRegion(task);
} catch (Throwable cause) {
region.release();
promise.setFailure(cause);
@@ -523,7 +592,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
task = next;
}
// there is something that needs to get flushed first so add it as next in the chain
- task.next = new FlushTask(region, promise);
+ task.next = new FlushTask(this, region, promise);
}
@Override
@@ -531,11 +600,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return pipeline.head;
}
- @Override
- public final ChannelPromise voidFuture() {
- return voidPromise;
- }
-
@Override
public final SocketAddress localAddress() {
return localAddress0();
@@ -606,7 +670,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (!promise.tryFailure(t)) {
logger.warn(
"Tried to fail the registration promise, but it is complete already. " +
- "Swallowing the cause of the registration failure:", t);
+ "Swallowing the cause of the registration failure:", t);
}
closeFuture.setClosed();
}
@@ -691,7 +755,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
});
}
- deregister(voidFuture());
+ deregister(voidPromise());
} else {
// Closed already.
promise.setSuccess();
@@ -757,7 +821,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
pipeline.fireExceptionCaught(e);
}
});
- close(unsafe().voidFuture());
+ close(voidPromise());
}
}
@@ -775,7 +839,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
task = t.next;
}
- task.next = new FlushTask(null, promise);
+ task.next = new FlushTask(this, null, promise);
}
}
@@ -815,7 +879,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} catch (Throwable t) {
flushFutureNotifier.notifyFlushFutures(t);
if (t instanceof IOException) {
- close(voidFuture());
+ close(voidPromise());
}
}
} else {
@@ -864,7 +928,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} else {
flushFutureNotifier.notifyFlushFutures(cause);
if (cause instanceof IOException) {
- close(voidFuture());
+ close(voidPromise());
}
}
} finally {
@@ -886,7 +950,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (isOpen()) {
return;
}
- close(voidFuture());
+ close(voidPromise());
}
private void invokeLater(Runnable task) {
@@ -987,11 +1051,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
/**
- * Flush the content of the given {@link FileRegion} to the remote peer.
+ * Flush the content of the given {@link FlushTask} to the remote peer.
*
* Sub-classes may override this as this implementation will just thrown an {@link UnsupportedOperationException}
*/
- protected void doFlushFileRegion(FileRegion region, ChannelPromise promise) throws Exception {
+ protected void doFlushFileRegion(FlushTask task) throws Exception {
throw new UnsupportedOperationException();
}
@@ -1008,7 +1072,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/
protected abstract boolean isFlushPending();
- private final class CloseFuture extends DefaultChannelPromise implements ChannelFuture.Unsafe {
+ final class CloseFuture extends DefaultChannelPromise {
CloseFuture(AbstractChannel ch) {
super(ch);
diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java
index 81572f7c37..fe34b38791 100755
--- a/transport/src/main/java/io/netty/channel/Channel.java
+++ b/transport/src/main/java/io/netty/channel/Channel.java
@@ -173,7 +173,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
* following methods:
*
* - {@link #headContext()}
- * - {@link #voidFuture()}
* - {@link #localAddress()}
* - {@link #remoteAddress()}
* - {@link #closeForcibly()}
@@ -186,11 +185,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
*/
ChannelHandlerContext headContext();
- /**
- * Return a {@link VoidChannelPromise}. This method always return the same instance.
- */
- ChannelPromise voidFuture();
-
/**
* Return the {@link SocketAddress} to which is bound local or
* {@code null} if none.
diff --git a/transport/src/main/java/io/netty/channel/ChannelFuture.java b/transport/src/main/java/io/netty/channel/ChannelFuture.java
index 06339950ae..13308400fc 100644
--- a/transport/src/main/java/io/netty/channel/ChannelFuture.java
+++ b/transport/src/main/java/io/netty/channel/ChannelFuture.java
@@ -193,12 +193,4 @@ public interface ChannelFuture extends Future {
@Override
ChannelFuture awaitUninterruptibly();
-
- /**
- * A {@link ChannelFuture} which is not allowed to be sent to {@link ChannelPipeline} due to
- * implementation details.
- */
- interface Unsafe extends ChannelFuture {
- // Tag interface
- }
}
diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java
index 596265abc4..080bb3b4d1 100644
--- a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java
+++ b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java
@@ -91,6 +91,7 @@ public final class ChannelHandlerUtil {
return;
}
+ boolean failed = false;
int processed = 0;
try {
if (!handler.beginFlush(ctx)) {
@@ -120,6 +121,7 @@ public final class ChannelHandlerUtil {
}
}
} catch (Throwable t) {
+ failed = true;
IncompleteFlushException pfe;
if (t instanceof IncompleteFlushException) {
pfe = (IncompleteFlushException) t;
@@ -138,24 +140,25 @@ public final class ChannelHandlerUtil {
try {
handler.endFlush(ctx);
+
} catch (Throwable t) {
- if (promise.isDone()) {
- logger.warn("endFlush() raised a masked exception due to failed flush().", t);
- } else {
- fail(ctx, promise, closeOnFailedFlush, t);
- }
+ failed = true;
+ fail(ctx, promise, closeOnFailedFlush, t);
}
- if (!promise.isDone()) {
+ if (!failed) {
ctx.flush(promise);
}
}
private static void fail(
ChannelHandlerContext ctx, ChannelPromise promise, boolean closeOnFailedFlush, Throwable cause) {
- promise.setFailure(cause);
- if (closeOnFailedFlush) {
- ctx.close();
+ if (promise.tryFailure(cause)) {
+ if (closeOnFailedFlush) {
+ ctx.close();
+ }
+ } else {
+ logger.warn("endFlush() raised a masked exception due to failed flush().", cause);
}
}
diff --git a/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java b/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java
index 5059bf15de..2c5d622926 100644
--- a/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java
+++ b/transport/src/main/java/io/netty/channel/ChannelPropertyAccess.java
@@ -58,4 +58,22 @@ interface ChannelPropertyAccess {
* every call of blocking methods will just return without blocking.
*/
ChannelFuture newFailedFuture(Throwable cause);
+
+ /**
+ * Return a special ChannelPromise which can be reused for different operations.
+ *
+ * It's only supported to use
+ * it for {@link ChannelOutboundInvoker#write(Object, ChannelPromise)} ,
+ * {@link ChannelOutboundInvoker#flush(ChannelPromise)} and
+ * {@link ChannelOutboundInvoker#sendFile(FileRegion, ChannelPromise)}.
+ *
+ *
+ * Be aware that the returned {@link ChannelPromise} will not support most operations and should only be used
+ * if you want to safe object allocation for every operation. You will not be able to detect if the operation
+ * was complete, only if it failed as the implementation will call
+ * {@link ChannelPipeline#fireExceptionCaught(Throwable)} in this case.
+ *
+ * Be aware this is an expert feature and should be used with care!
+ */
+ ChannelPromise voidPromise();
}
diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java
index ac89678fe9..9d27172791 100755
--- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java
+++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java
@@ -1141,7 +1141,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
- validateFuture(promise);
+ validateFuture(promise, false);
return findContextOutbound().invokeBind(localAddress, promise);
}
@@ -1182,7 +1182,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
- validateFuture(promise);
+ validateFuture(promise, false);
return findContextOutbound().invokeConnect(remoteAddress, localAddress, promise);
}
@@ -1217,7 +1217,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
- validateFuture(promise);
+ validateFuture(promise, false);
// Translate disconnect to close if the channel has no notion of disconnect-reconnect.
// So far, UDP/IP is the only transport that has such behavior.
@@ -1258,7 +1258,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture close(ChannelPromise promise) {
- validateFuture(promise);
+ validateFuture(promise, false);
return findContextOutbound().invokeClose(promise);
}
@@ -1292,7 +1292,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture deregister(ChannelPromise promise) {
- validateFuture(promise);
+ validateFuture(promise, false);
return findContextOutbound().invokeDeregister(promise);
}
@@ -1361,7 +1361,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture flush(final ChannelPromise promise) {
- validateFuture(promise);
+ validateFuture(promise, true);
EventExecutor executor = executor();
Thread currentThread = Thread.currentThread();
@@ -1453,7 +1453,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (region == null) {
throw new NullPointerException("region");
}
- validateFuture(promise);
+ validateFuture(promise, true);
return findContextOutbound().invokeSendFile(region, promise);
}
@@ -1499,7 +1499,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (message == null) {
throw new NullPointerException("message");
}
- validateFuture(promise);
+ validateFuture(promise, true);
DefaultChannelHandlerContext ctx = prev;
EventExecutor executor;
@@ -1631,7 +1631,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return new FailedChannelFuture(channel(), executor(), cause);
}
- private void validateFuture(ChannelFuture future) {
+ private void validateFuture(ChannelFuture future, boolean allowUnsafe) {
if (future == null) {
throw new NullPointerException("future");
}
@@ -1642,8 +1642,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (future.isDone()) {
throw new IllegalArgumentException("future already done");
}
- if (future instanceof ChannelFuture.Unsafe) {
- throw new IllegalArgumentException("internal use only future not allowed");
+ if (!allowUnsafe && future instanceof VoidChannelPromise) {
+ throw new IllegalArgumentException("VoidChannelPromise not allowed for this operation");
+ }
+ if (future instanceof AbstractChannel.CloseFuture) {
+ throw new IllegalArgumentException("AbstractChannel.CloseFuture may not send through the pipeline");
}
}
@@ -1811,4 +1814,9 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return bridge;
}
}
+
+ @Override
+ public ChannelPromise voidPromise() {
+ return channel.voidPromise();
+ }
}
diff --git a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java
index b2aec5fb66..63fccea25e 100644
--- a/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java
+++ b/transport/src/main/java/io/netty/channel/ThreadPerChannelEventLoop.java
@@ -57,7 +57,7 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
Channel ch = this.ch;
if (isShuttingDown()) {
if (ch != null) {
- ch.unsafe().close(ch.unsafe().voidFuture());
+ ch.unsafe().close(ch.voidPromise());
}
if (confirmShutdown()) {
break;
diff --git a/transport/src/main/java/io/netty/channel/VoidChannelPromise.java b/transport/src/main/java/io/netty/channel/VoidChannelPromise.java
index b01aee9450..b4cc69e488 100644
--- a/transport/src/main/java/io/netty/channel/VoidChannelPromise.java
+++ b/transport/src/main/java/io/netty/channel/VoidChannelPromise.java
@@ -21,7 +21,7 @@ import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.TimeUnit;
-final class VoidChannelPromise extends AbstractFuture implements ChannelFuture.Unsafe, ChannelPromise {
+final class VoidChannelPromise extends AbstractFuture implements ChannelPromise {
private final Channel channel;
@@ -38,31 +38,31 @@ final class VoidChannelPromise extends AbstractFuture implements ChannelFu
}
@Override
- public ChannelPromise addListener(GenericFutureListener extends Future> listener) {
+ public VoidChannelPromise addListener(GenericFutureListener extends Future> listener) {
fail();
return this;
}
@Override
- public ChannelPromise addListeners(GenericFutureListener extends Future>... listeners) {
+ public VoidChannelPromise addListeners(GenericFutureListener extends Future>... listeners) {
fail();
return this;
}
@Override
- public ChannelPromise removeListener(GenericFutureListener extends Future> listener) {
+ public VoidChannelPromise removeListener(GenericFutureListener extends Future> listener) {
// NOOP
return this;
}
@Override
- public ChannelPromise removeListeners(GenericFutureListener extends Future>... listeners) {
+ public VoidChannelPromise removeListeners(GenericFutureListener extends Future>... listeners) {
// NOOP
return this;
}
@Override
- public ChannelPromise await() throws InterruptedException {
+ public VoidChannelPromise await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
@@ -82,7 +82,7 @@ final class VoidChannelPromise extends AbstractFuture implements ChannelFu
}
@Override
- public ChannelPromise awaitUninterruptibly() {
+ public VoidChannelPromise awaitUninterruptibly() {
fail();
return this;
}
@@ -120,28 +120,30 @@ final class VoidChannelPromise extends AbstractFuture implements ChannelFu
}
@Override
- public ChannelPromise sync() {
+ public VoidChannelPromise sync() {
fail();
return this;
}
@Override
- public ChannelPromise syncUninterruptibly() {
+ public VoidChannelPromise syncUninterruptibly() {
fail();
return this;
}
@Override
- public ChannelPromise setFailure(Throwable cause) {
+ public VoidChannelPromise setFailure(Throwable cause) {
+ channel.pipeline().fireExceptionCaught(cause);
return this;
}
@Override
- public ChannelPromise setSuccess() {
+ public VoidChannelPromise setSuccess() {
return this;
}
@Override
public boolean tryFailure(Throwable cause) {
+ channel.pipeline().fireExceptionCaught(cause);
return false;
}
@@ -155,7 +157,7 @@ final class VoidChannelPromise extends AbstractFuture implements ChannelFu
}
@Override
- public ChannelPromise setSuccess(Void result) {
+ public VoidChannelPromise setSuccess(Void result) {
return this;
}
diff --git a/transport/src/main/java/io/netty/channel/aio/AbstractAioChannel.java b/transport/src/main/java/io/netty/channel/aio/AbstractAioChannel.java
index b7021247aa..5802545aac 100755
--- a/transport/src/main/java/io/netty/channel/aio/AbstractAioChannel.java
+++ b/transport/src/main/java/io/netty/channel/aio/AbstractAioChannel.java
@@ -117,7 +117,7 @@ public abstract class AbstractAioChannel extends AbstractChannel {
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectFuture != null && connectFuture.tryFailure(cause)) {
- close(voidFuture());
+ close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
diff --git a/transport/src/main/java/io/netty/channel/aio/AioEventLoop.java b/transport/src/main/java/io/netty/channel/aio/AioEventLoop.java
index a371953be7..4f116c20c1 100644
--- a/transport/src/main/java/io/netty/channel/aio/AioEventLoop.java
+++ b/transport/src/main/java/io/netty/channel/aio/AioEventLoop.java
@@ -97,7 +97,7 @@ final class AioEventLoop extends SingleThreadEventLoop {
}
for (Channel ch: channels) {
- ch.unsafe().close(ch.unsafe().voidFuture());
+ ch.unsafe().close(ch.voidPromise());
}
}
diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java
index 6f4e87b802..7142304223 100755
--- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java
+++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java
@@ -47,7 +47,7 @@ public class LocalChannel extends AbstractChannel {
private final Runnable shutdownHook = new Runnable() {
@Override
public void run() {
- unsafe().close(unsafe().voidFuture());
+ unsafe().close(voidPromise());
}
};
@@ -198,7 +198,7 @@ public class LocalChannel extends AbstractChannel {
protected void doClose() throws Exception {
LocalChannel peer = this.peer;
if (peer != null && peer.isActive()) {
- peer.unsafe().close(peer.unsafe().voidFuture());
+ peer.unsafe().close(voidPromise());
this.peer = null;
}
}
@@ -206,7 +206,7 @@ public class LocalChannel extends AbstractChannel {
@Override
protected Runnable doDeregister() throws Exception {
if (isOpen()) {
- unsafe().close(unsafe().voidFuture());
+ unsafe().close(voidPromise());
}
((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
return null;
@@ -307,7 +307,7 @@ public class LocalChannel extends AbstractChannel {
} catch (Throwable t) {
promise.setFailure(t);
pipeline().fireExceptionCaught(t);
- close(voidFuture());
+ close(voidPromise());
return;
}
}
@@ -316,7 +316,7 @@ public class LocalChannel extends AbstractChannel {
if (!(boundChannel instanceof LocalServerChannel)) {
Exception cause = new ChannelException("connection refused");
promise.setFailure(cause);
- close(voidFuture());
+ close(voidPromise());
return;
}
diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java
index 2ff36af2d9..c0c36ba09a 100755
--- a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java
+++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java
@@ -36,7 +36,7 @@ public class LocalServerChannel extends AbstractServerChannel {
private final Runnable shutdownHook = new Runnable() {
@Override
public void run() {
- unsafe().close(unsafe().voidFuture());
+ unsafe().close(voidPromise());
}
};
diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java
index 2d333432b7..a9518c85b6 100755
--- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java
+++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java
@@ -19,9 +19,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelProgressivePromise;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.FileRegion;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import java.io.IOException;
@@ -123,7 +120,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
key.interestOps(key.interestOps() & ~readInterestOp);
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
- close(voidFuture());
+ close(voidPromise());
}
}
} else if (!firedChannelReadSuspended) {
@@ -149,9 +146,9 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
}
@Override
- protected void doFlushFileRegion(final FileRegion region, final ChannelPromise promise) throws Exception {
+ protected void doFlushFileRegion(final FlushTask task) throws Exception {
if (javaChannel() instanceof WritableByteChannel) {
- TransferTask transferTask = new TransferTask(region, (WritableByteChannel) javaChannel(), promise);
+ TransferTask transferTask = new TransferTask(task, (WritableByteChannel) javaChannel());
transferTask.transfer();
} else {
throw new UnsupportedOperationException("Underlying Channel is not of instance "
@@ -161,44 +158,39 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
private final class TransferTask implements NioTask {
private long writtenBytes;
- private final FileRegion region;
+ private final FlushTask task;
private final WritableByteChannel wch;
- private final ChannelPromise promise;
- TransferTask(FileRegion region, WritableByteChannel wch, ChannelPromise promise) {
- this.region = region;
+ TransferTask(FlushTask task, WritableByteChannel wch) {
+ this.task = task;
this.wch = wch;
- this.promise = promise;
}
void transfer() {
try {
for (;;) {
- long localWrittenBytes = region.transferTo(wch, writtenBytes);
+ long localWrittenBytes = task.region().transferTo(wch, writtenBytes);
if (localWrittenBytes == 0) {
// reschedule for write once the channel is writable again
eventLoop().executeWhenWritable(
AbstractNioByteChannel.this, this);
return;
} else if (localWrittenBytes == -1) {
- checkEOF(region, writtenBytes);
- promise.setSuccess();
+ checkEOF(task.region(), writtenBytes);
+ task.setSuccess();
return;
} else {
writtenBytes += localWrittenBytes;
- if (promise instanceof ChannelProgressivePromise) {
- ((ChannelProgressivePromise) promise).setProgress(writtenBytes, region.count());
- }
- if (writtenBytes >= region.count()) {
- region.release();
- promise.setSuccess();
+ task.setProgress(writtenBytes);
+
+ if (writtenBytes >= task.region().count()) {
+ task.setSuccess();
return;
}
}
}
} catch (Throwable cause) {
- region.release();
- promise.setFailure(cause);
+ task.setFailure(cause);
}
}
@@ -210,16 +202,15 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
public void channelUnregistered(SelectableChannel ch, Throwable cause) throws Exception {
if (cause != null) {
- promise.setFailure(cause);
+ task.setFailure(cause);
return;
}
- if (writtenBytes < region.count()) {
- region.release();
+ if (writtenBytes < task.region().count()) {
if (!isOpen()) {
- promise.setFailure(new ClosedChannelException());
+ task.setFailure(new ClosedChannelException());
} else {
- promise.setFailure(new IllegalStateException(
+ task.setFailure(new IllegalStateException(
"Channel was unregistered before the region could be fully written"));
}
}
diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java
index f914b8189a..cf8f0896b5 100755
--- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java
+++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java
@@ -186,7 +186,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
- close(voidFuture());
+ close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java
index 7afe047214..97f109d082 100755
--- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java
+++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java
@@ -87,7 +87,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
- close(voidFuture());
+ close(voidPromise());
} else if (!firedChannelReadSuspended) {
pipeline.fireChannelReadSuspended();
}
diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
index 66551d556a..24188b5521 100644
--- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
+++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java
@@ -243,7 +243,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
- ch.unsafe().close(ch.unsafe().voidFuture());
+ ch.unsafe().close(ch.voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask task = (NioTask) a;
@@ -418,7 +418,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
- unsafe.close(unsafe.voidFuture());
+ unsafe.close(ch.voidPromise());
return;
}
@@ -448,7 +448,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
if (readyOps != -1 && (readyOps & SelectionKey.OP_WRITE) != 0) {
unregisterWritableTasks(ch);
}
- unsafe.close(unsafe.voidFuture());
+ unsafe.close(ch.voidPromise());
}
}
@@ -518,7 +518,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
for (AbstractNioChannel ch: channels) {
unregisterWritableTasks(ch);
- ch.unsafe().close(ch.unsafe().voidFuture());
+ ch.unsafe().close(ch.voidPromise());
}
}
diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java
index 933a657b24..d0f855ed43 100755
--- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java
+++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java
@@ -128,7 +128,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
firedInboundBufferSuspeneded = true;
pipeline.fireChannelReadSuspended();
pipeline.fireExceptionCaught(t);
- unsafe().close(unsafe().voidFuture());
+ unsafe().close(voidPromise());
}
} finally {
if (read) {
@@ -140,7 +140,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
- unsafe().close(unsafe().voidFuture());
+ unsafe().close(voidPromise());
}
}
} else if (!firedInboundBufferSuspeneded) {
diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java
index f231f46871..3826f80460 100755
--- a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java
+++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java
@@ -56,7 +56,7 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
pipeline.fireChannelReadSuspended();
pipeline.fireExceptionCaught(t);
if (t instanceof IOException) {
- unsafe().close(unsafe().voidFuture());
+ unsafe().close(voidPromise());
}
} finally {
if (read) {
@@ -66,7 +66,7 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
pipeline.fireChannelReadSuspended();
}
if (closed && isOpen()) {
- unsafe().close(unsafe().voidFuture());
+ unsafe().close(voidPromise());
}
}
}
diff --git a/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java b/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java
index c2b252d6b8..b71f900348 100644
--- a/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java
+++ b/transport/src/main/java/io/netty/channel/oio/OioByteStreamChannel.java
@@ -17,9 +17,6 @@ package io.netty.channel.oio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelProgressivePromise;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.FileRegion;
import java.io.IOException;
import java.io.InputStream;
@@ -98,7 +95,7 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
}
@Override
- protected void doFlushFileRegion(FileRegion region, ChannelPromise promise) throws Exception {
+ protected void doFlushFileRegion(FlushTask task) throws Exception {
OutputStream os = this.os;
if (os == null) {
throw new NotYetConnectedException();
@@ -109,20 +106,17 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
long written = 0;
for (;;) {
- long localWritten = region.transferTo(outChannel, written);
+ long localWritten = task.region().transferTo(outChannel, written);
if (localWritten == -1) {
- checkEOF(region, written);
- region.release();
- promise.setSuccess();
+ checkEOF(task.region(), written);
+ task.setSuccess();
return;
}
written += localWritten;
- if (promise instanceof ChannelProgressivePromise) {
- final ChannelProgressivePromise pp = (ChannelProgressivePromise) promise;
- pp.setProgress(written, region.count());
- }
- if (written >= region.count()) {
- promise.setSuccess();
+ task.setProgress(written);
+
+ if (written >= task.region().count()) {
+ task.setSuccess();
return;
}
}
diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java
index 28082d89ca..c954eaf176 100755
--- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java
+++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java
@@ -23,10 +23,8 @@ import io.netty.channel.ChannelFlushPromiseNotifier;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
-import io.netty.channel.FileRegion;
import io.netty.channel.aio.AbstractAioChannel;
import io.netty.channel.aio.AioCompletionHandler;
import io.netty.channel.aio.AioEventLoopGroup;
@@ -315,8 +313,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
@Override
- protected void doFlushFileRegion(FileRegion region, ChannelPromise promise) throws Exception {
- region.transferTo(new WritableByteChannelAdapter(region, promise), 0);
+ protected void doFlushFileRegion(FlushTask task) throws Exception {
+ task.region().transferTo(new WritableByteChannelAdapter(task), 0);
}
@Override
@@ -427,7 +425,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
//
// See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html
if (cause instanceof InterruptedByTimeoutException) {
- channel.unsafe().close(channel.unsafe().voidFuture());
+ channel.unsafe().close(channel.voidPromise());
}
}
}
@@ -488,7 +486,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
if (channel.config().isAllowHalfClosure()) {
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
- channel.unsafe().close(channel.unsafe().voidFuture());
+ channel.unsafe().close(channel.voidPromise());
}
}
} else if (!firedChannelReadSuspended) {
@@ -512,7 +510,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
//
// See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html
if (t instanceof IOException || t instanceof InterruptedByTimeoutException) {
- channel.unsafe().close(channel.unsafe().voidFuture());
+ channel.unsafe().close(channel.voidPromise());
}
}
}
@@ -536,13 +534,11 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
private final class WritableByteChannelAdapter implements WritableByteChannel {
- private final FileRegion region;
- private final ChannelPromise promise;
+ private final FlushTask task;
private long written;
- public WritableByteChannelAdapter(FileRegion region, ChannelPromise promise) {
- this.region = region;
- this.promise = promise;
+ public WritableByteChannelAdapter(FlushTask task) {
+ this.task = task;
}
@Override
@@ -557,36 +553,31 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
return;
}
if (result == -1) {
- checkEOF(region, written);
- promise.setSuccess();
+ checkEOF(task.region(), written);
+ task.setSuccess();
return;
}
written += result;
- if (promise instanceof ChannelProgressivePromise) {
- ((ChannelProgressivePromise) promise).setProgress(written, region.count());
- }
+ task.setProgress(written);
- if (written >= region.count()) {
- region.release();
- promise.setSuccess();
+ if (written >= task.region().count()) {
+ task.setSuccess();
return;
}
if (src.hasRemaining()) {
javaChannel().write(src, AioSocketChannel.this, this);
} else {
- region.transferTo(WritableByteChannelAdapter.this, written);
+ task.region().transferTo(WritableByteChannelAdapter.this, written);
}
} catch (Throwable cause) {
- region.release();
- promise.setFailure(cause);
+ task.setFailure(cause);
}
}
@Override
public void failed0(Throwable exc, Channel attachment) {
- region.release();
- promise.setFailure(exc);
+ task.setFailure(exc);
}
});
return 0;