Add a replace(..) method to FrameDecoder and also to ReplayDecoder as it now extend FrameDecoder. This also fix #332.

This commit is contained in:
norman 2012-05-18 08:49:45 +02:00
parent e841e85bdc
commit 288ed13b6b
5 changed files with 111 additions and 94 deletions

View File

@ -28,9 +28,9 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
/**
* Decodes the received {@link ChannelBuffer}s into a meaningful frame object.
@ -175,10 +175,11 @@ import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
*
* @apiviz.landmark
*/
public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
private final boolean unfold;
private ChannelBuffer cumulation;
protected ChannelBuffer cumulation;
private volatile ChannelHandlerContext ctx;
protected FrameDecoder() {
this(false);
@ -336,7 +337,7 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
}
}
private void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
protected final void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
if (unfold) {
if (result instanceof Object[]) {
for (Object r: (Object[]) result) {
@ -354,7 +355,11 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
}
}
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
/**
* Gets called on {@link #channelDisconnected(ChannelHandlerContext, ChannelStateEvent)} and {@link #channelClosed(ChannelHandlerContext, ChannelStateEvent)}
*
*/
protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
try {
ChannelBuffer cumulation = this.cumulation;
@ -393,4 +398,72 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
return factory.getBuffer(Math.max(minimumCapacity, 256));
}
/**
* Replace this {@link FrameDecoder} in the {@link ChannelPipeline} with the given {@link ChannelHandler}. All
* remaining bytes in the {@link ChannelBuffer} will get send to the new {@link ChannelHandler} that was used
* as replacement
*
*/
public void replace(String handlerName, ChannelHandler handler) {
if (ctx == null) {
throw new IllegalStateException("Replace cann only be called once the FrameDecoder is added to the ChannelPipeline");
}
ChannelPipeline pipeline = ctx.getPipeline();
pipeline.addAfter(ctx.getName(), handlerName, handler);
try {
if (cumulation != null) {
Channels.fireMessageReceived(ctx, cumulation.readBytes(actualReadableBytes()));
}
} finally {
pipeline.remove(this);
}
}
/**
* Returns the actual number of readable bytes in the internal cumulative
* buffer of this decoder. You usually do not need to rely on this value
* to write a decoder. Use it only when you muse use it at your own risk.
* This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
*/
protected int actualReadableBytes() {
return internalBuffer().readableBytes();
}
/**
* Returns the internal cumulative buffer of this decoder. You usually
* do not need to access the internal buffer directly to write a decoder.
* Use it only when you must use it at your own risk.
*/
protected ChannelBuffer internalBuffer() {
ChannelBuffer buf = this.cumulation;
if (buf == null) {
return ChannelBuffers.EMPTY_BUFFER;
}
return buf;
}
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// Nothing to do..
}
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// Nothing to do..
}
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// Nothing to do..
}
}

View File

@ -240,10 +240,12 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
String subprotocol = response.getHeader(Names.SEC_WEBSOCKET_PROTOCOL);
setActualSubprotocol(subprotocol);
channel.getPipeline().replace(HttpResponseDecoder.class, "ws-decoder",
channel.getPipeline().get(HttpResponseDecoder.class).replace("ws-decoder",
new WebSocket00FrameDecoder(getMaxFramePayloadLength()));
setHandshakeComplete();
}
private static String insertRandomCharacters(String key) {

View File

@ -227,10 +227,12 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker {
String subprotocol = response.getHeader(Names.SEC_WEBSOCKET_PROTOCOL);
setActualSubprotocol(subprotocol);
channel.getPipeline().replace(HttpResponseDecoder.class, "ws-decoder",
channel.getPipeline().get(HttpResponseDecoder.class).replace("ws-decoder",
new WebSocket08FrameDecoder(false, allowExtensions, this.getMaxFramePayloadLength()));
setHandshakeComplete();
}
}

View File

@ -224,9 +224,11 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
String subprotocol = response.getHeader(Names.SEC_WEBSOCKET_PROTOCOL);
setActualSubprotocol(subprotocol);
channel.getPipeline().replace(HttpResponseDecoder.class, "ws-decoder",
channel.getPipeline().get(HttpResponseDecoder.class).replace("ws-decoder",
new WebSocket13FrameDecoder(false, allowExtensions, this.getMaxFramePayloadLength()));
setHandshakeComplete();
}
}

View File

@ -18,17 +18,13 @@ package org.jboss.netty.handler.codec.replay;
import java.net.SocketAddress;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
/**
@ -286,11 +282,9 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder;
* @apiviz.has org.jboss.netty.handler.codec.replay.UnreplayableOperationException oneway - - throws
*/
public abstract class ReplayingDecoder<T extends Enum<T>>
extends SimpleChannelUpstreamHandler {
extends FrameDecoder {
private ChannelBuffer cumulation;
private final boolean unfold;
private ReplayingDecoderBuffer replayable;
private T state;
private int checkpoint;
@ -316,8 +310,8 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
}
protected ReplayingDecoder(T initialState, boolean unfold) {
super(unfold);
this.state = initialState;
this.unfold = unfold;
}
/**
@ -359,29 +353,6 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
return oldState;
}
/**
* Returns the actual number of readable bytes in the internal cumulative
* buffer of this decoder. You usually do not need to rely on this value
* to write a decoder. Use it only when you muse use it at your own risk.
* This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
*/
protected int actualReadableBytes() {
return internalBuffer().readableBytes();
}
/**
* Returns the internal cumulative buffer of this decoder. You usually
* do not need to access the internal buffer directly to write a decoder.
* Use it only when you must use it at your own risk.
*/
protected ChannelBuffer internalBuffer() {
ChannelBuffer buf = this.cumulation;
if (buf == null) {
return ChannelBuffers.EMPTY_BUFFER;
}
return buf;
}
/**
* Decodes the received packets so far into a frame.
*
@ -417,6 +388,21 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
return decode(ctx, channel, buffer, state);
}
/**
* Calls {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer, Enum)}. This method should be never used by {@link ReplayingDecoder} itself.
* But to be safe we should handle it anyway
*/
@Override
protected final Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
return decode(ctx, channel, buffer, state);
}
@Override
protected final Object decodeLast(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
return decodeLast(ctx, channel, buffer, state);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
@ -521,24 +507,6 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
}
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
cleanup(ctx, e);
}
@Override
public void channelClosed(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
cleanup(ctx, e);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
ctx.sendUpstream(e);
}
private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception {
while (input.readable()) {
int oldReaderIndex = checkpoint = input.readerIndex();
@ -581,30 +549,12 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
}
// A successful decode
unfoldAndFireMessageReceived(context, result, remoteAddress);
unfoldAndFireMessageReceived(context, remoteAddress, result);
}
}
private void unfoldAndFireMessageReceived(
ChannelHandlerContext context, Object result, SocketAddress remoteAddress) {
if (unfold) {
if (result instanceof Object[]) {
for (Object r: (Object[]) result) {
Channels.fireMessageReceived(context, r, remoteAddress);
}
} else if (result instanceof Iterable<?>) {
for (Object r: (Iterable<?>) result) {
Channels.fireMessageReceived(context, r, remoteAddress);
}
} else {
Channels.fireMessageReceived(context, result, remoteAddress);
}
} else {
Channels.fireMessageReceived(context, result, remoteAddress);
}
}
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
@Override
protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
try {
ChannelBuffer cumulation = this.cumulation;
@ -627,7 +577,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
// notify a user that the connection was closed explicitly.
Object partiallyDecoded = decodeLast(ctx, e.getChannel(), replayable, state);
if (partiallyDecoded != null) {
unfoldAndFireMessageReceived(ctx, partiallyDecoded, null);
unfoldAndFireMessageReceived(ctx, null, partiallyDecoded);
}
} catch (ReplayError replay) {
// Ignore
@ -636,17 +586,5 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
ctx.sendUpstream(e);
}
}
/**
* Create a new {@link ChannelBuffer} which is used for the cumulation.
* Sub-classes may override this.
*
* @param ctx {@link ChannelHandlerContext} for this handler
* @return buffer the {@link ChannelBuffer} which is used for cumulation
*/
protected ChannelBuffer newCumulationBuffer(
ChannelHandlerContext ctx, int minimumCapacity) {
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
return factory.getBuffer(Math.max(minimumCapacity, 256));
}
}