From 44ea0a116f08a49e727c6b18957b4662ff6da9c8 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 8 Feb 2013 23:07:20 +0900 Subject: [PATCH] Replace ChannelHandlerUtil.unfoldAndAdd() with MessageBuf.unfoldAndAdd() / Remove unused methods in ChannelHandlerUtil --- .../io/netty/buffer/AbstractMessageBuf.java | 23 ++++ .../main/java/io/netty/buffer/MessageBuf.java | 11 ++ .../handler/codec/ByteToMessageDecoder.java | 7 +- .../codec/MessageToMessageDecoder.java | 5 +- .../codec/MessageToMessageEncoder.java | 3 +- .../netty/handler/codec/ReplayingDecoder.java | 7 +- .../io/netty/channel/ChannelHandlerUtil.java | 120 ------------------ 7 files changed, 44 insertions(+), 132 deletions(-) diff --git a/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java b/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java index 274c8b437a..3c69ceceb5 100644 --- a/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java +++ b/buffer/src/main/java/io/netty/buffer/AbstractMessageBuf.java @@ -108,6 +108,29 @@ public abstract class AbstractMessageBuf extends AbstractQueue implements return super.element(); } + @Override + @SuppressWarnings("unchecked") + public boolean unfoldAndAdd(Object o) { + if (o == null) { + return false; + } + + if (o instanceof Object[]) { + Object[] a = (Object[]) o; + int i; + for (i = 0; i < a.length; i ++) { + Object m = a[i]; + if (m == null) { + break; + } + add((T) m); + } + return i != 0; + } + + return add((T) o); + } + @Override public int drainTo(Collection c) { checkUnfreed(); diff --git a/buffer/src/main/java/io/netty/buffer/MessageBuf.java b/buffer/src/main/java/io/netty/buffer/MessageBuf.java index 517a01841f..37318ff8c2 100644 --- a/buffer/src/main/java/io/netty/buffer/MessageBuf.java +++ b/buffer/src/main/java/io/netty/buffer/MessageBuf.java @@ -25,6 +25,17 @@ import java.util.Queue; */ public interface MessageBuf extends Buf, Queue { + /** + * Unfold the specified object if necessary, and then add the unfolded objects (or the specified object if + * unfonding was not necessary) to this buffer. If the specified object is an object array ({@code Object[]}), + * this method adds the elements of the array to this buffer until {@code null} is encountered. If the specified + * object is {@code null}, nothing is added to this buffer. Otherwise, the specified object is added to this + * buffer as-is. + * + * @return {@code true} if one or more messages were added to this buffer. {@code false} otherwise. + */ + boolean unfoldAndAdd(Object o); + /** * Drain the content of te {@link MessageBuf} to the given {@link Collection}. * diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 8c4c7dd61c..cf53bf02ad 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -16,8 +16,8 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandlerAdapter; @@ -88,7 +88,7 @@ public abstract class ByteToMessageDecoder } try { - if (ChannelHandlerUtil.unfoldAndAdd(ctx, decodeLast(ctx, in), true)) { + if (ctx.nextInboundMessageBuffer().unfoldAndAdd(decodeLast(ctx, in))) { ctx.fireInboundBufferUpdated(); } } catch (Throwable t) { @@ -106,6 +106,7 @@ public abstract class ByteToMessageDecoder boolean wasNull = false; boolean decoded = false; + MessageBuf out = ctx.nextInboundMessageBuffer(); while (in.isReadable()) { try { int oldInputLength = in.readableBytes(); @@ -124,7 +125,7 @@ public abstract class ByteToMessageDecoder "decode() did not read anything but decoded a message."); } - if (ChannelHandlerUtil.unfoldAndAdd(ctx, o, true)) { + if (out.unfoldAndAdd(o)) { decoded = true; if (isSingleDecode()) { break; diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java index d18a315de5..a3bd2795dc 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java @@ -57,10 +57,7 @@ public abstract class MessageToMessageDecoder extends ChannelInboundMessageHa @Override protected final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception { - Object decoded = decode(ctx, msg); - if (decoded != null) { - ctx.nextInboundMessageBuffer().add(decoded); - } + ctx.nextInboundMessageBuffer().unfoldAndAdd(decode(ctx, msg)); } /** diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index 8277de3334..72a6d3ecfe 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -63,8 +63,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageH @Override protected final void flush(ChannelHandlerContext ctx, I msg) throws Exception { try { - Object encoded = encode(ctx, msg); - ctx.nextOutboundMessageBuffer().add(encoded); + ctx.nextOutboundMessageBuffer().unfoldAndAdd(encode(ctx, msg)); } catch (CodecException e) { throw e; } catch (Exception e) { diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index 797266b924..18e7e49457 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -16,9 +16,9 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelPipeline; import io.netty.util.internal.Signal; @@ -374,7 +374,7 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { } try { - if (ChannelHandlerUtil.unfoldAndAdd(ctx, decodeLast(ctx, replayable), true)) { + if (ctx.nextInboundMessageBuffer().unfoldAndAdd(decodeLast(ctx, replayable))) { ctx.fireInboundBufferUpdated(); } } catch (Signal replay) { @@ -396,6 +396,7 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { boolean wasNull = false; ByteBuf in = cumulation; + MessageBuf out = ctx.nextInboundMessageBuffer(); boolean decoded = false; while (in.isReadable()) { try { @@ -443,7 +444,7 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { } // A successful decode - if (ChannelHandlerUtil.unfoldAndAdd(ctx, result, true)) { + if (out.unfoldAndAdd(result)) { decoded = true; if (isSingleDecode()) { break; diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java index 1c431b9e4b..3146478a1f 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java @@ -15,133 +15,13 @@ */ package io.netty.channel; -import io.netty.buffer.ByteBuf; import io.netty.buffer.Freeable; -import io.netty.buffer.MessageBuf; /** * Utilities for {@link ChannelHandler} implementations. */ public final class ChannelHandlerUtil { - /** - * Unfold the given msg and pass it to the next buffer depending on the msg type. - * - * @param ctx - * the {@link ChannelHandlerContext} on which to operate - * @param msg - * the msg to unfold and pass to the next buffer - * @param inbound - * {@code true} if it is an inbound message, {@code false} otherwise - * @return added - * {@code true} if the message was added to the next {@link ByteBuf} or {@link MessageBuf} - * @throws Exception - * thrown if an error accour - */ - public static boolean unfoldAndAdd( - ChannelHandlerContext ctx, Object msg, boolean inbound) throws Exception { - if (msg == null) { - return false; - } - - // Note we only recognize Object[] because Iterable is often implemented by user messages. - if (msg instanceof Object[]) { - Object[] array = (Object[]) msg; - if (array.length == 0) { - return false; - } - - boolean added = false; - for (Object m: array) { - if (m == null) { - break; - } - if (unfoldAndAdd(ctx, m, inbound)) { - added = true; - } - } - return added; - } - - if (inbound) { - ctx.nextInboundMessageBuffer().add(msg); - return true; - } - - ctx.nextOutboundMessageBuffer().add(msg); - return true; - } - - private static final Class[] EMPTY_TYPES = new Class[0]; - - /** - * Creates a safe copy of the given array and return it. - */ - public static Class[] acceptedMessageTypes(Class[] acceptedMsgTypes) { - if (acceptedMsgTypes == null) { - return EMPTY_TYPES; - } - - int numElem = 0; - for (Class c: acceptedMsgTypes) { - if (c == null) { - break; - } - numElem ++; - } - - Class[] newAllowedMsgTypes = new Class[numElem]; - System.arraycopy(acceptedMsgTypes, 0, newAllowedMsgTypes, 0, numElem); - - return newAllowedMsgTypes; - } - - /** - * Return {@code true} if the given msg is compatible with one of the given acceptedMessageTypes or if - * acceptedMessageTypes is null / empty. - */ - public static boolean acceptMessage(Class[] acceptedMsgTypes, Object msg) { - if (acceptedMsgTypes == null || acceptedMsgTypes.length == 0) { - return true; - } - - for (Class c: acceptedMsgTypes) { - if (c.isInstance(msg)) { - return true; - } - } - - return false; - } - - /** - * Add the given msg to the next outbound {@link MessageBuf}. - */ - public static void addToNextOutboundBuffer(ChannelHandlerContext ctx, Object msg) { - try { - ctx.nextOutboundMessageBuffer().add(msg); - } catch (NoSuchBufferException e) { - NoSuchBufferException newE = - new NoSuchBufferException(e.getMessage() + " (msg: " + msg + ')'); - newE.setStackTrace(e.getStackTrace()); - throw newE; - } - } - - /** - * Add the given msg to the next inbound {@link MessageBuf}. - */ - public static void addToNextInboundBuffer(ChannelHandlerContext ctx, Object msg) { - try { - ctx.nextInboundMessageBuffer().add(msg); - } catch (NoSuchBufferException e) { - NoSuchBufferException newE = - new NoSuchBufferException(e.getMessage() + " (msg: " + msg + ')'); - newE.setStackTrace(e.getStackTrace()); - throw newE; - } - } - /** * Try to free up resources that are held by the message. */