Merge pull request #1929 from wgallagher/netty3spdybuf

avoid holding onto temporary buffers in SpdyFrameCodec
This commit is contained in:
Jeff Pinner 2013-10-17 11:45:37 -07:00
commit f739579339
4 changed files with 80 additions and 66 deletions

View File

@ -20,6 +20,7 @@ import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.frame.FrameDecoder; import org.jboss.netty.handler.codec.frame.FrameDecoder;
@ -89,17 +90,6 @@ public class SpdyFrameDecoder extends FrameDecoder {
state = State.READ_COMMON_HEADER; state = State.READ_COMMON_HEADER;
} }
@Override
protected Object decodeLast(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
throws Exception {
try {
return decode(ctx, channel, buffer);
} finally {
headerBlockDecoder.end();
}
}
@Override @Override
protected Object decode( protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
@ -521,6 +511,15 @@ public class SpdyFrameDecoder extends FrameDecoder {
} }
} }
@Override
protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
try {
super.cleanup(ctx, e);
} finally {
headerBlockDecoder.end();
}
}
private static void fireInvalidFrameException(ChannelHandlerContext ctx) { private static void fireInvalidFrameException(ChannelHandlerContext ctx) {
Channels.fireExceptionCaught(ctx, INVALID_FRAME); Channels.fireExceptionCaught(ctx, INVALID_FRAME);
} }

View File

@ -16,19 +16,19 @@
package org.jboss.netty.handler.codec.spdy; package org.jboss.netty.handler.codec.spdy;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*; import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
public class SpdyHeaderBlockRawDecoder extends SpdyHeaderBlockDecoder { public class SpdyHeaderBlockRawDecoder extends SpdyHeaderBlockDecoder {
private static final int LENGTH_FIELD_SIZE = 4;
private final int version; private final int version;
private final int maxHeaderSize; private final int maxHeaderSize;
private final int lengthFieldSize;
// Header block decoding fields // Header block decoding fields
private int headerSize; private int headerSize;
private int numHeaders; private int numHeaders = -1;
public SpdyHeaderBlockRawDecoder(SpdyVersion spdyVersion, int maxHeaderSize) { public SpdyHeaderBlockRawDecoder(SpdyVersion spdyVersion, int maxHeaderSize) {
if (spdyVersion == null) { if (spdyVersion == null) {
@ -37,19 +37,11 @@ public class SpdyHeaderBlockRawDecoder extends SpdyHeaderBlockDecoder {
this.version = spdyVersion.getVersion(); this.version = spdyVersion.getVersion();
this.maxHeaderSize = maxHeaderSize; this.maxHeaderSize = maxHeaderSize;
lengthFieldSize = version < 3 ? 2 : 4;
reset();
} }
private int readLengthField(ChannelBuffer buffer) { private int readLengthField(ChannelBuffer buffer) {
int length; int length = getSignedInt(buffer, buffer.readerIndex());
if (version < 3) { buffer.skipBytes(LENGTH_FIELD_SIZE);
length = getUnsignedShort(buffer, buffer.readerIndex());
buffer.skipBytes(2);
} else {
length = getSignedInt(buffer, buffer.readerIndex());
buffer.skipBytes(4);
}
return length; return length;
} }
@ -64,7 +56,7 @@ public class SpdyHeaderBlockRawDecoder extends SpdyHeaderBlockDecoder {
if (numHeaders == -1) { if (numHeaders == -1) {
// Read number of Name/Value pairs // Read number of Name/Value pairs
if (encoded.readableBytes() < lengthFieldSize) { if (encoded.readableBytes() < LENGTH_FIELD_SIZE) {
return; return;
} }
numHeaders = readLengthField(encoded); numHeaders = readLengthField(encoded);
@ -79,7 +71,7 @@ public class SpdyHeaderBlockRawDecoder extends SpdyHeaderBlockDecoder {
encoded.markReaderIndex(); encoded.markReaderIndex();
// Try to read length of name // Try to read length of name
if (encoded.readableBytes() < lengthFieldSize) { if (encoded.readableBytes() < LENGTH_FIELD_SIZE) {
encoded.resetReaderIndex(); encoded.resetReaderIndex();
return; return;
} }
@ -112,7 +104,7 @@ public class SpdyHeaderBlockRawDecoder extends SpdyHeaderBlockDecoder {
} }
// Try to read length of value // Try to read length of value
if (encoded.readableBytes() < lengthFieldSize) { if (encoded.readableBytes() < LENGTH_FIELD_SIZE) {
encoded.resetReaderIndex(); encoded.resetReaderIndex();
return; return;
} }
@ -126,15 +118,10 @@ public class SpdyHeaderBlockRawDecoder extends SpdyHeaderBlockDecoder {
// SPDY/3 allows zero-length (empty) header values // SPDY/3 allows zero-length (empty) header values
if (valueLength == 0) { if (valueLength == 0) {
if (version < 3) { frame.addHeader(name, "");
frame.setInvalid(); numHeaders --;
return; this.headerSize = headerSize;
} else { continue;
frame.addHeader(name, "");
numHeaders --;
this.headerSize = headerSize;
continue;
}
} }
headerSize += valueLength; headerSize += valueLength;

View File

@ -23,12 +23,11 @@ import java.util.zip.Inflater;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.ChannelBuffers;
class SpdyHeaderBlockZlibDecoder extends SpdyHeaderBlockRawDecoder { final class SpdyHeaderBlockZlibDecoder extends SpdyHeaderBlockRawDecoder {
private final byte[] out = new byte[8192];
private final Inflater decompressor = new Inflater(); private final Inflater decompressor = new Inflater();
private ChannelBuffer decompressed; private final ChannelBuffer decompressed = ChannelBuffers.buffer(4096);
public SpdyHeaderBlockZlibDecoder(SpdyVersion spdyVersion, int maxHeaderSize) { public SpdyHeaderBlockZlibDecoder(SpdyVersion spdyVersion, int maxHeaderSize) {
super(spdyVersion, maxHeaderSize); super(spdyVersion, maxHeaderSize);
@ -36,52 +35,64 @@ class SpdyHeaderBlockZlibDecoder extends SpdyHeaderBlockRawDecoder {
@Override @Override
void decode(ChannelBuffer encoded, SpdyHeadersFrame frame) throws Exception { void decode(ChannelBuffer encoded, SpdyHeadersFrame frame) throws Exception {
setInput(encoded); int len = setInput(encoded);
int numBytes; int numBytes;
do { do {
numBytes = decompress(frame); numBytes = decompress(frame);
} while (!decompressed.readable() && numBytes > 0); } while (!decompressed.readable() && numBytes > 0);
if (decompressor.getRemaining() != 0) {
throw new SpdyProtocolException("client sent extra data beyond headers");
}
encoded.skipBytes(len);
} }
private void setInput(ChannelBuffer compressed) { private int setInput(ChannelBuffer compressed) {
byte[] in = new byte[compressed.readableBytes()]; int len = compressed.readableBytes();
compressed.readBytes(in);
decompressor.setInput(in); if (compressed.hasArray()) {
decompressor.setInput(compressed.array(), compressed.arrayOffset() + compressed.readerIndex(), len);
} else {
byte[] in = new byte[len];
compressed.getBytes(compressed.readerIndex(), in);
decompressor.setInput(in, 0, in.length);
}
return len;
} }
private int decompress(SpdyHeadersFrame frame) throws Exception { private int decompress(SpdyHeadersFrame frame) throws Exception {
if (decompressed == null) { byte[] out = decompressed.array();
decompressed = ChannelBuffers.dynamicBuffer(8192); int off = decompressed.arrayOffset() + decompressed.writerIndex();
}
try { try {
int numBytes = decompressor.inflate(out); int numBytes = decompressor.inflate(out, off, decompressed.writableBytes());
if (numBytes == 0 && decompressor.needsDictionary()) { if (numBytes == 0 && decompressor.needsDictionary()) {
decompressor.setDictionary(SPDY_DICT); decompressor.setDictionary(SPDY_DICT);
numBytes = decompressor.inflate(out); numBytes = decompressor.inflate(out, off, decompressed.writableBytes());
} }
if (frame != null) { if (frame != null) {
decompressed.writeBytes(out, 0, numBytes); decompressed.writerIndex(decompressed.writerIndex() + numBytes);
super.decode(decompressed, frame); super.decode(decompressed, frame);
decompressed.discardReadBytes(); decompressed.discardReadBytes();
} }
return numBytes; return numBytes;
} catch (DataFormatException e) { } catch (DataFormatException e) {
throw new SpdyProtocolException( throw new SpdyProtocolException("Received invalid header block", e);
"Received invalid header block", e);
} }
} }
@Override @Override
void reset() { void reset() {
decompressed = null; decompressed.clear();
super.reset(); super.reset();
} }
@Override @Override
public void end() { public void end() {
decompressed = null; decompressed.clear();
decompressor.end(); decompressor.end();
super.end(); super.end();
} }

View File

@ -24,7 +24,6 @@ import org.jboss.netty.buffer.ChannelBuffers;
class SpdyHeaderBlockZlibEncoder extends SpdyHeaderBlockRawEncoder { class SpdyHeaderBlockZlibEncoder extends SpdyHeaderBlockRawEncoder {
private final byte[] out = new byte[8192];
private final Deflater compressor; private final Deflater compressor;
private boolean finished; private boolean finished;
@ -39,20 +38,36 @@ class SpdyHeaderBlockZlibEncoder extends SpdyHeaderBlockRawEncoder {
compressor.setDictionary(SPDY_DICT); compressor.setDictionary(SPDY_DICT);
} }
private void setInput(ChannelBuffer decompressed) { private int setInput(ChannelBuffer decompressed) {
byte[] in = new byte[decompressed.readableBytes()]; int len = decompressed.readableBytes();
decompressed.readBytes(in);
compressor.setInput(in); if (decompressed.hasArray()) {
compressor.setInput(decompressed.array(), decompressed.arrayOffset() + decompressed.readerIndex(), len);
} else {
byte[] in = new byte[len];
decompressed.getBytes(decompressed.readerIndex(), in);
compressor.setInput(in, 0, in.length);
}
return len;
} }
private void encode(ChannelBuffer compressed) { private void encode(ChannelBuffer compressed) {
int numBytes = out.length; while (compressInto(compressed)) {
while (numBytes == out.length) { // Although unlikely, it's possible that the compressed size is larger than the decompressed size
numBytes = compressor.deflate(out, 0, out.length, Deflater.SYNC_FLUSH); compressed.ensureWritableBytes(compressed.capacity() << 1);
compressed.writeBytes(out, 0, numBytes);
} }
} }
private boolean compressInto(ChannelBuffer compressed) {
byte[] out = compressed.array();
int off = compressed.arrayOffset() + compressed.writerIndex();
int toWrite = compressed.writableBytes();
int numBytes = compressor.deflate(out, off, toWrite, Deflater.SYNC_FLUSH);
compressed.writerIndex(compressed.writerIndex() + numBytes);
return numBytes == toWrite;
}
@Override @Override
public synchronized ChannelBuffer encode(SpdyHeadersFrame frame) throws Exception { public synchronized ChannelBuffer encode(SpdyHeadersFrame frame) throws Exception {
if (frame == null) { if (frame == null) {
@ -68,9 +83,11 @@ class SpdyHeaderBlockZlibEncoder extends SpdyHeaderBlockRawEncoder {
return ChannelBuffers.EMPTY_BUFFER; return ChannelBuffers.EMPTY_BUFFER;
} }
ChannelBuffer compressed = ChannelBuffers.dynamicBuffer(); ChannelBuffer compressed = ChannelBuffers.dynamicBuffer(decompressed.readableBytes());
setInput(decompressed); int len = setInput(decompressed);
encode(compressed); encode(compressed);
decompressed.skipBytes(len);
return compressed; return compressed;
} }