Automatically detect the message types in MessageToMessageCodec
This commit is contained in:
parent
71136390f1
commit
1640b1fea6
@ -47,22 +47,13 @@ import java.util.Queue;
|
||||
* so that this handler can intercept HTTP responses before {@link HttpObjectEncoder}
|
||||
* converts them into {@link ByteBuf}s.
|
||||
*/
|
||||
public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessage, Object> {
|
||||
public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessage, HttpObject> {
|
||||
|
||||
private final Queue<String> acceptEncodingQueue = new ArrayDeque<String>();
|
||||
private EmbeddedByteChannel encoder;
|
||||
private HttpMessage message;
|
||||
private boolean encodeStarted;
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*/
|
||||
protected HttpContentEncoder() {
|
||||
super(
|
||||
new Class<?>[] { HttpMessage.class },
|
||||
new Class<?>[] { HttpObject.class });
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object decode(ChannelHandlerContext ctx, HttpMessage msg)
|
||||
throws Exception {
|
||||
@ -76,7 +67,7 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object encode(ChannelHandlerContext ctx, Object msg)
|
||||
public Object encode(ChannelHandlerContext ctx, HttpObject msg)
|
||||
throws Exception {
|
||||
|
||||
if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().code() == 100) {
|
||||
@ -176,7 +167,7 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void freeOutboundMessage(Object msg) throws Exception {
|
||||
protected void freeOutboundMessage(HttpObject msg) throws Exception {
|
||||
if (encoder == null) {
|
||||
// if the decoder was null we returned the original message so we are not allowed to free it
|
||||
return;
|
||||
|
@ -32,8 +32,9 @@ public class SpdyHttpResponseStreamIdHandler extends
|
||||
private static final Integer NO_ID = -1;
|
||||
private final Queue<Integer> ids = new LinkedList<Integer>();
|
||||
|
||||
public SpdyHttpResponseStreamIdHandler() {
|
||||
super(new Class<?>[] { HttpMessage.class, SpdyRstStreamFrame.class }, new Class<?>[] { HttpMessage.class });
|
||||
@Override
|
||||
public boolean acceptInboundMessage(Object msg) throws Exception {
|
||||
return msg instanceof HttpMessage || msg instanceof SpdyRstStreamFrame;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -20,8 +20,11 @@ import io.netty.channel.ChannelDuplexHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelHandlerUtil;
|
||||
import io.netty.channel.ChannelInboundMessageHandler;
|
||||
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
|
||||
import io.netty.channel.ChannelOutboundMessageHandler;
|
||||
import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.util.internal.TypeParameterFinder;
|
||||
|
||||
/**
|
||||
* A Codec for on-the-fly encoding/decoding of message.
|
||||
@ -53,59 +56,73 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
|
||||
implements ChannelInboundMessageHandler<INBOUND_IN>,
|
||||
ChannelOutboundMessageHandler<OUTBOUND_IN> {
|
||||
|
||||
private final MessageToMessageEncoder<OUTBOUND_IN> encoder =
|
||||
new MessageToMessageEncoder<OUTBOUND_IN>() {
|
||||
private final MessageToMessageEncoder<Object> encoder =
|
||||
new MessageToMessageEncoder<Object>() {
|
||||
@Override
|
||||
public boolean isEncodable(Object msg) throws Exception {
|
||||
return MessageToMessageCodec.this.isEncodable(msg);
|
||||
return MessageToMessageCodec.this.acceptOutboundMessage(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object encode(ChannelHandlerContext ctx, OUTBOUND_IN msg) throws Exception {
|
||||
return MessageToMessageCodec.this.encode(ctx, msg);
|
||||
@SuppressWarnings("unchecked")
|
||||
public Object encode(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
return MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void freeOutboundMessage(OUTBOUND_IN msg) throws Exception {
|
||||
MessageToMessageCodec.this.freeOutboundMessage(msg);
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void freeOutboundMessage(Object msg) throws Exception {
|
||||
MessageToMessageCodec.this.freeOutboundMessage((OUTBOUND_IN) msg);
|
||||
}
|
||||
};
|
||||
|
||||
private final MessageToMessageDecoder<INBOUND_IN> decoder =
|
||||
new MessageToMessageDecoder<INBOUND_IN>() {
|
||||
private final MessageToMessageDecoder<Object> decoder =
|
||||
new MessageToMessageDecoder<Object>() {
|
||||
|
||||
@Override
|
||||
public boolean acceptInboundMessage(Object msg) throws Exception {
|
||||
return isDecodable(msg);
|
||||
return MessageToMessageCodec.this.acceptInboundMessage(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object decode(ChannelHandlerContext ctx, INBOUND_IN msg) throws Exception {
|
||||
return MessageToMessageCodec.this.decode(ctx, msg);
|
||||
@SuppressWarnings("unchecked")
|
||||
public Object decode(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
return MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void freeInboundMessage(INBOUND_IN msg) throws Exception {
|
||||
MessageToMessageCodec.this.freeInboundMessage(msg);
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void freeInboundMessage(Object msg) throws Exception {
|
||||
MessageToMessageCodec.this.freeInboundMessage((INBOUND_IN) msg);
|
||||
}
|
||||
};
|
||||
|
||||
private final Class<?>[] acceptedInboundMsgTypes;
|
||||
private final Class<?>[] acceptedOutboundMsgTypes;
|
||||
private final Class<?> acceptedInboundMsgType;
|
||||
private final Class<?> acceptedOutboundMsgType;
|
||||
|
||||
protected MessageToMessageCodec() {
|
||||
this(null, null);
|
||||
acceptedInboundMsgType = TypeParameterFinder.findActualTypeParameter(this, MessageToMessageCodec.class, 0);
|
||||
acceptedOutboundMsgType = TypeParameterFinder.findActualTypeParameter(this, MessageToMessageCodec.class, 1);
|
||||
}
|
||||
|
||||
protected MessageToMessageCodec(
|
||||
Class<?>[] acceptedInboundMsgTypes, Class<?>[] acceptedOutboundMsgTypes) {
|
||||
this.acceptedInboundMsgTypes = ChannelHandlerUtil.acceptedMessageTypes(acceptedInboundMsgTypes);
|
||||
this.acceptedOutboundMsgTypes = ChannelHandlerUtil.acceptedMessageTypes(acceptedOutboundMsgTypes);
|
||||
@SuppressWarnings("rawtypes")
|
||||
Class<? extends ChannelInboundMessageHandlerAdapter> parameterizedInboundHandlerType,
|
||||
int inboundMessageTypeParamIndex,
|
||||
@SuppressWarnings("rawtypes")
|
||||
Class<? extends ChannelOutboundMessageHandlerAdapter> parameterizedOutboundHandlerType,
|
||||
int outboundMessageTypeParamIndex) {
|
||||
|
||||
acceptedInboundMsgType = TypeParameterFinder.findActualTypeParameter(
|
||||
this, parameterizedInboundHandlerType, inboundMessageTypeParamIndex);
|
||||
acceptedOutboundMsgType = TypeParameterFinder.findActualTypeParameter(
|
||||
this, parameterizedOutboundHandlerType, outboundMessageTypeParamIndex);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public MessageBuf<INBOUND_IN> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return decoder.newInboundBuffer(ctx);
|
||||
return (MessageBuf<INBOUND_IN>) decoder.newInboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -114,8 +131,9 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public MessageBuf<OUTBOUND_IN> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
|
||||
return encoder.newOutboundBuffer(ctx);
|
||||
return (MessageBuf<OUTBOUND_IN>) encoder.newOutboundBuffer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -139,8 +157,8 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
|
||||
*
|
||||
* @param msg the message
|
||||
*/
|
||||
public boolean isDecodable(Object msg) throws Exception {
|
||||
return ChannelHandlerUtil.acceptMessage(acceptedInboundMsgTypes, msg);
|
||||
public boolean acceptInboundMessage(Object msg) throws Exception {
|
||||
return acceptedInboundMsgType.isInstance(msg);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -148,8 +166,8 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
|
||||
*
|
||||
* @param msg the message
|
||||
*/
|
||||
public boolean isEncodable(Object msg) throws Exception {
|
||||
return ChannelHandlerUtil.acceptMessage(acceptedOutboundMsgTypes, msg);
|
||||
public boolean acceptOutboundMessage(Object msg) throws Exception {
|
||||
return acceptedOutboundMsgType.isInstance(msg);
|
||||
}
|
||||
|
||||
protected abstract Object encode(ChannelHandlerContext ctx, OUTBOUND_IN msg) throws Exception;
|
||||
|
@ -52,6 +52,5 @@ public final class TypeParameterFinder {
|
||||
return messageType;
|
||||
}
|
||||
|
||||
|
||||
private TypeParameterFinder() { }
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user