Allow to use ChannelOutboundBuffer without AbstractChannel

Motivation:

We expose ChannelOutboundBuffer in Channel.Unsafe but it is not possible
to create a new ChannelOutboundBuffer without an AbstractChannel.  This
makes it impossible to write a Channel implementation that does not
extend AbstractChannel.

Modifications:

- Change ChannelOutboundBuffer to take a Channel as constructor argument.
- Add javadocs

Result:

ChannelOutboundBuffer can be used with a Channel implemention that does
not extend AbstractChannel.
This commit is contained in:
Norman Maurer 2014-08-04 20:51:01 +02:00 committed by Trustin Lee
parent d0e4a85830
commit 3f3e66c31a
3 changed files with 64 additions and 18 deletions

View File

@ -83,7 +83,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override
public boolean isWritable() {
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
return buf != null && buf.getWritable();
return buf != null && buf.isWritable();
}
@Override
@ -649,7 +649,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, promise);
int size = estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
outboundBuffer.addMessage(msg, size, promise);
}
@Override

View File

@ -40,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
/**
* (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
* outbound write requests.
*
* All the methods should only be called by the {@link EventLoop} of the {@link Channel}.
*/
public final class ChannelOutboundBuffer {
@ -61,7 +63,7 @@ public final class ChannelOutboundBuffer {
}
};
private final AbstractChannel channel;
private final Channel channel;
// Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
//
@ -109,12 +111,11 @@ public final class ChannelOutboundBuffer {
this.channel = channel;
}
void addMessage(Object msg, ChannelPromise promise) {
int size = channel.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
/**
* Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
* the message was written.
*/
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
@ -133,7 +134,11 @@ public final class ChannelOutboundBuffer {
incrementPendingOutboundBytes(size);
}
void addFlush() {
/**
* Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
* and so you will be able to handle them.
*/
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
@ -206,10 +211,18 @@ public final class ChannelOutboundBuffer {
return -1;
}
/**
* Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
*/
public Object current() {
return current(true);
}
/**
* Return the current message to write or {@code null} if nothing was flushed before and so is ready to be written.
* If {@code true} is specified a direct {@link ByteBuf} or {@link ByteBufHolder} is prefered and
* so the current message may be copied into a direct buffer.
*/
public Object current(boolean preferDirect) {
// TODO: Think of a smart way to handle ByteBufHolder messages
Entry entry = flushedEntry;
@ -250,7 +263,7 @@ public final class ChannelOutboundBuffer {
/**
* Replace the current msg with the given one.
* The replaced msg will automatically be released
* {@link ReferenceCountUtil#release(Object)} will automatically be called on the replaced message.
*/
public void current(Object msg) {
Entry entry = flushedEntry;
@ -259,6 +272,9 @@ public final class ChannelOutboundBuffer {
entry.msg = msg;
}
/**
* Notify the {@link ChannelPromise} of the current message about writing progress.
*/
public void progress(long amount) {
Entry e = flushedEntry;
assert e != null;
@ -270,6 +286,11 @@ public final class ChannelOutboundBuffer {
}
}
/**
* Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
* flushed message exists at the time this method is called it will return {@code false} to signal that no more
* messages are ready to be handled.
*/
public boolean remove() {
Entry e = flushedEntry;
if (e == null) {
@ -295,6 +316,11 @@ public final class ChannelOutboundBuffer {
return true;
}
/**
* Will remove the current message, mark its {@link ChannelPromise} as failure using the given {@link Throwable}
* and return {@code true}. If no flushed message exists at the time this method is called it will return
* {@code false} to signal that no more messages are ready to be handled.
*/
public boolean remove(Throwable cause) {
Entry e = flushedEntry;
if (e == null) {
@ -366,9 +392,8 @@ public final class ChannelOutboundBuffer {
/**
* Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
* {@code null} is returned otherwise. If this method returns a non-null array, {@link #nioBufferCount()} and
* {@link #nioBufferSize()} will return the number of NIO buffers in the returned array and the total number
* of readable bytes of the NIO buffers respectively.
* {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
* array and the total number of readable bytes of the NIO buffers respectively.
* <p>
* Note that the returned array is reused and thus should not escape
* {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
@ -479,22 +504,39 @@ public final class ChannelOutboundBuffer {
return newArray;
}
/**
* Returns the number of {@link ByteBuffer} that can be written out of the {@link ByteBuffer} array that was
* obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
* was called.
*/
public int nioBufferCount() {
return nioBufferCount;
}
/**
* Returns the number of bytes that can be written out of the {@link ByteBuffer} array that was
* obtained via {@link #nioBuffers()}. This method <strong>MUST</strong> be called after {@link #nioBuffers()}
* was called.
*/
public long nioBufferSize() {
return nioBufferSize;
}
boolean getWritable() {
boolean isWritable() {
return writable != 0;
}
/**
* Returns the number of flushed messages in this {@link ChannelOutboundBuffer}.
*/
public int size() {
return flushed;
}
/**
* Returns {@code true} if there are flushed messages in this {@link ChannelOutboundBuffer} or {@code false}
* otherwise.
*/
public boolean isEmpty() {
return flushed == 0;
}

View File

@ -51,7 +51,7 @@ public class ChannelOutboundBufferTest {
ByteBuf buf = copiedBuffer("buf1", CharsetUtil.US_ASCII);
ByteBuffer nioBuf = buf.internalNioBuffer(0, buf.readableBytes());
buffer.addMessage(buf, channel.voidPromise());
buffer.addMessage(buf, buf.readableBytes(), channel.voidPromise());
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
buffer.addFlush();
ByteBuffer[] buffers = buffer.nioBuffers();
@ -75,7 +75,7 @@ public class ChannelOutboundBufferTest {
ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII));
for (int i = 0; i < 64; i++) {
buffer.addMessage(buf.copy(), channel.voidPromise());
buffer.addMessage(buf.copy(), buf.readableBytes(), channel.voidPromise());
}
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
buffer.addFlush();
@ -99,7 +99,7 @@ public class ChannelOutboundBufferTest {
for (int i = 0; i < 65; i++) {
comp.addComponent(buf.copy()).writerIndex(comp.writerIndex() + buf.readableBytes());
}
buffer.addMessage(comp, channel.voidPromise());
buffer.addMessage(comp, comp.readableBytes(), channel.voidPromise());
assertEquals("Should still be 0 as not flushed yet", 0, buffer.nioBufferCount());
buffer.addFlush();