Fields in ReplayingDecoder does not need atomic nature. See #108
This commit is contained in:
parent
27f9a0cee2
commit
229c8734ed
@ -16,8 +16,6 @@
|
|||||||
package org.jboss.netty.handler.codec.replay;
|
package org.jboss.netty.handler.codec.replay;
|
||||||
|
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
import org.jboss.netty.buffer.ChannelBufferFactory;
|
import org.jboss.netty.buffer.ChannelBufferFactory;
|
||||||
@ -294,10 +292,8 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
extends SimpleChannelUpstreamHandler {
|
extends SimpleChannelUpstreamHandler {
|
||||||
|
|
||||||
|
|
||||||
private final AtomicReference<ChannelBuffer> cumulation =
|
private ChannelBuffer cumulation;
|
||||||
new AtomicReference<ChannelBuffer>();
|
private boolean needsCleanup = false;
|
||||||
private final AtomicBoolean needsCleanup =
|
|
||||||
new AtomicBoolean(false);
|
|
||||||
private final boolean unfold;
|
private final boolean unfold;
|
||||||
private ReplayingDecoderBuffer replayable;
|
private ReplayingDecoderBuffer replayable;
|
||||||
private T state;
|
private T state;
|
||||||
@ -330,7 +326,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
* Stores the internal cumulative buffer's reader position.
|
* Stores the internal cumulative buffer's reader position.
|
||||||
*/
|
*/
|
||||||
protected void checkpoint() {
|
protected void checkpoint() {
|
||||||
ChannelBuffer cumulation = this.cumulation.get();
|
ChannelBuffer cumulation = this.cumulation;
|
||||||
if (cumulation != null) {
|
if (cumulation != null) {
|
||||||
checkpoint = cumulation.readerIndex();
|
checkpoint = cumulation.readerIndex();
|
||||||
} else {
|
} else {
|
||||||
@ -381,7 +377,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
* Use it only when you must use it at your own risk.
|
* Use it only when you must use it at your own risk.
|
||||||
*/
|
*/
|
||||||
protected ChannelBuffer internalBuffer() {
|
protected ChannelBuffer internalBuffer() {
|
||||||
ChannelBuffer buf = cumulation.get();
|
ChannelBuffer buf = this.cumulation;
|
||||||
if (buf == null) {
|
if (buf == null) {
|
||||||
return ChannelBuffers.EMPTY_BUFFER;
|
return ChannelBuffers.EMPTY_BUFFER;
|
||||||
}
|
}
|
||||||
@ -439,7 +435,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
}
|
}
|
||||||
|
|
||||||
ChannelBuffer cumulation = cumulation(ctx);
|
ChannelBuffer cumulation = cumulation(ctx);
|
||||||
needsCleanup.set(true);
|
needsCleanup = true;
|
||||||
cumulation.discardReadBytes();
|
cumulation.discardReadBytes();
|
||||||
cumulation.writeBytes(input);
|
cumulation.writeBytes(input);
|
||||||
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
|
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
|
||||||
@ -508,7 +504,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
unfoldAndFireMessageReceived(context, result, remoteAddress);
|
unfoldAndFireMessageReceived(context, result, remoteAddress);
|
||||||
|
|
||||||
if (!cumulation.readable()) {
|
if (!cumulation.readable()) {
|
||||||
this.cumulation.set(null);
|
this.cumulation = null;
|
||||||
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
|
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -536,11 +532,14 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
|
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
try {
|
try {
|
||||||
if (!needsCleanup.getAndSet(false)) {
|
if (!needsCleanup) {
|
||||||
return;
|
return;
|
||||||
|
} else {
|
||||||
|
needsCleanup = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
ChannelBuffer cumulation = this.cumulation.getAndSet(null);
|
ChannelBuffer cumulation = this.cumulation;
|
||||||
|
this.cumulation = null;
|
||||||
replayable.terminate();
|
replayable.terminate();
|
||||||
|
|
||||||
if (cumulation != null && cumulation.readable()) {
|
if (cumulation != null && cumulation.readable()) {
|
||||||
@ -564,14 +563,15 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
|
|||||||
}
|
}
|
||||||
|
|
||||||
private ChannelBuffer cumulation(ChannelHandlerContext ctx) {
|
private ChannelBuffer cumulation(ChannelHandlerContext ctx) {
|
||||||
ChannelBuffer buf = cumulation.get();
|
ChannelBuffer buf = this.cumulation;
|
||||||
if (buf == null) {
|
if (buf == null) {
|
||||||
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
|
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
|
||||||
buf = new UnsafeDynamicChannelBuffer(factory);
|
buf = new UnsafeDynamicChannelBuffer(factory);
|
||||||
if (cumulation.compareAndSet(null, buf)) {
|
if (cumulation == null) {
|
||||||
|
cumulation = buf;
|
||||||
replayable = new ReplayingDecoderBuffer(buf);
|
replayable = new ReplayingDecoderBuffer(buf);
|
||||||
} else {
|
} else {
|
||||||
buf = cumulation.get();
|
buf = cumulation;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return buf;
|
return buf;
|
||||||
|
Loading…
Reference in New Issue
Block a user