Make ChannelOutboundMessageHandlerAdapter similar to ChannelInboundMessageHandlerAdapter

This commit is contained in:
Trustin Lee 2013-02-08 17:07:01 +09:00
parent 1640b1fea6
commit 76eb40a4d2
26 changed files with 219 additions and 290 deletions

View File

@ -48,13 +48,6 @@ public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageTo
@SuppressWarnings("RedundantFieldInitialization") @SuppressWarnings("RedundantFieldInitialization")
private int state = ST_INIT; private int state = ST_INIT;
/**
* Creates a new instance.
*/
protected HttpObjectEncoder() {
super(HttpObject.class);
}
@Override @Override
protected void encode(ChannelHandlerContext ctx, HttpObject msg, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, HttpObject msg, ByteBuf out) throws Exception {
if (msg instanceof HttpMessage) { if (msg instanceof HttpMessage) {

View File

@ -28,8 +28,8 @@ public class HttpRequestEncoder extends HttpObjectEncoder<HttpRequest> {
private static final char SLASH = '/'; private static final char SLASH = '/';
@Override @Override
public boolean isEncodable(Object msg) throws Exception { public boolean acceptOutboundMessage(Object msg) throws Exception {
return super.isEncodable(msg) && !(msg instanceof HttpResponse); return super.acceptOutboundMessage(msg) && !(msg instanceof HttpResponse);
} }
@Override @Override

View File

@ -27,8 +27,8 @@ import static io.netty.handler.codec.http.HttpConstants.*;
public class HttpResponseEncoder extends HttpObjectEncoder<HttpResponse> { public class HttpResponseEncoder extends HttpObjectEncoder<HttpResponse> {
@Override @Override
public boolean isEncodable(Object msg) throws Exception { public boolean acceptOutboundMessage(Object msg) throws Exception {
return super.isEncodable(msg) && !(msg instanceof HttpRequest); return super.acceptOutboundMessage(msg) && !(msg instanceof HttpRequest);
} }
@Override @Override

View File

@ -32,14 +32,8 @@ import io.netty.handler.codec.MessageToByteEncoder;
@Sharable @Sharable
public class WebSocket00FrameEncoder extends MessageToByteEncoder<WebSocketFrame> { public class WebSocket00FrameEncoder extends MessageToByteEncoder<WebSocketFrame> {
public WebSocket00FrameEncoder() {
super(WebSocketFrame.class);
}
@Override @Override
public void encode( public void encode(ChannelHandlerContext ctx, WebSocketFrame msg, ByteBuf out) throws Exception {
ChannelHandlerContext ctx,
WebSocketFrame msg, ByteBuf out) throws Exception {
if (msg instanceof TextWebSocketFrame) { if (msg instanceof TextWebSocketFrame) {
// Text frame // Text frame
ByteBuf data = msg.data(); ByteBuf data = msg.data();

View File

@ -90,13 +90,11 @@ public class WebSocket08FrameEncoder extends MessageToByteEncoder<WebSocketFrame
* false. * false.
*/ */
public WebSocket08FrameEncoder(boolean maskPayload) { public WebSocket08FrameEncoder(boolean maskPayload) {
super(WebSocketFrame.class);
this.maskPayload = maskPayload; this.maskPayload = maskPayload;
} }
@Override @Override
public void encode( protected void encode(ChannelHandlerContext ctx, WebSocketFrame msg, ByteBuf out) throws Exception {
ChannelHandlerContext ctx, WebSocketFrame msg, ByteBuf out) throws Exception {
byte[] mask; byte[] mask;

View File

@ -38,7 +38,7 @@ public abstract class RtspObjectEncoder<H extends HttpMessage> extends HttpObjec
} }
@Override @Override
public boolean isEncodable(Object msg) throws Exception { public boolean acceptOutboundMessage(Object msg) throws Exception {
return msg instanceof FullHttpMessage; return msg instanceof FullHttpMessage;
} }
} }

View File

@ -28,7 +28,7 @@ import io.netty.util.CharsetUtil;
public class RtspRequestEncoder extends RtspObjectEncoder<HttpRequest> { public class RtspRequestEncoder extends RtspObjectEncoder<HttpRequest> {
@Override @Override
public boolean isEncodable(Object msg) throws Exception { public boolean acceptOutboundMessage(Object msg) throws Exception {
return msg instanceof FullHttpRequest; return msg instanceof FullHttpRequest;
} }

View File

@ -28,7 +28,7 @@ import io.netty.util.CharsetUtil;
public class RtspResponseEncoder extends RtspObjectEncoder<HttpResponse> { public class RtspResponseEncoder extends RtspObjectEncoder<HttpResponse> {
@Override @Override
public boolean isEncodable(Object msg) throws Exception { public boolean acceptOutboundMessage(Object msg) throws Exception {
return msg instanceof FullHttpResponse; return msg instanceof FullHttpResponse;
} }

View File

@ -50,8 +50,6 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
* Creates a new instance with the specified parameters. * Creates a new instance with the specified parameters.
*/ */
public SpdyFrameEncoder(int version, int compressionLevel, int windowBits, int memLevel) { public SpdyFrameEncoder(int version, int compressionLevel, int windowBits, int memLevel) {
super(SpdyDataFrame.class, SpdyControlFrame.class);
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"unknown version: " + version); "unknown version: " + version);
@ -61,6 +59,11 @@ public class SpdyFrameEncoder extends MessageToByteEncoder<Object> {
version, compressionLevel, windowBits, memLevel); version, compressionLevel, windowBits, memLevel);
} }
@Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
return msg instanceof SpdyDataFrame || msg instanceof SpdyControlFrame;
}
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
ctx.channel().closeFuture().addListener(new ChannelFutureListener() { ctx.channel().closeFuture().addListener(new ChannelFutureListener() {

View File

@ -131,8 +131,6 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<HttpObject> {
* @param version the protocol version * @param version the protocol version
*/ */
public SpdyHttpEncoder(int version) { public SpdyHttpEncoder(int version) {
super(HttpObject.class);
if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) { if (version < SpdyConstants.SPDY_MIN_VERSION || version > SpdyConstants.SPDY_MAX_VERSION) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"unsupported version: " + version); "unsupported version: " + version);

View File

@ -16,10 +16,12 @@
package io.netty.handler.codec.http.websocketx; package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter; import io.netty.channel.ChannelOperationHandlerAdapter;
import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedMessageChannel; import io.netty.channel.embedded.EmbeddedMessageChannel;
import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultHttpRequest;
@ -138,7 +140,19 @@ public class WebSocketServerProtocolHandlerTest {
return (FullHttpResponse) outbound.poll(); return (FullHttpResponse) outbound.poll();
} }
private static class MockOutboundHandler extends ChannelOutboundMessageHandlerAdapter<Object> { private static class MockOutboundHandler
extends ChannelOperationHandlerAdapter implements ChannelOutboundMessageHandler<Object> {
@Override
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer();
}
@Override
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundMessageBuffer().free();
}
@Override @Override
public void flush(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { public void flush(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
//NoOp //NoOp

View File

@ -34,12 +34,8 @@ public class SocksMessageEncoder extends MessageToByteEncoder<SocksMessage> {
return name; return name;
} }
public SocksMessageEncoder() {
super(SocksMessage.class);
}
@Override @Override
public void encode(ChannelHandlerContext ctx, SocksMessage msg, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, SocksMessage msg, ByteBuf out) throws Exception {
msg.encodeAsByteBuf(out); msg.encodeAsByteBuf(out);
} }
} }

View File

@ -19,43 +19,49 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.util.internal.TypeParameterFinder;
public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler
implements ChannelInboundByteHandler, ChannelOutboundMessageHandler<I> { implements ChannelInboundByteHandler, ChannelOutboundMessageHandler<I> {
private final Class<?>[] encodableMessageTypes; private final Class<?> acceptedOutboundMsgType;
private final MessageToByteEncoder<I> encoder; private final MessageToByteEncoder<I> encoder = new MessageToByteEncoder<I>() {
private final ByteToMessageDecoder decoder; @Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
return ByteToMessageCodec.this.acceptOutboundMessage(msg);
}
protected ByteToMessageCodec(Class<?>... encodableMessageTypes) { @Override
this.encodableMessageTypes = ChannelHandlerUtil.acceptedMessageTypes(encodableMessageTypes); protected void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception {
encoder = new MessageToByteEncoder<I>() { ByteToMessageCodec.this.encode(ctx, msg, out);
@Override }
public boolean isEncodable(Object msg) throws Exception { };
return ByteToMessageCodec.this.isEncodable(msg);
}
@Override private final ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
protected void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception { @Override
ByteToMessageCodec.this.encode(ctx, msg, out); public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
} return ByteToMessageCodec.this.decode(ctx, in);
}; }
decoder = new ByteToMessageDecoder() { @Override
@Override protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { return ByteToMessageCodec.this.decodeLast(ctx, in);
return ByteToMessageCodec.this.decode(ctx, in); }
} };
@Override protected ByteToMessageCodec() {
protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception { this(ByteToMessageCodec.class, 0);
return ByteToMessageCodec.this.decodeLast(ctx, in); }
}
}; protected ByteToMessageCodec(
@SuppressWarnings("rawtypes")
Class<? extends ByteToMessageCodec> parameterizedHandlerType,
int messageTypeParamIndex) {
acceptedOutboundMsgType =
TypeParameterFinder.findActualTypeParameter(this, parameterizedHandlerType, messageTypeParamIndex);
} }
@Override @Override
@ -98,8 +104,8 @@ public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler
encoder.flush(ctx, promise); encoder.flush(ctx, promise);
} }
public boolean isEncodable(Object msg) throws Exception { public boolean acceptOutboundMessage(Object msg) throws Exception {
return ChannelHandlerUtil.acceptMessage(encodableMessageTypes, msg); return acceptedOutboundMsgType.isInstance(msg);
} }
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

View File

@ -78,10 +78,7 @@ public class LengthFieldPrepender extends MessageToByteEncoder<ByteBuf> {
* @throws IllegalArgumentException * @throws IllegalArgumentException
* if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8 * if {@code lengthFieldLength} is not 1, 2, 3, 4, or 8
*/ */
public LengthFieldPrepender( public LengthFieldPrepender(int lengthFieldLength, boolean lengthIncludesLengthFieldLength) {
int lengthFieldLength, boolean lengthIncludesLengthFieldLength) {
super(ByteBuf.class);
if (lengthFieldLength != 1 && lengthFieldLength != 2 && if (lengthFieldLength != 1 && lengthFieldLength != 2 &&
lengthFieldLength != 3 && lengthFieldLength != 4 && lengthFieldLength != 3 && lengthFieldLength != 4 &&
lengthFieldLength != 8) { lengthFieldLength != 8) {

View File

@ -21,8 +21,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter; import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelPromise;
/** /**
@ -44,55 +42,30 @@ import io.netty.channel.ChannelPromise;
*/ */
public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> { public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> {
private final Class<?>[] acceptedMsgTypes;
/** /**
* The types which will be accepted by the encoder. If a received message is an other type it will be just forwared * The types which will be accepted by the encoder. If a received message is an other type it will be just forwared
* to the next {@link ChannelOutboundMessageHandler} in the {@link ChannelPipeline} * to the next {@link ChannelOutboundMessageHandler} in the {@link ChannelPipeline}
*/ */
protected MessageToByteEncoder(Class<?>... acceptedMsgTypes) { protected MessageToByteEncoder() {
this.acceptedMsgTypes = ChannelHandlerUtil.acceptedMessageTypes(acceptedMsgTypes); this(MessageToByteEncoder.class, 0);
}
protected MessageToByteEncoder(
@SuppressWarnings("rawtypes")
Class<? extends MessageToByteEncoder> parameterizedHandlerType,
int messageTypeParamIndex) {
super(parameterizedHandlerType, messageTypeParamIndex);
} }
@Override @Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { protected void flush(ChannelHandlerContext ctx, I msg) throws Exception {
MessageBuf<I> in = ctx.outboundMessageBuffer(); try {
ByteBuf out = ctx.nextOutboundByteBuffer(); encode(ctx, msg, ctx.nextOutboundByteBuffer());
} catch (CodecException e) {
for (;;) { throw e;
Object msg = in.poll(); } catch (Exception e) {
if (msg == null) { throw new CodecException(e);
break;
}
if (!isEncodable(msg)) {
ChannelHandlerUtil.addToNextOutboundBuffer(ctx, msg);
continue;
}
@SuppressWarnings("unchecked")
I imsg = (I) msg;
try {
encode(ctx, imsg, out);
} catch (Throwable t) {
if (t instanceof CodecException) {
ctx.fireExceptionCaught(t);
} else {
ctx.fireExceptionCaught(new EncoderException(t));
}
}
} }
ctx.flush(promise);
}
/**
* Returns {@code true} if and only if the specified message can be encoded by this encoder.
*
* @param msg the message
*/
public boolean isEncodable(Object msg) throws Exception {
return ChannelHandlerUtil.acceptMessage(acceptedMsgTypes, msg);
} }
/** /**

View File

@ -59,7 +59,7 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
private final MessageToMessageEncoder<Object> encoder = private final MessageToMessageEncoder<Object> encoder =
new MessageToMessageEncoder<Object>() { new MessageToMessageEncoder<Object>() {
@Override @Override
public boolean isEncodable(Object msg) throws Exception { public boolean acceptOutboundMessage(Object msg) throws Exception {
return MessageToMessageCodec.this.acceptOutboundMessage(msg); return MessageToMessageCodec.this.acceptOutboundMessage(msg);
} }

View File

@ -17,12 +17,9 @@ package io.netty.handler.codec;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelOutboundMessageHandler;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter; import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.PartialFlushException;
/** /**
* {@link ChannelOutboundMessageHandlerAdapter} which encodes from one message to an other message * {@link ChannelOutboundMessageHandlerAdapter} which encodes from one message to an other message
@ -47,77 +44,32 @@ import io.netty.channel.PartialFlushException;
*/ */
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> { public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> {
private final Class<?>[] acceptedMsgTypes;
/** /**
* The types which will be accepted by the decoder. If a received message is an other type it will be just forwared * The types which will be accepted by the decoder. If a received message is an other type it will be just forwared
* to the next {@link ChannelOutboundMessageHandler} in the {@link ChannelPipeline} * to the next {@link ChannelOutboundMessageHandler} in the {@link ChannelPipeline}
*/ */
protected MessageToMessageEncoder(Class<?>... acceptedMsgTypes) { protected MessageToMessageEncoder() {
this.acceptedMsgTypes = ChannelHandlerUtil.acceptedMessageTypes(acceptedMsgTypes); super(MessageToMessageEncoder.class, 0);
}
protected MessageToMessageEncoder(
@SuppressWarnings("rawtypes")
Class<? extends ChannelOutboundMessageHandlerAdapter> parameterizedHandlerType,
int messageTypeParamIndex) {
super(parameterizedHandlerType, messageTypeParamIndex);
} }
@Override @Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { protected final void flush(ChannelHandlerContext ctx, I msg) throws Exception {
MessageBuf<I> in = ctx.outboundMessageBuffer(); try {
boolean encoded = false; Object encoded = encode(ctx, msg);
ctx.nextOutboundMessageBuffer().add(encoded);
for (;;) { } catch (CodecException e) {
try { throw e;
Object msg = in.poll(); } catch (Exception e) {
if (msg == null) { throw new CodecException(e);
break;
}
if (!isEncodable(msg)) {
ChannelHandlerUtil.addToNextOutboundBuffer(ctx, msg);
continue;
}
@SuppressWarnings("unchecked")
I imsg = (I) msg;
boolean free = true;
try {
Object omsg = encode(ctx, imsg);
if (omsg == null) {
// encode() might be waiting for more inbound messages to generate
// an aggregated message - keep polling.
continue;
}
if (omsg == imsg) {
free = false;
}
encoded = true;
ChannelHandlerUtil.unfoldAndAdd(ctx, omsg, false);
} finally {
if (free) {
freeOutboundMessage(imsg);
}
}
} catch (Throwable t) {
Throwable cause;
if (t instanceof CodecException) {
cause = t;
} else {
cause = new EncoderException(t);
}
if (encoded) {
cause = new PartialFlushException("Unable to encoded all messages", cause);
}
promise.setFailure(cause);
return;
}
} }
ctx.flush(promise);
}
/**
* Returns {@code true} if and only if the specified message can be encoded by this encoder.
*
* @param msg the message
*/
public boolean isEncodable(Object msg) throws Exception {
return ChannelHandlerUtil.acceptMessage(acceptedMsgTypes, msg);
} }
/** /**
@ -131,13 +83,4 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageH
* @throws Exception is thrown if an error accour * @throws Exception is thrown if an error accour
*/ */
protected abstract Object encode(ChannelHandlerContext ctx, I msg) throws Exception; protected abstract Object encode(ChannelHandlerContext ctx, I msg) throws Exception;
/**
* Is called after a message was processed via {@link #encode(ChannelHandlerContext, Object)} to free
* up any resources that is held by the inbound message. You may want to override this if your implementation
* just pass-through the input message or need it for later usage.
*/
protected void freeOutboundMessage(I msg) throws Exception {
ChannelHandlerUtil.freeMessage(msg);
}
} }

View File

@ -54,8 +54,6 @@ public class Base64Encoder extends MessageToMessageEncoder<ByteBuf> {
} }
public Base64Encoder(boolean breakLines, Base64Dialect dialect) { public Base64Encoder(boolean breakLines, Base64Dialect dialect) {
super(ByteBuf.class);
if (dialect == null) { if (dialect == null) {
throw new NullPointerException("dialect"); throw new NullPointerException("dialect");
} }

View File

@ -17,12 +17,10 @@ package io.netty.handler.codec.bytes;
import io.netty.buffer.BufType; import io.netty.buffer.BufType;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter; import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.LengthFieldPrepender;
@ -62,41 +60,20 @@ public class ByteArrayEncoder extends ChannelOutboundMessageHandlerAdapter<byte[
} }
@Override @Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { protected void flush(ChannelHandlerContext ctx, byte[] msg) throws Exception {
MessageBuf<Object> in = ctx.outboundMessageBuffer(); if (msg.length == 0) {
MessageBuf<Object> msgOut = ctx.nextOutboundMessageBuffer(); return;
ByteBuf byteOut = ctx.nextOutboundByteBuffer(); }
try { switch (nextBufferType) {
for (;;) { case BYTE:
Object m = in.poll(); ctx.nextOutboundByteBuffer().writeBytes(msg);
if (m == null) { break;
break; case MESSAGE:
} ctx.nextOutboundMessageBuffer().add(Unpooled.wrappedBuffer(msg));
break;
if (!(m instanceof byte[])) { default:
msgOut.add(m); throw new Error();
continue;
}
byte[] a = (byte[]) m;
if (a.length == 0) {
continue;
}
switch (nextBufferType) {
case BYTE:
byteOut.writeBytes(a);
break;
case MESSAGE:
msgOut.add(Unpooled.wrappedBuffer(a));
break;
default:
throw new Error();
}
}
} finally {
ctx.flush(promise);
} }
} }
} }

View File

@ -15,7 +15,9 @@
*/ */
package io.netty.handler.codec.protobuf; package io.netty.handler.codec.protobuf;
import static io.netty.buffer.Unpooled.*; import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;
import com.google.protobuf.MessageLiteOrBuilder;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -24,8 +26,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.handler.codec.MessageToMessageEncoder;
import com.google.protobuf.Message; import static io.netty.buffer.Unpooled.*;
import com.google.protobuf.MessageLite;
/** /**
* Encodes the requested <a href="http://code.google.com/p/protobuf/">Google * Encodes the requested <a href="http://code.google.com/p/protobuf/">Google
@ -56,14 +57,10 @@ import com.google.protobuf.MessageLite;
* @apiviz.landmark * @apiviz.landmark
*/ */
@Sharable @Sharable
public class ProtobufEncoder extends MessageToMessageEncoder<Object> { public class ProtobufEncoder extends MessageToMessageEncoder<MessageLiteOrBuilder> {
public ProtobufEncoder() {
super(MessageLite.class, MessageLite.Builder.class);
}
@Override @Override
protected Object encode(ChannelHandlerContext ctx, Object msg) throws Exception { protected Object encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg) throws Exception {
if (msg instanceof MessageLite) { if (msg instanceof MessageLite) {
return wrappedBuffer(((MessageLite) msg).toByteArray()); return wrappedBuffer(((MessageLite) msg).toByteArray());
} }

View File

@ -39,13 +39,6 @@ import io.netty.handler.codec.MessageToByteEncoder;
@Sharable @Sharable
public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder<ByteBuf> { public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder<ByteBuf> {
/**
* Creates a new instance.
*/
public ProtobufVarint32LengthFieldPrepender() {
super(ByteBuf.class);
}
@Override @Override
protected void encode( protected void encode(
ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {

View File

@ -34,7 +34,7 @@ import java.io.Serializable;
* This encoder is interoperable with the standard Java object streams such as * This encoder is interoperable with the standard Java object streams such as
* {@link ObjectInputStream} and {@link ObjectOutputStream}. * {@link ObjectInputStream} and {@link ObjectOutputStream}.
*/ */
public class CompatibleObjectEncoder extends MessageToByteEncoder<Object> { public class CompatibleObjectEncoder extends MessageToByteEncoder<Serializable> {
private static final AttributeKey<ObjectOutputStream> OOS = private static final AttributeKey<ObjectOutputStream> OOS =
new AttributeKey<ObjectOutputStream>(CompatibleObjectEncoder.class.getName() + ".oos"); new AttributeKey<ObjectOutputStream>(CompatibleObjectEncoder.class.getName() + ".oos");
@ -59,8 +59,6 @@ public class CompatibleObjectEncoder extends MessageToByteEncoder<Object> {
* the long term. * the long term.
*/ */
public CompatibleObjectEncoder(int resetInterval) { public CompatibleObjectEncoder(int resetInterval) {
super(Serializable.class);
if (resetInterval < 0) { if (resetInterval < 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"resetInterval: " + resetInterval); "resetInterval: " + resetInterval);
@ -78,7 +76,7 @@ public class CompatibleObjectEncoder extends MessageToByteEncoder<Object> {
} }
@Override @Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
Attribute<ObjectOutputStream> oosAttr = ctx.attr(OOS); Attribute<ObjectOutputStream> oosAttr = ctx.attr(OOS);
ObjectOutputStream oos = oosAttr.get(); ObjectOutputStream oos = oosAttr.get();
if (oos == null) { if (oos == null) {

View File

@ -36,15 +36,11 @@ import java.io.Serializable;
* @apiviz.has io.netty.handler.codec.serialization.ObjectEncoderOutputStream - - - compatible with * @apiviz.has io.netty.handler.codec.serialization.ObjectEncoderOutputStream - - - compatible with
*/ */
@Sharable @Sharable
public class ObjectEncoder extends MessageToByteEncoder<Object> { public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
public ObjectEncoder() {
super(Serializable.class);
}
@Override @Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
int startIdx = out.writerIndex(); int startIdx = out.writerIndex();
ByteBufOutputStream bout = new ByteBufOutputStream(out); ByteBufOutputStream bout = new ByteBufOutputStream(out);

View File

@ -17,13 +17,11 @@ package io.netty.handler.codec.string;
import io.netty.buffer.BufType; import io.netty.buffer.BufType;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter; import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.LineBasedFrameDecoder;
import java.nio.charset.Charset; import java.nio.charset.Charset;
@ -79,39 +77,18 @@ public class StringEncoder extends ChannelOutboundMessageHandlerAdapter<CharSequ
} }
@Override @Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { protected void flush(ChannelHandlerContext ctx, CharSequence msg) throws Exception {
MessageBuf<Object> in = ctx.outboundMessageBuffer(); ByteBuf encoded = Unpooled.copiedBuffer(msg, charset);
MessageBuf<Object> msgOut = ctx.nextOutboundMessageBuffer();
ByteBuf byteOut = ctx.nextOutboundByteBuffer();
try { switch (nextBufferType) {
for (;;) { case BYTE:
Object m = in.poll(); ctx.nextOutboundByteBuffer().writeBytes(encoded);
if (m == null) { break;
break; case MESSAGE:
} ctx.nextOutboundMessageBuffer().add(encoded);
break;
if (!(m instanceof CharSequence)) { default:
msgOut.add(m); throw new Error();
continue;
}
CharSequence s = (CharSequence) m;
ByteBuf encoded = Unpooled.copiedBuffer(s, charset);
switch (nextBufferType) {
case BYTE:
byteOut.writeBytes(encoded);
break;
case MESSAGE:
msgOut.add(encoded);
break;
default:
throw new Error();
}
}
} finally {
ctx.flush(promise);
} }
} }
} }

View File

@ -28,13 +28,8 @@ import java.math.BigInteger;
*/ */
public class NumberEncoder extends MessageToByteEncoder<Number> { public class NumberEncoder extends MessageToByteEncoder<Number> {
public NumberEncoder() {
super(Number.class);
}
@Override @Override
public void encode( public void encode(ChannelHandlerContext ctx, Number msg, ByteBuf out) throws Exception {
ChannelHandlerContext ctx, Number msg, ByteBuf out) throws Exception {
// Convert to a BigInteger first for easier implementation. // Convert to a BigInteger first for easier implementation.
BigInteger v; BigInteger v;
if (msg instanceof BigInteger) { if (msg instanceof BigInteger) {

View File

@ -17,6 +17,7 @@ package io.netty.channel;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.util.internal.TypeParameterFinder;
/** /**
* Abstract base class which handles messages of a specific type. * Abstract base class which handles messages of a specific type.
@ -26,6 +27,20 @@ import io.netty.buffer.Unpooled;
public abstract class ChannelOutboundMessageHandlerAdapter<I> public abstract class ChannelOutboundMessageHandlerAdapter<I>
extends ChannelOperationHandlerAdapter implements ChannelOutboundMessageHandler<I> { extends ChannelOperationHandlerAdapter implements ChannelOutboundMessageHandler<I> {
private final Class<?> acceptedMsgType;
protected ChannelOutboundMessageHandlerAdapter() {
this(ChannelOutboundMessageHandlerAdapter.class, 0);
}
protected ChannelOutboundMessageHandlerAdapter(
@SuppressWarnings("rawtypes")
Class<? extends ChannelOutboundMessageHandlerAdapter> parameterizedHandlerType,
int messageTypeParamIndex) {
acceptedMsgType = TypeParameterFinder.findActualTypeParameter(
this, parameterizedHandlerType, messageTypeParamIndex);
}
@Override @Override
public MessageBuf<I> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { public MessageBuf<I> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.messageBuffer(); return Unpooled.messageBuffer();
@ -35,4 +50,72 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
ctx.outboundMessageBuffer().free(); ctx.outboundMessageBuffer().free();
} }
/**
* Returns {@code true} if and only if the specified message can be handled by this handler.
*
* @param msg the message
*/
public boolean acceptOutboundMessage(Object msg) throws Exception {
return acceptedMsgType.isInstance(msg);
}
@Override
public final void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
MessageBuf<Object> in = ctx.outboundMessageBuffer();
MessageBuf<Object> out = null;
ChannelPromise nextPromise = promise;
try {
for (;;) {
Object msg = in.poll();
if (msg == null) {
break;
}
try {
if (!acceptOutboundMessage(msg)) {
if (out == null) {
out = ctx.nextOutboundMessageBuffer();
}
out.add(msg);
continue;
}
@SuppressWarnings("unchecked")
I imsg = (I) msg;
try {
flush(ctx, imsg);
} finally {
freeOutboundMessage(imsg);
}
} catch (Throwable t) {
if (!promise.isDone()) {
promise.setFailure(new PartialFlushException(
"faied to encode all messages associated with the future", t));
nextPromise = ctx.newPromise();
}
}
}
} finally {
ctx.flush(nextPromise);
}
}
/**
* Is called once a message is being flushed.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
* @param msg the message to handle
* @throws Exception thrown when an error accour
*/
protected abstract void flush(ChannelHandlerContext ctx, I msg) throws Exception;
/**
* Is called after a message was processed via {@link #flush(ChannelHandlerContext, Object)} to free
* up any resources that is held by the outbound message. You may want to override this if your implementation
* just pass-through the input message or need it for later usage.
*/
protected void freeOutboundMessage(I msg) throws Exception {
ChannelHandlerUtil.freeMessage(msg);
}
} }