[#1557] Make the contract of Channel.Unsafe.flush() more clear

* Remove boolean parameter from Channel.Unsafe.flush() method
* Move NIO related things to AbstractNioChannel.NioUnsafe
This commit is contained in:
Norman Maurer 2013-07-12 18:45:24 +02:00
parent a215ba6ef6
commit 2871079c4a
9 changed files with 33 additions and 47 deletions

View File

@ -54,7 +54,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private volatile boolean registered; private volatile boolean registered;
private ClosedChannelException closedChannelException; private ClosedChannelException closedChannelException;
private boolean inFlushNow; private boolean inFlush0;
/** Cache for the string representation of this channel */ /** Cache for the string representation of this channel */
private boolean strValActive; private boolean strValActive;
@ -592,25 +592,18 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
@Override @Override
public void flush(boolean force) { public void flush() {
outboundBuffer.addFlush(); outboundBuffer.addFlush();
flush0(force); flush0();
} }
private void flush0(boolean force) { protected void flush0() {
if (inFlushNow) { if (inFlush0) {
// Avoid re-entrance // Avoid re-entrance
return; return;
} }
// Flush immediately only when there's no pending flush. inFlush0 = true;
// If there's a pending flush operation, event loop will call flushNow() later,
// and thus there's no need to call it now.
if (!force && isFlushPending()) {
return;
}
inFlushNow = true;
final ChannelOutboundBuffer outboundBuffer = AbstractChannel.this.outboundBuffer; final ChannelOutboundBuffer outboundBuffer = AbstractChannel.this.outboundBuffer;
@ -621,7 +614,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} else { } else {
outboundBuffer.fail(new ClosedChannelException()); outboundBuffer.fail(new ClosedChannelException());
} }
inFlushNow = false; inFlush0 = false;
return; return;
} }
@ -667,7 +660,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
close(voidPromise()); close(voidPromise());
} }
} finally { } finally {
inFlushNow = false; inFlush0 = false;
} }
} }
@ -803,11 +796,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return 0; return 0;
} }
/**
* Return {@code true} if a flush to the {@link Channel} is currently pending.
*/
protected abstract boolean isFlushPending();
final class CloseFuture extends DefaultChannelPromise { final class CloseFuture extends DefaultChannelPromise {
CloseFuture(AbstractChannel ch) { CloseFuture(AbstractChannel ch) {

View File

@ -61,11 +61,6 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
protected boolean isFlushPending() {
return false;
}
@Override @Override
protected AbstractUnsafe newUnsafe() { protected AbstractUnsafe newUnsafe() {
return new DefaultServerUnsafe(); return new DefaultServerUnsafe();
@ -84,7 +79,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
} }
@Override @Override
public void flush(boolean force) { public void flush() {
// ignore // ignore
} }

View File

@ -242,7 +242,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
/** /**
* Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}. * Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
*/ */
void flush(boolean force); void flush();
/** /**
* Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}. * Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.

View File

@ -1030,7 +1030,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush(false); unsafe.flush();
} }
@Override @Override

View File

@ -301,11 +301,6 @@ public class EmbeddedChannel extends AbstractChannel {
return new DefaultUnsafe(); return new DefaultUnsafe();
} }
@Override
protected boolean isFlushPending() {
return false;
}
@Override @Override
protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception { protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception {
for (int i = startIndex; i < msgsLength; i ++) { for (int i = startIndex; i < msgsLength; i ++) {

View File

@ -315,11 +315,6 @@ public class LocalChannel extends AbstractChannel {
} }
} }
@Override
protected boolean isFlushPending() {
return false;
}
private class LocalUnsafe extends AbstractUnsafe { private class LocalUnsafe extends AbstractUnsafe {
@Override @Override

View File

@ -145,6 +145,8 @@ public abstract class AbstractNioChannel extends AbstractChannel {
* Read from underlying {@link SelectableChannel} * Read from underlying {@link SelectableChannel}
*/ */
void read(); void read();
void forceFlush();
} }
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
@ -245,6 +247,23 @@ public abstract class AbstractNioChannel extends AbstractChannel {
connectPromise = null; connectPromise = null;
} }
} }
@Override
protected void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (isFlushPending()) {
return;
}
super.flush0();
}
@Override
public void forceFlush() {
// directly call super.flush0() to force a flush now
super.flush0();
}
} }
@Override @Override
@ -252,8 +271,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
return loop instanceof NioEventLoop; return loop instanceof NioEventLoop;
} }
@Override private boolean isFlushPending() {
protected boolean isFlushPending() {
SelectionKey selectionKey = this.selectionKey; SelectionKey selectionKey = this.selectionKey;
return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0; return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
} }

View File

@ -528,8 +528,8 @@ public final class NioEventLoop extends SingleThreadEventLoop {
processSelectedKey(ch.selectionKey(), task); processSelectedKey(ch.selectionKey(), task);
} }
// Call flushNow which will also take care of clear the OP_WRITE once there is nothing left to write // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().flush(true); ch.unsafe().forceFlush();
} }
private static void unregisterWritableTasks(AbstractNioChannel ch) { private static void unregisterWritableTasks(AbstractNioChannel ch) {

View File

@ -91,11 +91,6 @@ public abstract class AbstractOioChannel extends AbstractChannel {
return loop instanceof ThreadPerChannelEventLoop; return loop instanceof ThreadPerChannelEventLoop;
} }
@Override
protected boolean isFlushPending() {
return false;
}
/** /**
* Connect to the remote peer using the given localAddress if one is specified or {@code null} otherwise. * Connect to the remote peer using the given localAddress if one is specified or {@code null} otherwise.
*/ */