[#1317] Allow to use VoidPromise for flush(...), write(...) and sendFile(...)

* This also move rename Channel.Unsafe.voidFuture() to ChannelPropertyAccess.voidPromise()
This commit is contained in:
Norman Maurer 2013-05-15 15:10:41 +02:00
parent fd1d31e7d8
commit 699ef0784e
22 changed files with 290 additions and 192 deletions

View File

@ -64,7 +64,7 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, Integer.MAX_VALUE, false);
testSimpleEcho0(sb, cb, Integer.MAX_VALUE, false, false);
}
@Test(timeout = 30000)
@ -73,7 +73,7 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEchoWithBridge(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, Integer.MAX_VALUE, true);
testSimpleEcho0(sb, cb, Integer.MAX_VALUE, true, false);
}
@Test(timeout = 30000)
@ -82,7 +82,7 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEchoWithBoundedBuffer(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, 32, false);
testSimpleEcho0(sb, cb, 32, false, false);
}
@Test(timeout = 30000)
@ -91,11 +91,30 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEchoWithBridgedBoundedBuffer(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, 32, true);
testSimpleEcho0(sb, cb, 32, true, false);
}
@Test(timeout = 30000)
public void testSimpleEchoWithVoidPromise() throws Throwable {
run();
}
public void testSimpleEchoWithVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, Integer.MAX_VALUE, false, true);
}
@Test(timeout = 30000)
public void testSimpleEchoWithBridgeAndVoidPromise() throws Throwable {
run();
}
public void testSimpleEchoWithBridgeAndVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, Integer.MAX_VALUE, true, true);
}
private static void testSimpleEcho0(
ServerBootstrap sb, Bootstrap cb, int maxInboundBufferSize, boolean bridge) throws Throwable {
ServerBootstrap sb, Bootstrap cb, int maxInboundBufferSize, boolean bridge, boolean voidPromise)
throws Throwable {
final EchoHandler sh = new EchoHandler(maxInboundBufferSize);
final EchoHandler ch = new EchoHandler(maxInboundBufferSize);
@ -123,7 +142,12 @@ public class SocketEchoTest extends AbstractSocketTest {
for (int i = 0; i < data.length;) {
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
cc.write(Unpooled.wrappedBuffer(data, i, length));
ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
if (voidPromise) {
assertEquals(cc.voidPromise(), cc.write(buf, cc.voidPromise()));
} else {
assertNotEquals(cc.voidPromise(), cc.write(buf));
}
i += length;
}

View File

@ -19,10 +19,10 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.FileRegion;
import org.junit.Test;
import java.io.File;
@ -48,7 +48,20 @@ public class SocketFileRegionTest extends AbstractSocketTest {
run();
}
@Test
public void testFileRegionVoidPromise() throws Throwable {
run();
}
public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testFileRegion0(sb, cb, false);
}
public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testFileRegion0(sb, cb, true);
}
private void testFileRegion0(ServerBootstrap sb, Bootstrap cb, boolean voidPromise) throws Throwable {
File file = File.createTempFile("netty-", ".tmp");
file.deleteOnExit();
@ -75,9 +88,13 @@ public class SocketFileRegionTest extends AbstractSocketTest {
Channel sc = sb.bind().sync().channel();
Channel cc = cb.connect().sync().channel();
ChannelFuture future = cc.sendFile(new DefaultFileRegion(new FileInputStream(file).getChannel(),
0L, file.length())).syncUninterruptibly();
assertTrue(future.isSuccess());
FileRegion region = new DefaultFileRegion(new FileInputStream(file).getChannel(),
0L, file.length());
if (voidPromise) {
assertEquals(cc.voidPromise(), cc.sendFile(region, cc.voidPromise()));
} else {
assertNotEquals(cc.voidPromise(), cc.sendFile(region));
}
while (sh.counter < data.length) {
if (sh.exception.get() != null) {
break;

View File

@ -92,6 +92,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private ClosedChannelException closedChannelException;
private boolean inFlushNow;
private boolean flushNowPending;
private FlushTask flushTaskInProgress;
/** Cache for the string representation of this channel */
private boolean strValActive;
@ -436,56 +437,124 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return strVal;
}
@Override
public final ChannelPromise voidPromise() {
return voidPromise;
}
/**
* {@link Unsafe} implementation which sub-classes must extend and use.
* Task which will flush a {@link FileRegion}
*/
protected abstract class AbstractUnsafe implements Unsafe {
protected final class FlushTask {
private final FileRegion region;
private final ChannelPromise promise;
private FlushTask next;
private final AbstractUnsafe unsafe;
private final class FlushTask {
final FileRegion region;
final ChannelPromise promise;
FlushTask next;
FlushTask(AbstractUnsafe unsafe, FileRegion region, ChannelPromise promise) {
this.region = region;
this.promise = promise;
this.unsafe = unsafe;
}
FlushTask(FileRegion region, ChannelPromise promise) {
this.region = region;
this.promise = promise;
promise.addListener(new ChannelFutureListener() {
/**
* Mark the task as success. Multiple calls if this will throw a {@link IllegalStateException}.
*
* This also will call {@link FileRegion#release()}.
*/
public void setSuccess() {
if (eventLoop().inEventLoop()) {
promise.setSuccess();
complete();
} else {
eventLoop().execute(new Runnable() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
flushTaskInProgress = next;
if (next != null) {
try {
FileRegion region = next.region;
if (region == null) {
// no region present means the next flush task was to directly flush
// the outbound buffer
flushNotifierAndFlush(next.promise);
} else {
// flush the region now
doFlushFileRegion(region, next.promise);
}
} catch (Throwable cause) {
next.promise.setFailure(cause);
}
} else {
// notify the flush futures
flushFutureNotifier.notifyFlushFutures();
}
public void run() {
setSuccess();
}
});
}
}
/**
* Notify the task of progress in transfer of the {@link FileRegion}.
*/
public void setProgress(long progress) {
if (promise instanceof ChannelProgressivePromise) {
((ChannelProgressivePromise) promise).setProgress(progress, region.count());
}
}
/**
* Mark the task as failure. Multiple calls if this will throw a {@link IllegalStateException}.
*
* This also will call {@link FileRegion#release()}.
*/
public void setFailure(final Throwable cause) {
if (eventLoop().inEventLoop()) {
promise.setFailure(cause);
complete();
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
setFailure(cause);
}
});
}
}
/**
* Return the {@link FileRegion} which should be flushed
*/
public FileRegion region() {
return region;
}
private void complete() {
region.release();
flushTaskInProgress = next;
if (next != null) {
try {
FileRegion region = next.region;
if (region == null) {
// no region present means the next flush task was to directly flush
// the outbound buffer
unsafe.flushNotifierAndFlush(next.promise);
} else {
// flush the region now
doFlushFileRegion(next);
}
} catch (Throwable cause) {
next.promise.setFailure(cause);
}
} else {
// notify the flush futures
flushFutureNotifier.notifyFlushFutures();
}
}
}
/**
* {@link Unsafe} implementation which sub-classes must extend and use.
*/
protected abstract class AbstractUnsafe implements Unsafe {
private final Runnable beginReadTask = new Runnable() {
@Override
public void run() {
beginRead();
}
};
private final Runnable flushLaterTask = new Runnable() {
@Override
public void run() {
flushNowPending = false;
flush(voidFuture());
flush(voidPromise());
}
};
private FlushTask flushTaskInProgress;
@Override
public final void sendFile(final FileRegion region, final ChannelPromise promise) {
if (outboundBufSize() > 0) {
@ -504,10 +573,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private void sendFile0(FileRegion region, ChannelPromise promise) {
FlushTask task = flushTaskInProgress;
if (task == null) {
flushTaskInProgress = new FlushTask(region, promise);
flushTaskInProgress = task = new FlushTask(this, region, promise);
try {
// the first FileRegion to flush so trigger it now!
doFlushFileRegion(region, promise);
doFlushFileRegion(task);
} catch (Throwable cause) {
region.release();
promise.setFailure(cause);
@ -523,7 +592,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
task = next;
}
// there is something that needs to get flushed first so add it as next in the chain
task.next = new FlushTask(region, promise);
task.next = new FlushTask(this, region, promise);
}
@Override
@ -531,11 +600,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return pipeline.head;
}
@Override
public final ChannelPromise voidFuture() {
return voidPromise;
}
@Override
public final SocketAddress localAddress() {
return localAddress0();
@ -606,7 +670,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (!promise.tryFailure(t)) {
logger.warn(
"Tried to fail the registration promise, but it is complete already. " +
"Swallowing the cause of the registration failure:", t);
"Swallowing the cause of the registration failure:", t);
}
closeFuture.setClosed();
}
@ -691,7 +755,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
});
}
deregister(voidFuture());
deregister(voidPromise());
} else {
// Closed already.
promise.setSuccess();
@ -757,7 +821,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
pipeline.fireExceptionCaught(e);
}
});
close(unsafe().voidFuture());
close(voidPromise());
}
}
@ -775,7 +839,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
task = t.next;
}
task.next = new FlushTask(null, promise);
task.next = new FlushTask(this, null, promise);
}
}
@ -815,7 +879,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} catch (Throwable t) {
flushFutureNotifier.notifyFlushFutures(t);
if (t instanceof IOException) {
close(voidFuture());
close(voidPromise());
}
}
} else {
@ -864,7 +928,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} else {
flushFutureNotifier.notifyFlushFutures(cause);
if (cause instanceof IOException) {
close(voidFuture());
close(voidPromise());
}
}
} finally {
@ -886,7 +950,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (isOpen()) {
return;
}
close(voidFuture());
close(voidPromise());
}
private void invokeLater(Runnable task) {
@ -987,11 +1051,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
/**
* Flush the content of the given {@link FileRegion} to the remote peer.
* Flush the content of the given {@link FlushTask} to the remote peer.
*
* Sub-classes may override this as this implementation will just thrown an {@link UnsupportedOperationException}
*/
protected void doFlushFileRegion(FileRegion region, ChannelPromise promise) throws Exception {
protected void doFlushFileRegion(FlushTask task) throws Exception {
throw new UnsupportedOperationException();
}
@ -1008,7 +1072,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
*/
protected abstract boolean isFlushPending();
private final class CloseFuture extends DefaultChannelPromise implements ChannelFuture.Unsafe {
final class CloseFuture extends DefaultChannelPromise {
CloseFuture(AbstractChannel ch) {
super(ch);

View File

@ -173,7 +173,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
* following methods:
* <ul>
* <li>{@link #headContext()}</li>
* <li>{@link #voidFuture()}</li>
* <li>{@link #localAddress()}</li>
* <li>{@link #remoteAddress()}</li>
* <li>{@link #closeForcibly()}</li>
@ -186,11 +185,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
*/
ChannelHandlerContext headContext();
/**
* Return a {@link VoidChannelPromise}. This method always return the same instance.
*/
ChannelPromise voidFuture();
/**
* Return the {@link SocketAddress} to which is bound local or
* {@code null} if none.

View File

@ -193,12 +193,4 @@ public interface ChannelFuture extends Future<Void> {
@Override
ChannelFuture awaitUninterruptibly();
/**
* A {@link ChannelFuture} which is not allowed to be sent to {@link ChannelPipeline} due to
* implementation details.
*/
interface Unsafe extends ChannelFuture {
// Tag interface
}
}

View File

@ -91,6 +91,7 @@ public final class ChannelHandlerUtil {
return;
}
boolean failed = false;
int processed = 0;
try {
if (!handler.beginFlush(ctx)) {
@ -120,6 +121,7 @@ public final class ChannelHandlerUtil {
}
}
} catch (Throwable t) {
failed = true;
IncompleteFlushException pfe;
if (t instanceof IncompleteFlushException) {
pfe = (IncompleteFlushException) t;
@ -138,24 +140,25 @@ public final class ChannelHandlerUtil {
try {
handler.endFlush(ctx);
} catch (Throwable t) {
if (promise.isDone()) {
logger.warn("endFlush() raised a masked exception due to failed flush().", t);
} else {
fail(ctx, promise, closeOnFailedFlush, t);
}
failed = true;
fail(ctx, promise, closeOnFailedFlush, t);
}
if (!promise.isDone()) {
if (!failed) {
ctx.flush(promise);
}
}
private static void fail(
ChannelHandlerContext ctx, ChannelPromise promise, boolean closeOnFailedFlush, Throwable cause) {
promise.setFailure(cause);
if (closeOnFailedFlush) {
ctx.close();
if (promise.tryFailure(cause)) {
if (closeOnFailedFlush) {
ctx.close();
}
} else {
logger.warn("endFlush() raised a masked exception due to failed flush().", cause);
}
}

View File

@ -58,4 +58,22 @@ interface ChannelPropertyAccess {
* every call of blocking methods will just return without blocking.
*/
ChannelFuture newFailedFuture(Throwable cause);
/**
* Return a special ChannelPromise which can be reused for different operations.
* <p>
* It's only supported to use
* it for {@link ChannelOutboundInvoker#write(Object, ChannelPromise)} ,
* {@link ChannelOutboundInvoker#flush(ChannelPromise)} and
* {@link ChannelOutboundInvoker#sendFile(FileRegion, ChannelPromise)}.
* </p>
* <p>
* Be aware that the returned {@link ChannelPromise} will not support most operations and should only be used
* if you want to safe object allocation for every operation. You will not be able to detect if the operation
* was complete, only if it failed as the implementation will call
* {@link ChannelPipeline#fireExceptionCaught(Throwable)} in this case.
* </p>
* <strong>Be aware this is an expert feature and should be used with care!</strong>
*/
ChannelPromise voidPromise();
}

View File

@ -1141,7 +1141,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
validateFuture(promise);
validateFuture(promise, false);
return findContextOutbound().invokeBind(localAddress, promise);
}
@ -1182,7 +1182,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
validateFuture(promise);
validateFuture(promise, false);
return findContextOutbound().invokeConnect(remoteAddress, localAddress, promise);
}
@ -1217,7 +1217,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
validateFuture(promise);
validateFuture(promise, false);
// Translate disconnect to close if the channel has no notion of disconnect-reconnect.
// So far, UDP/IP is the only transport that has such behavior.
@ -1258,7 +1258,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture close(ChannelPromise promise) {
validateFuture(promise);
validateFuture(promise, false);
return findContextOutbound().invokeClose(promise);
}
@ -1292,7 +1292,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture deregister(ChannelPromise promise) {
validateFuture(promise);
validateFuture(promise, false);
return findContextOutbound().invokeDeregister(promise);
}
@ -1361,7 +1361,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelFuture flush(final ChannelPromise promise) {
validateFuture(promise);
validateFuture(promise, true);
EventExecutor executor = executor();
Thread currentThread = Thread.currentThread();
@ -1453,7 +1453,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (region == null) {
throw new NullPointerException("region");
}
validateFuture(promise);
validateFuture(promise, true);
return findContextOutbound().invokeSendFile(region, promise);
}
@ -1499,7 +1499,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (message == null) {
throw new NullPointerException("message");
}
validateFuture(promise);
validateFuture(promise, true);
DefaultChannelHandlerContext ctx = prev;
EventExecutor executor;
@ -1631,7 +1631,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return new FailedChannelFuture(channel(), executor(), cause);
}
private void validateFuture(ChannelFuture future) {
private void validateFuture(ChannelFuture future, boolean allowUnsafe) {
if (future == null) {
throw new NullPointerException("future");
}
@ -1642,8 +1642,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (future.isDone()) {
throw new IllegalArgumentException("future already done");
}
if (future instanceof ChannelFuture.Unsafe) {
throw new IllegalArgumentException("internal use only future not allowed");
if (!allowUnsafe && future instanceof VoidChannelPromise) {
throw new IllegalArgumentException("VoidChannelPromise not allowed for this operation");
}
if (future instanceof AbstractChannel.CloseFuture) {
throw new IllegalArgumentException("AbstractChannel.CloseFuture may not send through the pipeline");
}
}
@ -1811,4 +1814,9 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
return bridge;
}
}
@Override
public ChannelPromise voidPromise() {
return channel.voidPromise();
}
}

View File

@ -57,7 +57,7 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
Channel ch = this.ch;
if (isShuttingDown()) {
if (ch != null) {
ch.unsafe().close(ch.unsafe().voidFuture());
ch.unsafe().close(ch.voidPromise());
}
if (confirmShutdown()) {
break;

View File

@ -21,7 +21,7 @@ import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.TimeUnit;
final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelFuture.Unsafe, ChannelPromise {
final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPromise {
private final Channel channel;
@ -38,31 +38,31 @@ final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelFu
}
@Override
public ChannelPromise addListener(GenericFutureListener<? extends Future<Void>> listener) {
public VoidChannelPromise addListener(GenericFutureListener<? extends Future<Void>> listener) {
fail();
return this;
}
@Override
public ChannelPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
public VoidChannelPromise addListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
fail();
return this;
}
@Override
public ChannelPromise removeListener(GenericFutureListener<? extends Future<Void>> listener) {
public VoidChannelPromise removeListener(GenericFutureListener<? extends Future<Void>> listener) {
// NOOP
return this;
}
@Override
public ChannelPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
public VoidChannelPromise removeListeners(GenericFutureListener<? extends Future<Void>>... listeners) {
// NOOP
return this;
}
@Override
public ChannelPromise await() throws InterruptedException {
public VoidChannelPromise await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
@ -82,7 +82,7 @@ final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelFu
}
@Override
public ChannelPromise awaitUninterruptibly() {
public VoidChannelPromise awaitUninterruptibly() {
fail();
return this;
}
@ -120,28 +120,30 @@ final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelFu
}
@Override
public ChannelPromise sync() {
public VoidChannelPromise sync() {
fail();
return this;
}
@Override
public ChannelPromise syncUninterruptibly() {
public VoidChannelPromise syncUninterruptibly() {
fail();
return this;
}
@Override
public ChannelPromise setFailure(Throwable cause) {
public VoidChannelPromise setFailure(Throwable cause) {
channel.pipeline().fireExceptionCaught(cause);
return this;
}
@Override
public ChannelPromise setSuccess() {
public VoidChannelPromise setSuccess() {
return this;
}
@Override
public boolean tryFailure(Throwable cause) {
channel.pipeline().fireExceptionCaught(cause);
return false;
}
@ -155,7 +157,7 @@ final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelFu
}
@Override
public ChannelPromise setSuccess(Void result) {
public VoidChannelPromise setSuccess(Void result) {
return this;
}

View File

@ -117,7 +117,7 @@ public abstract class AbstractAioChannel extends AbstractChannel {
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectFuture != null && connectFuture.tryFailure(cause)) {
close(voidFuture());
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);

View File

@ -97,7 +97,7 @@ final class AioEventLoop extends SingleThreadEventLoop {
}
for (Channel ch: channels) {
ch.unsafe().close(ch.unsafe().voidFuture());
ch.unsafe().close(ch.voidPromise());
}
}

View File

@ -47,7 +47,7 @@ public class LocalChannel extends AbstractChannel {
private final Runnable shutdownHook = new Runnable() {
@Override
public void run() {
unsafe().close(unsafe().voidFuture());
unsafe().close(voidPromise());
}
};
@ -198,7 +198,7 @@ public class LocalChannel extends AbstractChannel {
protected void doClose() throws Exception {
LocalChannel peer = this.peer;
if (peer != null && peer.isActive()) {
peer.unsafe().close(peer.unsafe().voidFuture());
peer.unsafe().close(voidPromise());
this.peer = null;
}
}
@ -206,7 +206,7 @@ public class LocalChannel extends AbstractChannel {
@Override
protected Runnable doDeregister() throws Exception {
if (isOpen()) {
unsafe().close(unsafe().voidFuture());
unsafe().close(voidPromise());
}
((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
return null;
@ -307,7 +307,7 @@ public class LocalChannel extends AbstractChannel {
} catch (Throwable t) {
promise.setFailure(t);
pipeline().fireExceptionCaught(t);
close(voidFuture());
close(voidPromise());
return;
}
}
@ -316,7 +316,7 @@ public class LocalChannel extends AbstractChannel {
if (!(boundChannel instanceof LocalServerChannel)) {
Exception cause = new ChannelException("connection refused");
promise.setFailure(cause);
close(voidFuture());
close(voidPromise());
return;
}

View File

@ -36,7 +36,7 @@ public class LocalServerChannel extends AbstractServerChannel {
private final Runnable shutdownHook = new Runnable() {
@Override
public void run() {
unsafe().close(unsafe().voidFuture());
unsafe().close(voidPromise());
}
};

View File

@ -19,9 +19,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.channel.FileRegion;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import java.io.IOException;
@ -123,7 +120,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
key.interestOps(key.interestOps() & ~readInterestOp);
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidFuture());
close(voidPromise());
}
}
} else if (!firedChannelReadSuspended) {
@ -149,9 +146,9 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
}
@Override
protected void doFlushFileRegion(final FileRegion region, final ChannelPromise promise) throws Exception {
protected void doFlushFileRegion(final FlushTask task) throws Exception {
if (javaChannel() instanceof WritableByteChannel) {
TransferTask transferTask = new TransferTask(region, (WritableByteChannel) javaChannel(), promise);
TransferTask transferTask = new TransferTask(task, (WritableByteChannel) javaChannel());
transferTask.transfer();
} else {
throw new UnsupportedOperationException("Underlying Channel is not of instance "
@ -161,44 +158,39 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
private final class TransferTask implements NioTask<SelectableChannel> {
private long writtenBytes;
private final FileRegion region;
private final FlushTask task;
private final WritableByteChannel wch;
private final ChannelPromise promise;
TransferTask(FileRegion region, WritableByteChannel wch, ChannelPromise promise) {
this.region = region;
TransferTask(FlushTask task, WritableByteChannel wch) {
this.task = task;
this.wch = wch;
this.promise = promise;
}
void transfer() {
try {
for (;;) {
long localWrittenBytes = region.transferTo(wch, writtenBytes);
long localWrittenBytes = task.region().transferTo(wch, writtenBytes);
if (localWrittenBytes == 0) {
// reschedule for write once the channel is writable again
eventLoop().executeWhenWritable(
AbstractNioByteChannel.this, this);
return;
} else if (localWrittenBytes == -1) {
checkEOF(region, writtenBytes);
promise.setSuccess();
checkEOF(task.region(), writtenBytes);
task.setSuccess();
return;
} else {
writtenBytes += localWrittenBytes;
if (promise instanceof ChannelProgressivePromise) {
((ChannelProgressivePromise) promise).setProgress(writtenBytes, region.count());
}
if (writtenBytes >= region.count()) {
region.release();
promise.setSuccess();
task.setProgress(writtenBytes);
if (writtenBytes >= task.region().count()) {
task.setSuccess();
return;
}
}
}
} catch (Throwable cause) {
region.release();
promise.setFailure(cause);
task.setFailure(cause);
}
}
@ -210,16 +202,15 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
public void channelUnregistered(SelectableChannel ch, Throwable cause) throws Exception {
if (cause != null) {
promise.setFailure(cause);
task.setFailure(cause);
return;
}
if (writtenBytes < region.count()) {
region.release();
if (writtenBytes < task.region().count()) {
if (!isOpen()) {
promise.setFailure(new ClosedChannelException());
task.setFailure(new ClosedChannelException());
} else {
promise.setFailure(new IllegalStateException(
task.setFailure(new IllegalStateException(
"Channel was unregistered before the region could be fully written"));
}
}

View File

@ -186,7 +186,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidFuture());
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);

View File

@ -87,7 +87,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
pipeline.fireInboundBufferUpdated();
}
if (closed && isOpen()) {
close(voidFuture());
close(voidPromise());
} else if (!firedChannelReadSuspended) {
pipeline.fireChannelReadSuspended();
}

View File

@ -243,7 +243,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidFuture());
ch.unsafe().close(ch.voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
@ -418,7 +418,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidFuture());
unsafe.close(ch.voidPromise());
return;
}
@ -448,7 +448,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
if (readyOps != -1 && (readyOps & SelectionKey.OP_WRITE) != 0) {
unregisterWritableTasks(ch);
}
unsafe.close(unsafe.voidFuture());
unsafe.close(ch.voidPromise());
}
}
@ -518,7 +518,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
for (AbstractNioChannel ch: channels) {
unregisterWritableTasks(ch);
ch.unsafe().close(ch.unsafe().voidFuture());
ch.unsafe().close(ch.voidPromise());
}
}

View File

@ -128,7 +128,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
firedInboundBufferSuspeneded = true;
pipeline.fireChannelReadSuspended();
pipeline.fireExceptionCaught(t);
unsafe().close(unsafe().voidFuture());
unsafe().close(voidPromise());
}
} finally {
if (read) {
@ -140,7 +140,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
unsafe().close(unsafe().voidFuture());
unsafe().close(voidPromise());
}
}
} else if (!firedInboundBufferSuspeneded) {

View File

@ -56,7 +56,7 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
pipeline.fireChannelReadSuspended();
pipeline.fireExceptionCaught(t);
if (t instanceof IOException) {
unsafe().close(unsafe().voidFuture());
unsafe().close(voidPromise());
}
} finally {
if (read) {
@ -66,7 +66,7 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
pipeline.fireChannelReadSuspended();
}
if (closed && isOpen()) {
unsafe().close(unsafe().voidFuture());
unsafe().close(voidPromise());
}
}
}

View File

@ -17,9 +17,6 @@ package io.netty.channel.oio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.channel.FileRegion;
import java.io.IOException;
import java.io.InputStream;
@ -98,7 +95,7 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
}
@Override
protected void doFlushFileRegion(FileRegion region, ChannelPromise promise) throws Exception {
protected void doFlushFileRegion(FlushTask task) throws Exception {
OutputStream os = this.os;
if (os == null) {
throw new NotYetConnectedException();
@ -109,20 +106,17 @@ public abstract class OioByteStreamChannel extends AbstractOioByteChannel {
long written = 0;
for (;;) {
long localWritten = region.transferTo(outChannel, written);
long localWritten = task.region().transferTo(outChannel, written);
if (localWritten == -1) {
checkEOF(region, written);
region.release();
promise.setSuccess();
checkEOF(task.region(), written);
task.setSuccess();
return;
}
written += localWritten;
if (promise instanceof ChannelProgressivePromise) {
final ChannelProgressivePromise pp = (ChannelProgressivePromise) promise;
pp.setProgress(written, region.count());
}
if (written >= region.count()) {
promise.setSuccess();
task.setProgress(written);
if (written >= task.region().count()) {
task.setSuccess();
return;
}
}

View File

@ -23,10 +23,8 @@ import io.netty.channel.ChannelFlushPromiseNotifier;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.FileRegion;
import io.netty.channel.aio.AbstractAioChannel;
import io.netty.channel.aio.AioCompletionHandler;
import io.netty.channel.aio.AioEventLoopGroup;
@ -315,8 +313,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
@Override
protected void doFlushFileRegion(FileRegion region, ChannelPromise promise) throws Exception {
region.transferTo(new WritableByteChannelAdapter(region, promise), 0);
protected void doFlushFileRegion(FlushTask task) throws Exception {
task.region().transferTo(new WritableByteChannelAdapter(task), 0);
}
@Override
@ -427,7 +425,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
//
// See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html
if (cause instanceof InterruptedByTimeoutException) {
channel.unsafe().close(channel.unsafe().voidFuture());
channel.unsafe().close(channel.voidPromise());
}
}
}
@ -488,7 +486,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
if (channel.config().isAllowHalfClosure()) {
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
channel.unsafe().close(channel.unsafe().voidFuture());
channel.unsafe().close(channel.voidPromise());
}
}
} else if (!firedChannelReadSuspended) {
@ -512,7 +510,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
//
// See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html
if (t instanceof IOException || t instanceof InterruptedByTimeoutException) {
channel.unsafe().close(channel.unsafe().voidFuture());
channel.unsafe().close(channel.voidPromise());
}
}
}
@ -536,13 +534,11 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
private final class WritableByteChannelAdapter implements WritableByteChannel {
private final FileRegion region;
private final ChannelPromise promise;
private final FlushTask task;
private long written;
public WritableByteChannelAdapter(FileRegion region, ChannelPromise promise) {
this.region = region;
this.promise = promise;
public WritableByteChannelAdapter(FlushTask task) {
this.task = task;
}
@Override
@ -557,36 +553,31 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
return;
}
if (result == -1) {
checkEOF(region, written);
promise.setSuccess();
checkEOF(task.region(), written);
task.setSuccess();
return;
}
written += result;
if (promise instanceof ChannelProgressivePromise) {
((ChannelProgressivePromise) promise).setProgress(written, region.count());
}
task.setProgress(written);
if (written >= region.count()) {
region.release();
promise.setSuccess();
if (written >= task.region().count()) {
task.setSuccess();
return;
}
if (src.hasRemaining()) {
javaChannel().write(src, AioSocketChannel.this, this);
} else {
region.transferTo(WritableByteChannelAdapter.this, written);
task.region().transferTo(WritableByteChannelAdapter.this, written);
}
} catch (Throwable cause) {
region.release();
promise.setFailure(cause);
task.setFailure(cause);
}
}
@Override
public void failed0(Throwable exc, Channel attachment) {
region.release();
promise.setFailure(exc);
task.setFailure(exc);
}
});
return 0;