From 5aae8279b9c7c54e30b019235b9a5ccdea64252b Mon Sep 17 00:00:00 2001 From: Jeff Pinner Date: Wed, 29 Feb 2012 12:38:52 -0800 Subject: [PATCH] SPDY: use jdk 1.7 java.util.zip instead of jzlib --- .../handler/codec/spdy/SpdyFrameDecoder.java | 60 ++++++++------ .../handler/codec/spdy/SpdyFrameEncoder.java | 44 +++++++--- .../handler/codec/spdy/SpdyZlibDecoder.java | 80 +++++++++++++++++++ .../handler/codec/spdy/SpdyZlibEncoder.java | 69 ++++++++++++++++ .../handler/codec/spdy/SpdyZlibDecoder.java | 80 +++++++++++++++++++ .../handler/codec/spdy/SpdyZlibEncoder.java | 69 ++++++++++++++++ 6 files changed, 367 insertions(+), 35 deletions(-) create mode 100644 codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyZlibDecoder.java create mode 100644 codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyZlibEncoder.java create mode 100644 src/main/java/org/jboss/netty/handler/codec/spdy/SpdyZlibDecoder.java create mode 100644 src/main/java/org/jboss/netty/handler/codec/spdy/SpdyZlibEncoder.java diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameDecoder.java index 1d058aa1c3..997dc0ad74 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.spdy; import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBuffers; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.compression.ZlibDecoder; @@ -33,8 +34,7 @@ public class SpdyFrameDecoder extends FrameDecoder { private final int maxFrameSize; private final int maxHeaderSize; - private final DecoderEmbedder headerBlockDecompressor = - new DecoderEmbedder(new ZlibDecoder(SPDY_DICT)); + private final SpdyZlibDecoder headerBlockDecompressor = new SpdyZlibDecoder(); /** * Creates a new instance with the default {@code maxChunkSize (8192)}, @@ -114,7 +114,9 @@ public class SpdyFrameDecoder extends FrameDecoder { int type = getUnsignedShort(buffer, typeOffset); buffer.skipBytes(SPDY_HEADER_SIZE); - return decodeControlFrame(type, flags, buffer.readBytes(dataLength)); + int readerIndex = buffer.readerIndex(); + buffer.skipBytes(dataLength); + return decodeControlFrame(type, flags, buffer.slice(readerIndex, dataLength)); } else { // Decode data frame common header int streamID = getUnsignedInt(buffer, frameOffset); @@ -166,7 +168,7 @@ public class SpdyFrameDecoder extends FrameDecoder { spdySynStreamFrame.setLast(last); spdySynStreamFrame.setUnidirectional(unid); - decodeHeaderBlock(spdySynStreamFrame, decompress(data)); + decodeHeaderBlock(spdySynStreamFrame, data); return spdySynStreamFrame; @@ -184,7 +186,7 @@ public class SpdyFrameDecoder extends FrameDecoder { last = (flags & SPDY_FLAG_FIN) != 0; spdySynReplyFrame.setLast(last); - decodeHeaderBlock(spdySynReplyFrame, decompress(data)); + decodeHeaderBlock(spdySynReplyFrame, data); return spdySynReplyFrame; @@ -284,7 +286,7 @@ public class SpdyFrameDecoder extends FrameDecoder { SpdyHeadersFrame spdyHeadersFrame = new DefaultSpdyHeadersFrame(streamID); - decodeHeaderBlock(spdyHeadersFrame, decompress(data)); + decodeHeaderBlock(spdyHeadersFrame, data); return spdyHeadersFrame; @@ -296,31 +298,38 @@ public class SpdyFrameDecoder extends FrameDecoder { } } - private ChannelBuffer decompress(ChannelBuffer compressed) throws Exception { - if ((compressed.readableBytes() == 2) && - (compressed.getShort(compressed.readerIndex()) == 0)) { - return compressed; + private boolean ensureBytes(ChannelBuffer decompressed, int bytes) throws Exception { + if (decompressed.readableBytes() >= bytes) { + return true; } - headerBlockDecompressor.offer(compressed); - return headerBlockDecompressor.poll(); + decompressed.discardReadBytes(); + headerBlockDecompressor.decode(decompressed); + return decompressed.readableBytes() >= bytes; } private void decodeHeaderBlock(SpdyHeaderBlock headerFrame, ChannelBuffer headerBlock) throws Exception { - if (headerBlock.readableBytes() < 2) { + if ((headerBlock.readableBytes() == 2) && + (headerBlock.getShort(headerBlock.readerIndex()) == 0)) { + return; + } + + headerBlockDecompressor.setInput(headerBlock); + ChannelBuffer decompressed = ChannelBuffers.dynamicBuffer(8192); + headerBlockDecompressor.decode(decompressed); + + if (decompressed.readableBytes() < 2) { throw new SpdyProtocolException( "Received invalid header block"); } int headerSize = 0; - int numEntries = getUnsignedShort(headerBlock, headerBlock.readerIndex()); - headerBlock.skipBytes(2); + int numEntries = decompressed.readUnsignedShort(); for (int i = 0; i < numEntries; i ++) { - if (headerBlock.readableBytes() < 2) { + if (!ensureBytes(decompressed, 2)) { throw new SpdyProtocolException( "Received invalid header block"); } - int nameLength = getUnsignedShort(headerBlock, headerBlock.readerIndex()); - headerBlock.skipBytes(2); + int nameLength = decompressed.readUnsignedShort(); if (nameLength == 0) { headerFrame.setInvalid(); return; @@ -330,23 +339,22 @@ public class SpdyFrameDecoder extends FrameDecoder { throw new SpdyProtocolException( "Header block exceeds " + maxHeaderSize); } - if (headerBlock.readableBytes() < nameLength) { + if (!ensureBytes(decompressed, nameLength)) { throw new SpdyProtocolException( "Received invalid header block"); } byte[] nameBytes = new byte[nameLength]; - headerBlock.readBytes(nameBytes); + decompressed.readBytes(nameBytes); String name = new String(nameBytes, "UTF-8"); if (headerFrame.containsHeader(name)) { throw new SpdyProtocolException( "Received duplicate header name: " + name); } - if (headerBlock.readableBytes() < 2) { + if (!ensureBytes(decompressed, 2)) { throw new SpdyProtocolException( "Received invalid header block"); } - int valueLength = getUnsignedShort(headerBlock, headerBlock.readerIndex()); - headerBlock.skipBytes(2); + int valueLength = decompressed.readUnsignedShort(); if (valueLength == 0) { headerFrame.setInvalid(); return; @@ -356,15 +364,15 @@ public class SpdyFrameDecoder extends FrameDecoder { throw new SpdyProtocolException( "Header block exceeds " + maxHeaderSize); } - if (headerBlock.readableBytes() < valueLength) { + if (!ensureBytes(decompressed, valueLength)) { throw new SpdyProtocolException( "Received invalid header block"); } byte[] valueBytes = new byte[valueLength]; - headerBlock.readBytes(valueBytes); + decompressed.readBytes(valueBytes); int index = 0; int offset = 0; - while (index < valueBytes.length) { + while (index < valueLength) { while (index < valueBytes.length && valueBytes[index] != (byte) 0) { index ++; } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java index 083696d50c..750c8f9595 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameEncoder.java @@ -15,25 +15,26 @@ */ package io.netty.handler.codec.spdy; +import static io.netty.handler.codec.spdy.SpdyCodecUtil.*; + import java.nio.ByteOrder; import java.util.Set; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; import io.netty.channel.Channel; +import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.compression.ZlibEncoder; -import io.netty.handler.codec.embedder.EncoderEmbedder; +import io.netty.channel.ChannelStateEvent; import io.netty.handler.codec.oneone.OneToOneEncoder; -import static io.netty.handler.codec.spdy.SpdyCodecUtil.*; - /** * Encodes a SPDY Data or Control Frame into a {@link ChannelBuffer}. */ public class SpdyFrameEncoder extends OneToOneEncoder { - private final EncoderEmbedder headerBlockCompressor; + private volatile boolean finished; + private final SpdyZlibEncoder headerBlockCompressor; /** * Creates a new instance with the default {@code compressionLevel (6)}, @@ -48,8 +49,27 @@ public class SpdyFrameEncoder extends OneToOneEncoder { */ public SpdyFrameEncoder(int compressionLevel, int windowBits, int memLevel) { super(); - headerBlockCompressor = new EncoderEmbedder( - new ZlibEncoder(compressionLevel, windowBits, memLevel, SPDY_DICT)); + headerBlockCompressor = new SpdyZlibEncoder(compressionLevel); + } + + @Override + public void handleDownstream( + ChannelHandlerContext ctx, ChannelEvent evt) throws Exception { + if (evt instanceof ChannelStateEvent) { + ChannelStateEvent e = (ChannelStateEvent) evt; + switch (e.getState()) { + case OPEN: + case CONNECTED: + case BOUND: + if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) { + synchronized (headerBlockCompressor) { + finished = true; + headerBlockCompressor.end(); + } + } + } + } + super.handleDownstream(ctx, evt); } @Override @@ -263,7 +283,13 @@ public class SpdyFrameEncoder extends OneToOneEncoder { if (uncompressed.readableBytes() == 0) { return ChannelBuffers.EMPTY_BUFFER; } - headerBlockCompressor.offer(uncompressed); - return headerBlockCompressor.poll(); + ChannelBuffer compressed = ChannelBuffers.dynamicBuffer(); + synchronized (headerBlockCompressor) { + if (!finished) { + headerBlockCompressor.setInput(uncompressed); + headerBlockCompressor.encode(compressed); + } + } + return compressed; } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyZlibDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyZlibDecoder.java new file mode 100644 index 0000000000..33f2ed4b84 --- /dev/null +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyZlibDecoder.java @@ -0,0 +1,80 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.netty.handler.codec.spdy; + +import static io.netty.handler.codec.spdy.SpdyCodecUtil.*; + +import io.netty.buffer.ChannelBuffer; +import io.netty.handler.codec.compression.CompressionException; +import io.netty.util.internal.jzlib.JZlib; +import io.netty.util.internal.jzlib.ZStream; + +class SpdyZlibDecoder { + + private final byte[] out = new byte[8192]; + private final ZStream z = new ZStream(); + + public SpdyZlibDecoder() { + int resultCode; + resultCode = z.inflateInit(JZlib.W_ZLIB); + if (resultCode != JZlib.Z_OK) { + throw new CompressionException("ZStream initialization failure"); + } + z.next_out = out; + } + + public void setInput(ChannelBuffer compressed) { + byte[] in = new byte[compressed.readableBytes()]; + compressed.readBytes(in); + z.next_in = in; + z.next_in_index = 0; + z.avail_in = in.length; + } + + public void decode(ChannelBuffer decompressed) { + z.next_out_index = 0; + z.avail_out = out.length; + + int resultCode = z.inflate(JZlib.Z_SYNC_FLUSH); + + if (resultCode == JZlib.Z_NEED_DICT) { + resultCode = z.inflateSetDictionary(SPDY_DICT, SPDY_DICT.length); + if (resultCode != JZlib.Z_OK) { + throw new CompressionException("ZStream dictionary failure"); + } + z.inflate(JZlib.Z_SYNC_FLUSH); + } + + if (z.next_out_index > 0) { + decompressed.writeBytes(out, 0, z.next_out_index); + } + } +} diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyZlibEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyZlibEncoder.java new file mode 100644 index 0000000000..9a900f41ad --- /dev/null +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyZlibEncoder.java @@ -0,0 +1,69 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.netty.handler.codec.spdy; + +import static io.netty.handler.codec.spdy.SpdyCodecUtil.*; + +import java.util.zip.Deflater; + +import io.netty.buffer.ChannelBuffer; + +class SpdyZlibEncoder { + + private final byte[] out = new byte[8192]; + private final Deflater compressor; + + public SpdyZlibEncoder(int compressionLevel) { + if (compressionLevel < 0 || compressionLevel > 9) { + throw new IllegalArgumentException( + "compressionLevel: " + compressionLevel + " (expected: 0-9)"); + } + compressor = new Deflater(compressionLevel); + compressor.setDictionary(SPDY_DICT); + } + + public void setInput(ChannelBuffer decompressed) { + byte[] in = new byte[decompressed.readableBytes()]; + decompressed.readBytes(in); + compressor.setInput(in); + } + + public void encode(ChannelBuffer compressed) { + while (!compressor.needsInput()) { + int numBytes = compressor.deflate(out, 0, out.length, Deflater.SYNC_FLUSH); + compressed.writeBytes(out, 0, numBytes); + } + } + + public void end() { + compressor.end(); + } +} diff --git a/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyZlibDecoder.java b/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyZlibDecoder.java new file mode 100644 index 0000000000..458b0e255a --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyZlibDecoder.java @@ -0,0 +1,80 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jboss.netty.handler.codec.spdy; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.handler.codec.compression.CompressionException; +import org.jboss.netty.util.internal.jzlib.JZlib; +import org.jboss.netty.util.internal.jzlib.ZStream; + +import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*; + +class SpdyZlibDecoder { + + private final byte[] out = new byte[8192]; + private final ZStream z = new ZStream(); + + public SpdyZlibDecoder() { + int resultCode; + resultCode = z.inflateInit(JZlib.W_ZLIB); + if (resultCode != JZlib.Z_OK) { + throw new CompressionException("ZStream initialization failure"); + } + z.next_out = out; + } + + public void setInput(ChannelBuffer compressed) { + byte[] in = new byte[compressed.readableBytes()]; + compressed.readBytes(in); + z.next_in = in; + z.next_in_index = 0; + z.avail_in = in.length; + } + + public void decode(ChannelBuffer decompressed) { + z.next_out_index = 0; + z.avail_out = out.length; + + int resultCode = z.inflate(JZlib.Z_SYNC_FLUSH); + + if (resultCode == JZlib.Z_NEED_DICT) { + resultCode = z.inflateSetDictionary(SPDY_DICT, SPDY_DICT.length); + if (resultCode != JZlib.Z_OK) { + throw new CompressionException("ZStream dictionary failure"); + } + z.inflate(JZlib.Z_SYNC_FLUSH); + } + + if (z.next_out_index > 0) { + decompressed.writeBytes(out, 0, z.next_out_index); + } + } +} diff --git a/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyZlibEncoder.java b/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyZlibEncoder.java new file mode 100644 index 0000000000..4b78a49ca5 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyZlibEncoder.java @@ -0,0 +1,69 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jboss.netty.handler.codec.spdy; + +import java.util.zip.Deflater; + +import org.jboss.netty.buffer.ChannelBuffer; + +import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*; + +class SpdyZlibEncoder { + + private final byte[] out = new byte[8192]; + private final Deflater compressor; + + public SpdyZlibEncoder(int compressionLevel) { + if (compressionLevel < 0 || compressionLevel > 9) { + throw new IllegalArgumentException( + "compressionLevel: " + compressionLevel + " (expected: 0-9)"); + } + compressor = new Deflater(compressionLevel); + compressor.setDictionary(SPDY_DICT); + } + + public void setInput(ChannelBuffer decompressed) { + byte[] in = new byte[decompressed.readableBytes()]; + decompressed.readBytes(in); + compressor.setInput(in); + } + + public void encode(ChannelBuffer compressed) { + while (!compressor.needsInput()) { + int numBytes = compressor.deflate(out, 0, out.length, Deflater.SYNC_FLUSH); + compressed.writeBytes(out, 0, numBytes); + } + } + + public void end() { + compressor.end(); + } +}