Replace ChannelHandlerUtil.unfoldAndAdd() with MessageBuf.unfoldAndAdd() / Remove unused methods in ChannelHandlerUtil

This commit is contained in:
Trustin Lee 2013-02-08 23:07:20 +09:00
parent 9475f9aeea
commit 44ea0a116f
7 changed files with 44 additions and 132 deletions

View File

@ -108,6 +108,29 @@ public abstract class AbstractMessageBuf<T> extends AbstractQueue<T> implements
return super.element();
}
@Override
@SuppressWarnings("unchecked")
public boolean unfoldAndAdd(Object o) {
if (o == null) {
return false;
}
if (o instanceof Object[]) {
Object[] a = (Object[]) o;
int i;
for (i = 0; i < a.length; i ++) {
Object m = a[i];
if (m == null) {
break;
}
add((T) m);
}
return i != 0;
}
return add((T) o);
}
@Override
public int drainTo(Collection<? super T> c) {
checkUnfreed();

View File

@ -25,6 +25,17 @@ import java.util.Queue;
*/
public interface MessageBuf<T> extends Buf, Queue<T> {
/**
* Unfold the specified object if necessary, and then add the unfolded objects (or the specified object if
* unfonding was not necessary) to this buffer. If the specified object is an object array ({@code Object[]}),
* this method adds the elements of the array to this buffer until {@code null} is encountered. If the specified
* object is {@code null}, nothing is added to this buffer. Otherwise, the specified object is added to this
* buffer as-is.
*
* @return {@code true} if one or more messages were added to this buffer. {@code false} otherwise.
*/
boolean unfoldAndAdd(Object o);
/**
* Drain the content of te {@link MessageBuf} to the given {@link Collection}.
*

View File

@ -16,8 +16,8 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
@ -88,7 +88,7 @@ public abstract class ByteToMessageDecoder
}
try {
if (ChannelHandlerUtil.unfoldAndAdd(ctx, decodeLast(ctx, in), true)) {
if (ctx.nextInboundMessageBuffer().unfoldAndAdd(decodeLast(ctx, in))) {
ctx.fireInboundBufferUpdated();
}
} catch (Throwable t) {
@ -106,6 +106,7 @@ public abstract class ByteToMessageDecoder
boolean wasNull = false;
boolean decoded = false;
MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
while (in.isReadable()) {
try {
int oldInputLength = in.readableBytes();
@ -124,7 +125,7 @@ public abstract class ByteToMessageDecoder
"decode() did not read anything but decoded a message.");
}
if (ChannelHandlerUtil.unfoldAndAdd(ctx, o, true)) {
if (out.unfoldAndAdd(o)) {
decoded = true;
if (isSingleDecode()) {
break;

View File

@ -57,10 +57,7 @@ public abstract class MessageToMessageDecoder<I> extends ChannelInboundMessageHa
@Override
protected final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception {
Object decoded = decode(ctx, msg);
if (decoded != null) {
ctx.nextInboundMessageBuffer().add(decoded);
}
ctx.nextInboundMessageBuffer().unfoldAndAdd(decode(ctx, msg));
}
/**

View File

@ -63,8 +63,7 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageH
@Override
protected final void flush(ChannelHandlerContext ctx, I msg) throws Exception {
try {
Object encoded = encode(ctx, msg);
ctx.nextOutboundMessageBuffer().add(encoded);
ctx.nextOutboundMessageBuffer().unfoldAndAdd(encode(ctx, msg));
} catch (CodecException e) {
throw e;
} catch (Exception e) {

View File

@ -16,9 +16,9 @@
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelPipeline;
import io.netty.util.internal.Signal;
@ -374,7 +374,7 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
}
try {
if (ChannelHandlerUtil.unfoldAndAdd(ctx, decodeLast(ctx, replayable), true)) {
if (ctx.nextInboundMessageBuffer().unfoldAndAdd(decodeLast(ctx, replayable))) {
ctx.fireInboundBufferUpdated();
}
} catch (Signal replay) {
@ -396,6 +396,7 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
boolean wasNull = false;
ByteBuf in = cumulation;
MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
boolean decoded = false;
while (in.isReadable()) {
try {
@ -443,7 +444,7 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
}
// A successful decode
if (ChannelHandlerUtil.unfoldAndAdd(ctx, result, true)) {
if (out.unfoldAndAdd(result)) {
decoded = true;
if (isSingleDecode()) {
break;

View File

@ -15,133 +15,13 @@
*/
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Freeable;
import io.netty.buffer.MessageBuf;
/**
* Utilities for {@link ChannelHandler} implementations.
*/
public final class ChannelHandlerUtil {
/**
* Unfold the given msg and pass it to the next buffer depending on the msg type.
*
* @param ctx
* the {@link ChannelHandlerContext} on which to operate
* @param msg
* the msg to unfold and pass to the next buffer
* @param inbound
* {@code true} if it is an inbound message, {@code false} otherwise
* @return added
* {@code true} if the message was added to the next {@link ByteBuf} or {@link MessageBuf}
* @throws Exception
* thrown if an error accour
*/
public static boolean unfoldAndAdd(
ChannelHandlerContext ctx, Object msg, boolean inbound) throws Exception {
if (msg == null) {
return false;
}
// Note we only recognize Object[] because Iterable is often implemented by user messages.
if (msg instanceof Object[]) {
Object[] array = (Object[]) msg;
if (array.length == 0) {
return false;
}
boolean added = false;
for (Object m: array) {
if (m == null) {
break;
}
if (unfoldAndAdd(ctx, m, inbound)) {
added = true;
}
}
return added;
}
if (inbound) {
ctx.nextInboundMessageBuffer().add(msg);
return true;
}
ctx.nextOutboundMessageBuffer().add(msg);
return true;
}
private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
/**
* Creates a safe copy of the given array and return it.
*/
public static Class<?>[] acceptedMessageTypes(Class<?>[] acceptedMsgTypes) {
if (acceptedMsgTypes == null) {
return EMPTY_TYPES;
}
int numElem = 0;
for (Class<?> c: acceptedMsgTypes) {
if (c == null) {
break;
}
numElem ++;
}
Class<?>[] newAllowedMsgTypes = new Class[numElem];
System.arraycopy(acceptedMsgTypes, 0, newAllowedMsgTypes, 0, numElem);
return newAllowedMsgTypes;
}
/**
* Return {@code true} if the given msg is compatible with one of the given acceptedMessageTypes or if
* acceptedMessageTypes is null / empty.
*/
public static boolean acceptMessage(Class<?>[] acceptedMsgTypes, Object msg) {
if (acceptedMsgTypes == null || acceptedMsgTypes.length == 0) {
return true;
}
for (Class<?> c: acceptedMsgTypes) {
if (c.isInstance(msg)) {
return true;
}
}
return false;
}
/**
* Add the given msg to the next outbound {@link MessageBuf}.
*/
public static void addToNextOutboundBuffer(ChannelHandlerContext ctx, Object msg) {
try {
ctx.nextOutboundMessageBuffer().add(msg);
} catch (NoSuchBufferException e) {
NoSuchBufferException newE =
new NoSuchBufferException(e.getMessage() + " (msg: " + msg + ')');
newE.setStackTrace(e.getStackTrace());
throw newE;
}
}
/**
* Add the given msg to the next inbound {@link MessageBuf}.
*/
public static void addToNextInboundBuffer(ChannelHandlerContext ctx, Object msg) {
try {
ctx.nextInboundMessageBuffer().add(msg);
} catch (NoSuchBufferException e) {
NoSuchBufferException newE =
new NoSuchBufferException(e.getMessage() + " (msg: " + msg + ')');
newE.setStackTrace(e.getStackTrace());
throw newE;
}
}
/**
* Try to free up resources that are held by the message.
*/