Re-add Unsafe.voidPromise() which can be used for Unsafe operations for which no notification should be done [#1375]

This commit is contained in:
Norman Maurer 2013-05-25 14:35:22 +02:00
parent d9c700e9fe
commit f7931af704
11 changed files with 40 additions and 20 deletions

View File

@ -79,7 +79,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
private final VoidChannelPromise voidPromise = new VoidChannelPromise(this);
private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);
protected final ChannelFlushPromiseNotifier flushFutureNotifier = new ChannelFlushPromiseNotifier();
@ -936,6 +937,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
@Override
public ChannelPromise voidPromise() {
return unsafeVoidPromise;
}
protected final boolean ensureOpen(ChannelPromise promise) {
if (isOpen()) {
return true;

View File

@ -177,6 +177,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
* <li>{@link #remoteAddress()}</li>
* <li>{@link #closeForcibly()}</li>
* <li>{@link #register(EventLoop, ChannelPromise)}</li>
* <li>{@link #voidPromise()}</li>
* </ul>
*/
interface Unsafe {
@ -265,5 +266,12 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
* automaticly call {@link FileRegion#release()}.
*/
void sendFile(FileRegion region, ChannelPromise promise);
/**
* Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
* It will never be notified of a success or error and so is only a placeholder for operations
* that take a {@link ChannelPromise} as argument but for which you not want to get notified.
*/
ChannelPromise voidPromise();
}
}

View File

@ -57,7 +57,7 @@ public class ThreadPerChannelEventLoop extends SingleThreadEventLoop {
Channel ch = this.ch;
if (isShuttingDown()) {
if (ch != null) {
ch.unsafe().close(ch.voidPromise());
ch.unsafe().close(ch.unsafe().voidPromise());
}
if (confirmShutdown()) {
break;

View File

@ -24,17 +24,19 @@ import java.util.concurrent.TimeUnit;
final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPromise {
private final Channel channel;
private final boolean fireException;
/**
* Creates a new instance.
*
* @param channel the {@link Channel} associated with this future
*/
public VoidChannelPromise(Channel channel) {
public VoidChannelPromise(Channel channel, boolean fireException) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
this.fireException = fireException;
}
@Override
@ -132,7 +134,9 @@ final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPr
}
@Override
public VoidChannelPromise setFailure(Throwable cause) {
channel.pipeline().fireExceptionCaught(cause);
if (fireException) {
channel.pipeline().fireExceptionCaught(cause);
}
return this;
}
@ -143,7 +147,9 @@ final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPr
@Override
public boolean tryFailure(Throwable cause) {
channel.pipeline().fireExceptionCaught(cause);
if (fireException) {
channel.pipeline().fireExceptionCaught(cause);
}
return false;
}

View File

@ -97,7 +97,7 @@ final class AioEventLoop extends SingleThreadEventLoop {
}
for (Channel ch: channels) {
ch.unsafe().close(ch.voidPromise());
ch.unsafe().close(ch.unsafe().voidPromise());
}
}

View File

@ -64,7 +64,7 @@ public class LocalChannel extends AbstractChannel {
private final Runnable shutdownHook = new Runnable() {
@Override
public void run() {
unsafe().close(voidPromise());
unsafe().close(unsafe().voidPromise());
}
};
@ -215,7 +215,7 @@ public class LocalChannel extends AbstractChannel {
protected void doClose() throws Exception {
LocalChannel peer = this.peer;
if (peer != null && peer.isActive()) {
peer.unsafe().close(voidPromise());
peer.unsafe().close(unsafe().voidPromise());
this.peer = null;
}
}
@ -223,7 +223,7 @@ public class LocalChannel extends AbstractChannel {
@Override
protected Runnable doDeregister() throws Exception {
if (isOpen()) {
unsafe().close(voidPromise());
unsafe().close(unsafe().voidPromise());
}
((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
return null;

View File

@ -36,7 +36,7 @@ public class LocalServerChannel extends AbstractServerChannel {
private final Runnable shutdownHook = new Runnable() {
@Override
public void run() {
unsafe().close(voidPromise());
unsafe().close(unsafe().voidPromise());
}
};

View File

@ -243,7 +243,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.voidPromise());
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
@ -418,7 +418,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(ch.voidPromise());
unsafe.close(unsafe.voidPromise());
return;
}
@ -448,7 +448,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
if (readyOps != -1 && (readyOps & SelectionKey.OP_WRITE) != 0) {
unregisterWritableTasks(ch);
}
unsafe.close(ch.voidPromise());
unsafe.close(unsafe.voidPromise());
}
}
@ -518,7 +518,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
for (AbstractNioChannel ch: channels) {
unregisterWritableTasks(ch);
ch.unsafe().close(ch.voidPromise());
ch.unsafe().close(ch.unsafe().voidPromise());
}
}

View File

@ -143,7 +143,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
unsafe().close(voidPromise());
unsafe().close(unsafe().voidPromise());
}
}
} else if (!firedInboundBufferSuspeneded) {

View File

@ -56,7 +56,7 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
pipeline.fireChannelReadSuspended();
pipeline.fireExceptionCaught(t);
if (t instanceof IOException) {
unsafe().close(voidPromise());
unsafe().close(unsafe().voidPromise());
}
} finally {
if (read) {
@ -66,7 +66,7 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
pipeline.fireChannelReadSuspended();
}
if (closed && isOpen()) {
unsafe().close(voidPromise());
unsafe().close(unsafe().voidPromise());
}
}
}

View File

@ -432,7 +432,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
//
// See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html
if (cause instanceof InterruptedByTimeoutException) {
channel.unsafe().close(channel.voidPromise());
channel.unsafe().close(channel.unsafe().voidPromise());
}
}
}
@ -493,7 +493,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
if (channel.config().isAllowHalfClosure()) {
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
channel.unsafe().close(channel.voidPromise());
channel.unsafe().close(channel.unsafe().voidPromise());
}
}
} else if (!firedChannelReadSuspended) {
@ -517,7 +517,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
//
// See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html
if (t instanceof IOException || t instanceof InterruptedByTimeoutException) {
channel.unsafe().close(channel.voidPromise());
channel.unsafe().close(channel.unsafe().voidPromise());
}
}
}