Move shared methods to the AbstractEmbeddedChannel class and add javadocs
This commit is contained in:
parent
7db47dd0d0
commit
e4ed551490
@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.MessageBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.AbstractChannel;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
@ -37,7 +38,12 @@ import io.netty.logging.InternalLoggerFactory;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
|
||||
public abstract class AbstractEmbeddedChannel extends AbstractChannel {
|
||||
/**
|
||||
* Base class for {@link Channel} implementations that are used in an embedded fashion.
|
||||
*
|
||||
* @param <O> the type of data that can be written to this {@link Channel}
|
||||
*/
|
||||
public abstract class AbstractEmbeddedChannel<O> extends AbstractChannel {
|
||||
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEmbeddedChannel.class);
|
||||
|
||||
@ -51,6 +57,12 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
|
||||
private Throwable lastException;
|
||||
private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED
|
||||
|
||||
/**
|
||||
* Create a new instance
|
||||
*
|
||||
* @param lastOutboundBuffer the last outbound buffer which will hold all the written data
|
||||
* @param handlers the @link ChannelHandler}s which will be add in the {@link ChannelPipeline}
|
||||
*/
|
||||
AbstractEmbeddedChannel(Object lastOutboundBuffer, ChannelHandler... handlers) {
|
||||
super(null, null);
|
||||
|
||||
@ -101,14 +113,25 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
|
||||
return state == 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the last inbound {@link MessageBuf} which will hold all the {@link Object}s that where received
|
||||
* by this {@link Channel}
|
||||
*/
|
||||
public MessageBuf<Object> lastInboundMessageBuffer() {
|
||||
return lastInboundMessageBuffer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the last inbound {@link ByteBuf} which will hold all the bytes that where received
|
||||
* by this {@link Channel}
|
||||
*/
|
||||
public ByteBuf lastInboundByteBuffer() {
|
||||
return lastInboundByteBuffer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return received data from this {@link Channel}
|
||||
*/
|
||||
public Object readInbound() {
|
||||
if (lastInboundByteBuffer.readable()) {
|
||||
try {
|
||||
@ -120,6 +143,9 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
|
||||
return lastInboundMessageBuffer.poll();
|
||||
}
|
||||
|
||||
/**
|
||||
* Run all tasks that are pending in the {@link EventLoop} for this {@link Channel}
|
||||
*/
|
||||
public void runPendingTasks() {
|
||||
try {
|
||||
loop.runTasks();
|
||||
@ -138,6 +164,9 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if there was any {@link Throwable} received and if so rethrow it.
|
||||
*/
|
||||
public void checkException() {
|
||||
Throwable t = lastException;
|
||||
if (t == null) {
|
||||
@ -156,7 +185,7 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
|
||||
throw new ChannelException(t);
|
||||
}
|
||||
|
||||
protected void ensureOpen() {
|
||||
protected final void ensureOpen() {
|
||||
if (!isOpen()) {
|
||||
recordException(new ClosedChannelException());
|
||||
checkException();
|
||||
@ -214,6 +243,74 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read data froum the outbound. This may return {@code null} if nothing is readable.
|
||||
*/
|
||||
public abstract O readOutbound();
|
||||
|
||||
/**
|
||||
* Return the inbound buffer in which inbound messages are stored.
|
||||
*/
|
||||
public abstract Buf inboundBuffer();
|
||||
|
||||
/**
|
||||
* Return the last outbound buffer in which all the written outbound messages are stored.
|
||||
*/
|
||||
public abstract Buf lastOutboundBuffer();
|
||||
|
||||
/**
|
||||
* Mark this {@link Channel} as finished. Any futher try to write data to it will fail.
|
||||
*
|
||||
*
|
||||
* @return bufferReadable returns {@code true} if any of the used buffers has something left to read
|
||||
*/
|
||||
public boolean finish() {
|
||||
close();
|
||||
runPendingTasks();
|
||||
checkException();
|
||||
return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty() ||
|
||||
hasReadableOutboundBuffer();
|
||||
}
|
||||
|
||||
/**
|
||||
* Write data to the inbound of this {@link Channel}.
|
||||
*
|
||||
* @param data data that should be written
|
||||
* @return bufferReadable returns {@code true} if the write operation did add something to the the inbound buffer
|
||||
*/
|
||||
public boolean writeInbound(O data) {
|
||||
ensureOpen();
|
||||
writeInbound0(data);
|
||||
pipeline().fireInboundBufferUpdated();
|
||||
runPendingTasks();
|
||||
checkException();
|
||||
return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Write data to the outbound of this {@link Channel}.
|
||||
*
|
||||
* @param data data that should be written
|
||||
* @return bufferReadable returns {@code true} if the write operation did add something to the the outbound buffer
|
||||
*/
|
||||
public boolean writeOutbound(Object data) {
|
||||
ensureOpen();
|
||||
write(data);
|
||||
runPendingTasks();
|
||||
checkException();
|
||||
return hasReadableOutboundBuffer();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if the outbound buffer hold some data which can be read
|
||||
*/
|
||||
protected abstract boolean hasReadableOutboundBuffer();
|
||||
|
||||
/**
|
||||
* Add the data to the inbound buffer.
|
||||
*/
|
||||
protected abstract void writeInbound0(O data);
|
||||
|
||||
private class DefaultUnsafe extends AbstractUnsafe {
|
||||
@Override
|
||||
public void connect(SocketAddress remoteAddress,
|
||||
|
@ -18,13 +18,22 @@ package io.netty.channel.embedded;
|
||||
import io.netty.buffer.BufType;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
|
||||
public class EmbeddedByteChannel extends AbstractEmbeddedChannel {
|
||||
|
||||
/**
|
||||
* Embedded {@link Channel} which operates on bytes
|
||||
*/
|
||||
public class EmbeddedByteChannel extends AbstractEmbeddedChannel<ByteBuf> {
|
||||
|
||||
private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.BYTE, false);
|
||||
|
||||
/**
|
||||
* Create a new instance with the given {@link ChannelHandler}s in the {@link ChannelPipeline}
|
||||
*/
|
||||
public EmbeddedByteChannel(ChannelHandler... handlers) {
|
||||
super(Unpooled.buffer(), handlers);
|
||||
}
|
||||
@ -34,14 +43,17 @@ public class EmbeddedByteChannel extends AbstractEmbeddedChannel {
|
||||
return METADATA;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf inboundBuffer() {
|
||||
return pipeline().inboundByteBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf lastOutboundBuffer() {
|
||||
return (ByteBuf) lastOutboundBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf readOutbound() {
|
||||
if (!lastOutboundBuffer().readable()) {
|
||||
return null;
|
||||
@ -53,31 +65,16 @@ public class EmbeddedByteChannel extends AbstractEmbeddedChannel {
|
||||
}
|
||||
}
|
||||
|
||||
public boolean writeInbound(ByteBuf data) {
|
||||
ensureOpen();
|
||||
@Override
|
||||
protected void writeInbound0(ByteBuf data) {
|
||||
inboundBuffer().writeBytes(data);
|
||||
pipeline().fireInboundBufferUpdated();
|
||||
runPendingTasks();
|
||||
checkException();
|
||||
return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty();
|
||||
}
|
||||
|
||||
public boolean writeOutbound(Object msg) {
|
||||
ensureOpen();
|
||||
write(msg);
|
||||
runPendingTasks();
|
||||
checkException();
|
||||
@Override
|
||||
protected boolean hasReadableOutboundBuffer() {
|
||||
return lastOutboundBuffer().readable();
|
||||
}
|
||||
|
||||
public boolean finish() {
|
||||
close();
|
||||
runPendingTasks();
|
||||
checkException();
|
||||
return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty() ||
|
||||
lastOutboundBuffer().readable();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
|
||||
if (!lastOutboundBuffer().readable()) {
|
||||
|
@ -18,13 +18,21 @@ package io.netty.channel.embedded;
|
||||
import io.netty.buffer.BufType;
|
||||
import io.netty.buffer.MessageBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
|
||||
public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
|
||||
/**
|
||||
* Embedded {@@link Channel} which operates on messages which can be of any time.
|
||||
*/
|
||||
public class EmbeddedMessageChannel extends AbstractEmbeddedChannel<Object> {
|
||||
|
||||
private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.MESSAGE, false);
|
||||
|
||||
/**
|
||||
* Create a new instance with the given {@link ChannelHandler}s in the {@link ChannelPipeline}
|
||||
*/
|
||||
public EmbeddedMessageChannel(ChannelHandler... handlers) {
|
||||
super(Unpooled.messageBuffer(), handlers);
|
||||
}
|
||||
@ -34,6 +42,7 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
|
||||
return METADATA;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageBuf<Object> inboundBuffer() {
|
||||
return pipeline().inboundMessageBuffer();
|
||||
}
|
||||
@ -43,35 +52,21 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
|
||||
return (MessageBuf<Object>) lastOutboundBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object readOutbound() {
|
||||
return lastOutboundBuffer().poll();
|
||||
}
|
||||
|
||||
public boolean writeInbound(Object msg) {
|
||||
ensureOpen();
|
||||
inboundBuffer().add(msg);
|
||||
pipeline().fireInboundBufferUpdated();
|
||||
runPendingTasks();
|
||||
checkException();
|
||||
return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty();
|
||||
@Override
|
||||
protected void writeInbound0(Object data) {
|
||||
inboundBuffer().add(data);
|
||||
}
|
||||
|
||||
public boolean writeOutbound(Object msg) {
|
||||
ensureOpen();
|
||||
write(msg);
|
||||
runPendingTasks();
|
||||
checkException();
|
||||
@Override
|
||||
protected boolean hasReadableOutboundBuffer() {
|
||||
return !lastOutboundBuffer().isEmpty();
|
||||
}
|
||||
|
||||
public boolean finish() {
|
||||
close();
|
||||
runPendingTasks();
|
||||
checkException();
|
||||
return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty() ||
|
||||
!lastOutboundBuffer().isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doFlushMessageBuffer(MessageBuf<Object> buf) throws Exception {
|
||||
buf.drainTo(lastOutboundBuffer());
|
||||
|
@ -17,6 +17,6 @@ package io.netty.channel.embedded;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
|
||||
class EmbeddedSocketAddress extends SocketAddress {
|
||||
final class EmbeddedSocketAddress extends SocketAddress {
|
||||
private static final long serialVersionUID = 1400788804624980619L;
|
||||
}
|
||||
|
@ -15,7 +15,7 @@
|
||||
*/
|
||||
|
||||
/**
|
||||
* A virtual {@link io.netty.channel.Channel} that helps wrapping a series of handlers to
|
||||
* A virtual {@link Channel} that helps wrapping a series of handlers to
|
||||
* unit test the handlers or use them in non-I/O context.
|
||||
*/
|
||||
package io.netty.channel.embedded;
|
||||
|
Loading…
x
Reference in New Issue
Block a user