* Fixed compilation errors

* Made sure cumulative buffers are initialized as early as possible
This commit is contained in:
Trustin Lee 2008-12-09 07:17:37 +00:00
parent 5eec9ac58c
commit f5fb85a0af
3 changed files with 64 additions and 11 deletions

View File

@ -355,7 +355,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
h.beforeAdd(ctx); h.beforeAdd(ctx);
} catch (Throwable t) { } catch (Throwable t) {
throw new ChannelHandlerLifeCycleException( throw new ChannelHandlerLifeCycleException(
h.getName() + h.getClass().getName() +
".beforeAdd() has thrown an exception; not adding.", t); ".beforeAdd() has thrown an exception; not adding.", t);
} }
} }
@ -385,11 +385,11 @@ public class DefaultChannelPipeline implements ChannelPipeline {
if (removed) { if (removed) {
throw new ChannelHandlerLifeCycleException( throw new ChannelHandlerLifeCycleException(
h.getName() + h.getClass().getName() +
".afterAdd() has thrown an exception; removed.", t); ".afterAdd() has thrown an exception; removed.", t);
} else { } else {
throw new ChannelHandlerLifeCycleException( throw new ChannelHandlerLifeCycleException(
h.getName() + h.getClass().getName() +
".afterAdd() has thrown an exception; also failed to remove.", t); ".afterAdd() has thrown an exception; also failed to remove.", t);
} }
} }
@ -411,7 +411,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
h.beforeRemove(ctx); h.beforeRemove(ctx);
} catch (Throwable t) { } catch (Throwable t) {
throw new ChannelHandlerLifeCycleException( throw new ChannelHandlerLifeCycleException(
h.getName() + h.getClass().getName() +
".beforeRemove() has thrown an exception; not removing.", t); ".beforeRemove() has thrown an exception; not removing.", t);
} }
} }
@ -432,7 +432,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
h.afterRemove(ctx); h.afterRemove(ctx);
} catch (Throwable t) { } catch (Throwable t) {
throw new ChannelHandlerLifeCycleException( throw new ChannelHandlerLifeCycleException(
h.getName() + h.getClass().getName() +
".afterRemove() has thrown an exception.", t); ".afterRemove() has thrown an exception.", t);
} }
} }

View File

@ -36,6 +36,7 @@ import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.SimpleChannelHandler;
@ -147,11 +148,36 @@ import org.jboss.netty.channel.SimpleChannelHandler;
* @apiviz.landmark * @apiviz.landmark
*/ */
@ChannelPipelineCoverage("one") @ChannelPipelineCoverage("one")
public abstract class FrameDecoder extends SimpleChannelHandler { public abstract class FrameDecoder
extends SimpleChannelHandler implements LifeCycleAwareChannelHandler {
private final AtomicReference<ChannelBuffer> cumulation = private final AtomicReference<ChannelBuffer> cumulation =
new AtomicReference<ChannelBuffer>(); new AtomicReference<ChannelBuffer>();
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
cumulation(ctx);
super.handleUpstream(ctx, e);
}
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
cumulation(ctx);
}
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// Unused
}
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// Unused
}
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// Unused
}
@Override @Override
public void messageReceived( public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) throws Exception { ChannelHandlerContext ctx, MessageEvent e) throws Exception {
@ -167,7 +193,7 @@ public abstract class FrameDecoder extends SimpleChannelHandler {
return; return;
} }
ChannelBuffer cumulation = cumulation(e); ChannelBuffer cumulation = cumulation(ctx);
if (cumulation.readable()) { if (cumulation.readable()) {
cumulation.discardReadBytes(); cumulation.discardReadBytes();
cumulation.writeBytes(input); cumulation.writeBytes(input);
@ -256,7 +282,7 @@ public abstract class FrameDecoder extends SimpleChannelHandler {
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception { throws Exception {
ChannelBuffer cumulation = cumulation(e); ChannelBuffer cumulation = cumulation(ctx);
try { try {
if (cumulation.readable()) { if (cumulation.readable()) {
// Make sure all frames are read before notifying a closed channel. // Make sure all frames are read before notifying a closed channel.
@ -274,11 +300,11 @@ public abstract class FrameDecoder extends SimpleChannelHandler {
} }
} }
private ChannelBuffer cumulation(ChannelEvent e) { private ChannelBuffer cumulation(ChannelHandlerContext ctx) {
ChannelBuffer buf = cumulation.get(); ChannelBuffer buf = cumulation.get();
if (buf == null) { if (buf == null) {
buf = ChannelBuffers.dynamicBuffer( buf = ChannelBuffers.dynamicBuffer(
e.getChannel().getConfig().getBufferFactory()); ctx.getChannel().getConfig().getBufferFactory());
if (!cumulation.compareAndSet(null, buf)) { if (!cumulation.compareAndSet(null, buf)) {
buf = cumulation.get(); buf = cumulation.get();
} }

View File

@ -30,11 +30,13 @@ import java.util.concurrent.atomic.AtomicReference;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory; import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage; import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.codec.frame.FrameDecoder; import org.jboss.netty.handler.codec.frame.FrameDecoder;
@ -212,7 +214,9 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder;
* @apiviz.landmark * @apiviz.landmark
*/ */
@ChannelPipelineCoverage("one") @ChannelPipelineCoverage("one")
public abstract class ReplayingDecoder<T extends Enum<T>> extends SimpleChannelHandler { public abstract class ReplayingDecoder<T extends Enum<T>>
extends SimpleChannelHandler implements LifeCycleAwareChannelHandler {
private final AtomicReference<ChannelBuffer> cumulation = private final AtomicReference<ChannelBuffer> cumulation =
new AtomicReference<ChannelBuffer>(); new AtomicReference<ChannelBuffer>();
@ -297,6 +301,29 @@ public abstract class ReplayingDecoder<T extends Enum<T>> extends SimpleChannelH
return decode(ctx, channel, buffer, state); return decode(ctx, channel, buffer, state);
} }
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
cumulation(ctx);
super.handleUpstream(ctx, e);
}
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
cumulation(ctx);
}
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// Unused
}
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// Unused
}
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// Unused
}
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception { throws Exception {