Add a replace(..) method to FrameDecoder and also to ReplayDecoder as it now extend FrameDecoder. This also fix #332

This commit is contained in:
Norman Maurer 2012-05-19 16:35:22 +02:00
parent ed538209e5
commit 3ca2a53e91
4 changed files with 113 additions and 97 deletions

View File

@ -240,10 +240,10 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
String subprotocol = response.getHeader(Names.SEC_WEBSOCKET_PROTOCOL);
setActualSubprotocol(subprotocol);
channel.getPipeline().replace(HttpResponseDecoder.class, "ws-decoder",
new WebSocket00FrameDecoder(this.getMaxFramePayloadLength()));
setHandshakeComplete();
channel.getPipeline().get(HttpResponseDecoder.class).replace("ws-decoder",
new WebSocket00FrameDecoder(this.getMaxFramePayloadLength()));
}
private String insertRandomCharacters(String key) {

View File

@ -224,9 +224,10 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
String subprotocol = response.getHeader(Names.SEC_WEBSOCKET_PROTOCOL);
setActualSubprotocol(subprotocol);
channel.getPipeline().replace(HttpResponseDecoder.class, "ws-decoder",
setHandshakeComplete();
channel.getPipeline().get(HttpResponseDecoder.class).replace("ws-decoder",
new WebSocket13FrameDecoder(false, allowExtensions, this.getMaxFramePayloadLength()));
setHandshakeComplete();
}
}

View File

@ -28,9 +28,9 @@ import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.channel.Channels;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.LifeCycleAwareChannelHandler;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.handler.codec.replay.ReplayingDecoder;
/**
* Decodes the received {@link ChannelBuffer}s into a meaningful frame object.
@ -174,11 +174,12 @@ import io.netty.handler.codec.replay.ReplayingDecoder;
* </pre>
* @apiviz.landmark
*/
public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
private final boolean unfold;
private ChannelBuffer cumulation;
protected ChannelBuffer cumulation;
private volatile ChannelHandlerContext ctx;
protected FrameDecoder() {
this(false);
}
@ -335,7 +336,7 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
}
}
private void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
protected final void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
if (unfold) {
if (result instanceof Object[]) {
for (Object r: (Object[]) result) {
@ -353,7 +354,11 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
}
}
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
/**
* Gets called on {@link #channelDisconnected(ChannelHandlerContext, ChannelStateEvent)} and {@link #channelClosed(ChannelHandlerContext, ChannelStateEvent)}
*
*/
protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
try {
ChannelBuffer cumulation = this.cumulation;
@ -392,4 +397,76 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
return factory.getBuffer(Math.max(minimumCapacity, 256));
}
/**
* Replace this {@link FrameDecoder} in the {@link ChannelPipeline} with the given {@link ChannelHandler}. All
* remaining bytes in the {@link ChannelBuffer} will get send to the new {@link ChannelHandler} that was used
* as replacement
*
*/
public void replace(String handlerName, ChannelHandler handler) {
if (ctx == null) {
throw new IllegalStateException("Replace cann only be called once the FrameDecoder is added to the ChannelPipeline");
}
ChannelPipeline pipeline = ctx.getPipeline();
pipeline.addAfter(ctx.getName(), handlerName, handler);
try {
if (cumulation != null) {
Channels.fireMessageReceived(ctx, cumulation.readBytes(actualReadableBytes()));
}
} finally {
pipeline.remove(this);
}
}
/**
* Returns the actual number of readable bytes in the internal cumulative
* buffer of this decoder. You usually do not need to rely on this value
* to write a decoder. Use it only when you muse use it at your own risk.
* This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
*/
protected int actualReadableBytes() {
return internalBuffer().readableBytes();
}
/**
* Returns the internal cumulative buffer of this decoder. You usually
* do not need to access the internal buffer directly to write a decoder.
* Use it only when you must use it at your own risk.
*/
protected ChannelBuffer internalBuffer() {
ChannelBuffer buf = this.cumulation;
if (buf == null) {
return ChannelBuffers.EMPTY_BUFFER;
}
return buf;
}
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// Nothing to do..
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// Nothing to do..
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// Nothing to do..
}
}

View File

@ -15,22 +15,18 @@
*/
package io.netty.handler.codec.replay;
import java.net.SocketAddress;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.Channels;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.handler.codec.frame.FrameDecoder;
import java.net.SocketAddress;
/**
* A specialized variation of {@link FrameDecoder} which enables implementation
* of a non-blocking decoder in the blocking I/O paradigm.
@ -285,11 +281,9 @@ import io.netty.handler.codec.frame.FrameDecoder;
* @apiviz.has io.netty.handler.codec.replay.UnreplayableOperationException oneway - - throws
*/
public abstract class ReplayingDecoder<T extends Enum<T>>
extends SimpleChannelUpstreamHandler {
extends FrameDecoder {
private ChannelBuffer cumulation;
private final boolean unfold;
private ReplayingDecoderBuffer replayable;
private T state;
private int checkpoint;
@ -315,8 +309,8 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
}
protected ReplayingDecoder(T initialState, boolean unfold) {
super(unfold);
this.state = initialState;
this.unfold = unfold;
}
/**
@ -358,29 +352,6 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
return oldState;
}
/**
* Returns the actual number of readable bytes in the internal cumulative
* buffer of this decoder. You usually do not need to rely on this value
* to write a decoder. Use it only when you muse use it at your own risk.
* This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
*/
protected int actualReadableBytes() {
return internalBuffer().readableBytes();
}
/**
* Returns the internal cumulative buffer of this decoder. You usually
* do not need to access the internal buffer directly to write a decoder.
* Use it only when you must use it at your own risk.
*/
protected ChannelBuffer internalBuffer() {
ChannelBuffer buf = this.cumulation;
if (buf == null) {
return ChannelBuffers.EMPTY_BUFFER;
}
return buf;
}
/**
* Decodes the received packets so far into a frame.
*
@ -416,6 +387,21 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
return decode(ctx, channel, buffer, state);
}
/**
* Calls {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer, Enum)}. This method should be never used by {@link ReplayingDecoder} itself.
* But to be safe we should handle it anyway
*/
@Override
protected final Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
return decode(ctx, channel, buffer, state);
}
@Override
protected final Object decodeLast(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
return decodeLast(ctx, channel, buffer, state);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
@ -520,24 +506,6 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
}
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
cleanup(ctx, e);
}
@Override
public void channelClosed(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
cleanup(ctx, e);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
ctx.sendUpstream(e);
}
private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception {
while (input.readable()) {
int oldReaderIndex = checkpoint = input.readerIndex();
@ -580,30 +548,12 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
}
// A successful decode
unfoldAndFireMessageReceived(context, result, remoteAddress);
unfoldAndFireMessageReceived(context, remoteAddress, result);
}
}
private void unfoldAndFireMessageReceived(
ChannelHandlerContext context, Object result, SocketAddress remoteAddress) {
if (unfold) {
if (result instanceof Object[]) {
for (Object r: (Object[]) result) {
Channels.fireMessageReceived(context, r, remoteAddress);
}
} else if (result instanceof Iterable<?>) {
for (Object r: (Iterable<?>) result) {
Channels.fireMessageReceived(context, r, remoteAddress);
}
} else {
Channels.fireMessageReceived(context, result, remoteAddress);
}
} else {
Channels.fireMessageReceived(context, result, remoteAddress);
}
}
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
@Override
protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
try {
ChannelBuffer cumulation = this.cumulation;
@ -626,7 +576,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
// notify a user that the connection was closed explicitly.
Object partiallyDecoded = decodeLast(ctx, e.getChannel(), replayable, state);
if (partiallyDecoded != null) {
unfoldAndFireMessageReceived(ctx, partiallyDecoded, null);
unfoldAndFireMessageReceived(ctx, null, partiallyDecoded);
}
} catch (ReplayError replay) {
// Ignore
@ -635,17 +585,5 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
ctx.sendUpstream(e);
}
}
/**
* Create a new {@link ChannelBuffer} which is used for the cumulation.
* Sub-classes may override this.
*
* @param ctx {@link ChannelHandlerContext} for this handler
* @return buffer the {@link ChannelBuffer} which is used for cumulation
*/
protected ChannelBuffer newCumulationBuffer(
ChannelHandlerContext ctx, int minimumCapacity) {
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
return factory.getBuffer(Math.max(minimumCapacity, 256));
}
}