Fix for issue #442: SpdyFrameEncoder compressor state race condition

This commit is contained in:
Jeff Pinner 2012-07-10 01:44:17 -07:00
parent 7180146084
commit fa4ea1894a

View File

@ -23,15 +23,17 @@ import java.util.Set;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
/** /**
* Encodes a SPDY Data or Control Frame into a {@link ChannelBuffer}. * Encodes a SPDY Data or Control Frame into a {@link ChannelBuffer}.
*/ */
public class SpdyFrameEncoder extends OneToOneEncoder { public class SpdyFrameEncoder implements ChannelDownstreamHandler {
private final int version; private final int version;
private volatile boolean finished; private volatile boolean finished;
@ -69,7 +71,6 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
* Creates a new instance with the specified parameters. * Creates a new instance with the specified parameters.
*/ */
public SpdyFrameEncoder(int version, int compressionLevel, int windowBits, int memLevel) { public SpdyFrameEncoder(int version, int compressionLevel, int windowBits, int memLevel) {
super();
if (version < SPDY_MIN_VERSION || version > SPDY_MAX_VERSION) { if (version < SPDY_MIN_VERSION || version > SPDY_MAX_VERSION) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"unknown version: " + version); "unknown version: " + version);
@ -79,9 +80,8 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
version, compressionLevel, windowBits, memLevel); version, compressionLevel, windowBits, memLevel);
} }
@Override
public void handleDownstream( public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception { final ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (evt instanceof ChannelStateEvent) { if (evt instanceof ChannelStateEvent) {
ChannelStateEvent e = (ChannelStateEvent) evt; ChannelStateEvent e = (ChannelStateEvent) evt;
switch (e.getState()) { switch (e.getState()) {
@ -96,13 +96,14 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
} }
} }
} }
super.handleDownstream(ctx, evt);
}
@Override if (!(evt instanceof MessageEvent)) {
protected Object encode( ctx.sendDownstream(evt);
ChannelHandlerContext ctx, Channel channel, Object msg) return;
throws Exception { }
final MessageEvent e = (MessageEvent) evt;
Object msg = e.getMessage();
if (msg instanceof SpdyDataFrame) { if (msg instanceof SpdyDataFrame) {
@ -114,75 +115,95 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
header.writeInt(spdyDataFrame.getStreamId() & 0x7FFFFFFF); header.writeInt(spdyDataFrame.getStreamId() & 0x7FFFFFFF);
header.writeByte(flags); header.writeByte(flags);
header.writeMedium(data.readableBytes()); header.writeMedium(data.readableBytes());
return ChannelBuffers.wrappedBuffer(header, data); ChannelBuffer frame = ChannelBuffers.wrappedBuffer(header, data);
Channels.write(ctx, e.getFuture(), frame, e.getRemoteAddress());
return;
} else if (msg instanceof SpdySynStreamFrame) { } else if (msg instanceof SpdySynStreamFrame) {
SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg; synchronized (headerBlockCompressor) {
ChannelBuffer data = compressHeaderBlock( SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
encodeHeaderBlock(version, spdySynStreamFrame)); ChannelBuffer data = compressHeaderBlock(
byte flags = spdySynStreamFrame.isLast() ? SPDY_FLAG_FIN : 0; encodeHeaderBlock(version, spdySynStreamFrame));
if (spdySynStreamFrame.isUnidirectional()) { byte flags = spdySynStreamFrame.isLast() ? SPDY_FLAG_FIN : 0;
flags |= SPDY_FLAG_UNIDIRECTIONAL; if (spdySynStreamFrame.isUnidirectional()) {
} flags |= SPDY_FLAG_UNIDIRECTIONAL;
int headerBlockLength = data.readableBytes();
int length;
if (version < 3) {
length = headerBlockLength == 0 ? 12 : 10 + headerBlockLength;
} else {
length = 10 + headerBlockLength;
}
ChannelBuffer frame = ChannelBuffers.buffer(
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length);
frame.writeShort(version | 0x8000);
frame.writeShort(SPDY_SYN_STREAM_FRAME);
frame.writeByte(flags);
frame.writeMedium(length);
frame.writeInt(spdySynStreamFrame.getStreamId());
frame.writeInt(spdySynStreamFrame.getAssociatedToStreamId());
if (version < 3) {
// Restrict priorities for SPDY/2 to between 0 and 3
byte priority = spdySynStreamFrame.getPriority();
if (priority > 3) {
priority = 3;
} }
frame.writeShort((priority & 0xFF) << 14); int headerBlockLength = data.readableBytes();
} else { int length;
frame.writeShort((spdySynStreamFrame.getPriority() & 0xFF) << 13); if (version < 3) {
length = headerBlockLength == 0 ? 12 : 10 + headerBlockLength;
} else {
length = 10 + headerBlockLength;
}
ChannelBuffer frame = ChannelBuffers.buffer(
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + 12);
frame.writeShort(version | 0x8000);
frame.writeShort(SPDY_SYN_STREAM_FRAME);
frame.writeByte(flags);
frame.writeMedium(length);
frame.writeInt(spdySynStreamFrame.getStreamId());
frame.writeInt(spdySynStreamFrame.getAssociatedToStreamId());
if (version < 3) {
// Restrict priorities for SPDY/2 to between 0 and 3
byte priority = spdySynStreamFrame.getPriority();
if (priority > 3) {
priority = 3;
}
frame.writeShort((priority & 0xFF) << 14);
} else {
frame.writeShort((spdySynStreamFrame.getPriority() & 0xFF) << 13);
}
if (version < 3 && data.readableBytes() == 0) {
frame.writeShort(0);
}
// Writes of compressed data must occur in order
final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data);
e.getChannel().getPipeline().execute(new Runnable() {
public void run() {
Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress());
}
});
} }
if (version < 3 && data.readableBytes() == 0) { return;
frame.writeShort(0);
}
return ChannelBuffers.wrappedBuffer(frame, data);
} else if (msg instanceof SpdySynReplyFrame) { } else if (msg instanceof SpdySynReplyFrame) {
SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg; synchronized (headerBlockCompressor) {
ChannelBuffer data = compressHeaderBlock( SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
encodeHeaderBlock(version, spdySynReplyFrame)); ChannelBuffer data = compressHeaderBlock(
byte flags = spdySynReplyFrame.isLast() ? SPDY_FLAG_FIN : 0; encodeHeaderBlock(version, spdySynReplyFrame));
int headerBlockLength = data.readableBytes(); byte flags = spdySynReplyFrame.isLast() ? SPDY_FLAG_FIN : 0;
int length; int headerBlockLength = data.readableBytes();
if (version < 3) { int length;
length = headerBlockLength == 0 ? 8 : 6 + headerBlockLength; if (version < 3) {
} else { length = headerBlockLength == 0 ? 8 : 6 + headerBlockLength;
length = 4 + headerBlockLength;
}
ChannelBuffer frame = ChannelBuffers.buffer(
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length);
frame.writeShort(version | 0x8000);
frame.writeShort(SPDY_SYN_REPLY_FRAME);
frame.writeByte(flags);
frame.writeMedium(length);
frame.writeInt(spdySynReplyFrame.getStreamId());
if (version < 3) {
if (data.readableBytes() == 0) {
frame.writeInt(0);
} else { } else {
frame.writeShort(0); length = 4 + headerBlockLength;
} }
ChannelBuffer frame = ChannelBuffers.buffer(
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + 8);
frame.writeShort(version | 0x8000);
frame.writeShort(SPDY_SYN_REPLY_FRAME);
frame.writeByte(flags);
frame.writeMedium(length);
frame.writeInt(spdySynReplyFrame.getStreamId());
if (version < 3) {
if (data.readableBytes() == 0) {
frame.writeInt(0);
} else {
frame.writeShort(0);
}
}
// Writes of compressed data must occur in order
final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data);
e.getChannel().getPipeline().execute(new Runnable() {
public void run() {
Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress());
}
});
} }
return ChannelBuffers.wrappedBuffer(frame, data); return;
} else if (msg instanceof SpdyRstStreamFrame) { } else if (msg instanceof SpdyRstStreamFrame) {
@ -194,7 +215,8 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
frame.writeInt(8); frame.writeInt(8);
frame.writeInt(spdyRstStreamFrame.getStreamId()); frame.writeInt(spdyRstStreamFrame.getStreamId());
frame.writeInt(spdyRstStreamFrame.getStatus().getCode()); frame.writeInt(spdyRstStreamFrame.getStatus().getCode());
return frame; Channels.write(ctx, e.getFuture(), frame, e.getRemoteAddress());
return;
} else if (msg instanceof SpdySettingsFrame) { } else if (msg instanceof SpdySettingsFrame) {
@ -234,7 +256,8 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
} }
frame.writeInt(spdySettingsFrame.getValue(id)); frame.writeInt(spdySettingsFrame.getValue(id));
} }
return frame; Channels.write(ctx, e.getFuture(), frame, e.getRemoteAddress());
return;
} else if (msg instanceof SpdyNoOpFrame) { } else if (msg instanceof SpdyNoOpFrame) {
@ -243,7 +266,8 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
frame.writeShort(version | 0x8000); frame.writeShort(version | 0x8000);
frame.writeShort(SPDY_NOOP_FRAME); frame.writeShort(SPDY_NOOP_FRAME);
frame.writeInt(0); frame.writeInt(0);
return frame; Channels.write(ctx, e.getFuture(), frame, e.getRemoteAddress());
return;
} else if (msg instanceof SpdyPingFrame) { } else if (msg instanceof SpdyPingFrame) {
@ -254,7 +278,8 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
frame.writeShort(SPDY_PING_FRAME); frame.writeShort(SPDY_PING_FRAME);
frame.writeInt(4); frame.writeInt(4);
frame.writeInt(spdyPingFrame.getId()); frame.writeInt(spdyPingFrame.getId());
return frame; Channels.write(ctx, e.getFuture(), frame, e.getRemoteAddress());
return;
} else if (msg instanceof SpdyGoAwayFrame) { } else if (msg instanceof SpdyGoAwayFrame) {
@ -269,32 +294,42 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
if (version >= 3) { if (version >= 3) {
frame.writeInt(spdyGoAwayFrame.getStatus().getCode()); frame.writeInt(spdyGoAwayFrame.getStatus().getCode());
} }
return frame; Channels.write(ctx, e.getFuture(), frame, e.getRemoteAddress());
return;
} else if (msg instanceof SpdyHeadersFrame) { } else if (msg instanceof SpdyHeadersFrame) {
SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg; synchronized (headerBlockCompressor) {
ChannelBuffer data = compressHeaderBlock( SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
encodeHeaderBlock(version, spdyHeadersFrame)); ChannelBuffer data = compressHeaderBlock(
byte flags = spdyHeadersFrame.isLast() ? SPDY_FLAG_FIN : 0; encodeHeaderBlock(version, spdyHeadersFrame));
int headerBlockLength = data.readableBytes(); byte flags = spdyHeadersFrame.isLast() ? SPDY_FLAG_FIN : 0;
int length; int headerBlockLength = data.readableBytes();
if (version < 3) { int length;
length = headerBlockLength == 0 ? 4 : 6 + headerBlockLength; if (version < 3) {
} else { length = headerBlockLength == 0 ? 4 : 6 + headerBlockLength;
length = 4 + headerBlockLength; } else {
length = 4 + headerBlockLength;
}
ChannelBuffer frame = ChannelBuffers.buffer(
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length);
frame.writeShort(version | 0x8000);
frame.writeShort(SPDY_HEADERS_FRAME);
frame.writeByte(flags);
frame.writeMedium(length);
frame.writeInt(spdyHeadersFrame.getStreamId());
if (version < 3 && data.readableBytes() != 0) {
frame.writeShort(0);
}
// Writes of compressed data must occur in order
final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data);
e.getChannel().getPipeline().execute(new Runnable() {
public void run() {
Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress());
}
});
} }
ChannelBuffer frame = ChannelBuffers.buffer( return;
ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length);
frame.writeShort(version | 0x8000);
frame.writeShort(SPDY_HEADERS_FRAME);
frame.writeByte(flags);
frame.writeMedium(length);
frame.writeInt(spdyHeadersFrame.getStreamId());
if (version < 3 && data.readableBytes() != 0) {
frame.writeShort(0);
}
return ChannelBuffers.wrappedBuffer(frame, data);
} else if (msg instanceof SpdyWindowUpdateFrame) { } else if (msg instanceof SpdyWindowUpdateFrame) {
@ -306,11 +341,12 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
frame.writeInt(8); frame.writeInt(8);
frame.writeInt(spdyWindowUpdateFrame.getStreamId()); frame.writeInt(spdyWindowUpdateFrame.getStreamId());
frame.writeInt(spdyWindowUpdateFrame.getDeltaWindowSize()); frame.writeInt(spdyWindowUpdateFrame.getDeltaWindowSize());
return frame; Channels.write(ctx, e.getFuture(), frame, e.getRemoteAddress());
return;
} }
// Unknown message type // Unknown message type
return msg; ctx.sendDownstream(evt);
} }
private static void writeLengthField(int version, ChannelBuffer buffer, int length) { private static void writeLengthField(int version, ChannelBuffer buffer, int length) {
@ -367,17 +403,16 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
return headerBlock; return headerBlock;
} }
private synchronized ChannelBuffer compressHeaderBlock( // always called while synchronized on headerBlockCompressor
ChannelBuffer uncompressed) throws Exception { private ChannelBuffer compressHeaderBlock(ChannelBuffer uncompressed)
throws Exception {
if (uncompressed.readableBytes() == 0) { if (uncompressed.readableBytes() == 0) {
return ChannelBuffers.EMPTY_BUFFER; return ChannelBuffers.EMPTY_BUFFER;
} }
ChannelBuffer compressed = ChannelBuffers.dynamicBuffer(); ChannelBuffer compressed = ChannelBuffers.dynamicBuffer();
synchronized (headerBlockCompressor) { if (!finished) {
if (!finished) { headerBlockCompressor.setInput(uncompressed);
headerBlockCompressor.setInput(uncompressed); headerBlockCompressor.encode(compressed);
headerBlockCompressor.encode(compressed);
}
} }
return compressed; return compressed;
} }