diff --git a/src/main/java/org/jboss/netty/handler/codec/compression/NativeZlibEncoder.java b/src/main/java/org/jboss/netty/handler/codec/compression/NativeZlibEncoder.java new file mode 100644 index 0000000000..9abaec14c0 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/compression/NativeZlibEncoder.java @@ -0,0 +1,288 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.codec.compression; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.CRC32; +import java.util.zip.Deflater; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.LifeCycleAwareChannelHandler; +import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; + + +/** + * Compresses a {@link ChannelBuffer} using the deflate algorithm. + * @apiviz.landmark + * @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper + */ +public class NativeZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChannelHandler { + + private final byte[] out = new byte[8192]; + private final Deflater deflater; + private final AtomicBoolean finished = new AtomicBoolean(); + private volatile ChannelHandlerContext ctx; + + /* + * GZIP support + */ + private final boolean gzip; + private final CRC32 crc = new CRC32(); + private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0}; + private boolean writeHeader = true; + + /** + * Creates a new zlib encoder with the default compression level ({@code 6}) + * and the default wrapper ({@link ZlibWrapper#ZLIB}). + * + * @throws CompressionException if failed to initialize zlib + */ + public NativeZlibEncoder() { + this(6); + } + + /** + * Creates a new zlib encoder with the specified {@code compressionLevel} + * and the default wrapper ({@link ZlibWrapper#ZLIB}). + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * + * @throws CompressionException if failed to initialize zlib + */ + public NativeZlibEncoder(int compressionLevel) { + this(ZlibWrapper.ZLIB, compressionLevel); + } + + /** + * Creates a new zlib encoder with the default compression level ({@code 6}) + * and the specified wrapper. + * + * @throws CompressionException if failed to initialize zlib + */ + public NativeZlibEncoder(ZlibWrapper wrapper) { + this(wrapper, 6); + } + + /** + * Creates a new zlib encoder with the specified {@code compressionLevel} + * and the specified wrapper. + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * + * @throws CompressionException if failed to initialize zlib + */ + public NativeZlibEncoder(ZlibWrapper wrapper, int compressionLevel) { + if (compressionLevel < 0 || compressionLevel > 9) { + throw new IllegalArgumentException( + "compressionLevel: " + compressionLevel + " (expected: 0-9)"); + } + if (wrapper == null) { + throw new NullPointerException("wrapper"); + } + if (wrapper == ZlibWrapper.ZLIB_OR_NONE) { + throw new IllegalArgumentException( + "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " + + "allowed for compression."); + } + + gzip = wrapper == ZlibWrapper.GZIP; + deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB); + } + + /** + * Creates a new zlib encoder with the default compression level ({@code 6}) + * and the specified preset dictionary. The wrapper is always + * {@link ZlibWrapper#ZLIB} because it is the only format that supports + * the preset dictionary. + * + * @param dictionary the preset dictionary + * + * @throws CompressionException if failed to initialize zlib + */ + public NativeZlibEncoder(byte[] dictionary) { + this(6, dictionary); + } + + /** + * Creates a new zlib encoder with the specified {@code compressionLevel} + * and the specified preset dictionary. The wrapper is always + * {@link ZlibWrapper#ZLIB} because it is the only format that supports + * the preset dictionary. + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * @param dictionary the preset dictionary + * + * @throws CompressionException if failed to initialize zlib + */ + public NativeZlibEncoder(int compressionLevel, byte[] dictionary) { + if (compressionLevel < 0 || compressionLevel > 9) { + throw new IllegalArgumentException( + "compressionLevel: " + compressionLevel + " (expected: 0-9)"); + } + if (dictionary == null) { + throw new NullPointerException("dictionary"); + } + + gzip = false; + deflater = new Deflater(compressionLevel); + deflater.setDictionary(dictionary); + } + + public ChannelFuture close() { + ChannelHandlerContext ctx = this.ctx; + if (ctx == null) { + throw new IllegalStateException("not added to a pipeline"); + } + return finishEncode(ctx, null); + } + + public boolean isClosed() { + return finished.get(); + } + + @Override + protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { + if (!(msg instanceof ChannelBuffer) || finished.get()) { + return msg; + } + + ChannelBuffer uncompressed = (ChannelBuffer) msg; + byte[] in = new byte[uncompressed.readableBytes()]; + uncompressed.readBytes(in); + + int sizeEstimate = (int) Math.ceil(in.length * 1.001) + 12; + ChannelBuffer compressed = ChannelBuffers.dynamicBuffer(sizeEstimate, channel.getConfig().getBufferFactory()); + + synchronized (deflater) { + if (gzip) { + crc.update(in); + if (writeHeader) { + compressed.writeBytes(gzipHeader); + writeHeader = false; + } + } + + deflater.setInput(in); + while (!deflater.needsInput()) { + int numBytes = deflater.deflate(out, 0, out.length, Deflater.SYNC_FLUSH); + compressed.writeBytes(out, 0, numBytes); + } + } + + return compressed; + } + + @Override + public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt) + throws Exception { + if (evt instanceof ChannelStateEvent) { + ChannelStateEvent e = (ChannelStateEvent) evt; + switch (e.getState()) { + case OPEN: + case CONNECTED: + case BOUND: + if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) { + finishEncode(ctx, evt); + return; + } + } + } + + super.handleDownstream(ctx, evt); + } + + private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) { + if (!finished.compareAndSet(false, true)) { + if (evt != null) { + ctx.sendDownstream(evt); + } + return Channels.succeededFuture(ctx.getChannel()); + } + + ChannelBuffer footer = ChannelBuffers.EMPTY_BUFFER; + synchronized (deflater) { + int numBytes = 0; + deflater.finish(); + if (!deflater.finished()) { + numBytes = deflater.deflate(out, 0, out.length); + } + int footerSize = gzip ? numBytes + 8 : numBytes; + if (footerSize > 0) { + footer = ctx.getChannel().getConfig().getBufferFactory().getBuffer(footerSize); + footer.writeBytes(out, 0, numBytes); + if (gzip) { + int crcValue = (int) crc.getValue(); + int uncBytes = deflater.getTotalIn(); + footer.writeByte(crcValue); + footer.writeByte(crcValue >>> 8); + footer.writeByte(crcValue >>> 16); + footer.writeByte(crcValue >>> 24); + footer.writeByte(uncBytes); + footer.writeByte(uncBytes >>> 8); + footer.writeByte(uncBytes >>> 16); + footer.writeByte(uncBytes >>> 24); + } + } + deflater.end(); + } + + ChannelFuture future = Channels.future(ctx.getChannel()); + Channels.write(ctx, future, footer); + + if (evt != null) { + future.addListener(new ChannelFutureListener() { + public void operationComplete(ChannelFuture future) throws Exception { + ctx.sendDownstream(evt); + } + }); + } + + return future; + } + + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + this.ctx = ctx; + } + + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + // Unused + } + + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + // Unused + } + + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + // Unused + } +} diff --git a/src/main/java/org/jboss/netty/handler/codec/http/HttpContentCompressor.java b/src/main/java/org/jboss/netty/handler/codec/http/HttpContentCompressor.java index 3cfd859a19..8a245781e4 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/HttpContentCompressor.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/HttpContentCompressor.java @@ -16,9 +16,11 @@ package org.jboss.netty.handler.codec.http; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.handler.codec.compression.NativeZlibEncoder; import org.jboss.netty.handler.codec.compression.ZlibEncoder; import org.jboss.netty.handler.codec.compression.ZlibWrapper; import org.jboss.netty.handler.codec.embedder.EncoderEmbedder; +import org.jboss.netty.util.internal.DetectionUtil; /** * Compresses an {@link HttpMessage} and an {@link HttpChunk} in {@code gzip} or @@ -98,8 +100,13 @@ public class HttpContentCompressor extends HttpContentEncoder { return null; } - return new EncoderEmbedder( - new ZlibEncoder(wrapper, compressionLevel, windowBits, memLevel)); + if (DetectionUtil.javaVersion() >= 7) { + return new EncoderEmbedder( + new NativeZlibEncoder(wrapper, compressionLevel)); + } else { + return new EncoderEmbedder( + new ZlibEncoder(wrapper, compressionLevel, windowBits, memLevel)); + } } @Override