diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentCompressor.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentCompressor.java index 2c8b917640..cf85a2f51e 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentCompressor.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentCompressor.java @@ -16,7 +16,7 @@ package io.netty.handler.codec.http; import io.netty.channel.embedded.EmbeddedByteChannel; -import io.netty.handler.codec.compression.ZlibEncoder; +import io.netty.handler.codec.compression.ZlibCodecFactory; import io.netty.handler.codec.compression.ZlibWrapper; /** @@ -118,8 +118,8 @@ public class HttpContentCompressor extends HttpContentEncoder { return new Result( targetContentEncoding, - new EmbeddedByteChannel( - new ZlibEncoder(wrapper, compressionLevel, windowBits, memLevel))); + new EmbeddedByteChannel(ZlibCodecFactory.newZlibEncoder( + wrapper, compressionLevel, windowBits, memLevel))); } protected ZlibWrapper determineWrapper(String acceptEncoding) { diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecompressor.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecompressor.java index 8911d85505..b4b6053132 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecompressor.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpContentDecompressor.java @@ -16,7 +16,7 @@ package io.netty.handler.codec.http; import io.netty.channel.embedded.EmbeddedByteChannel; -import io.netty.handler.codec.compression.ZlibDecoder; +import io.netty.handler.codec.compression.ZlibCodecFactory; import io.netty.handler.codec.compression.ZlibWrapper; /** @@ -28,10 +28,10 @@ public class HttpContentDecompressor extends HttpContentDecoder { @Override protected EmbeddedByteChannel newContentDecoder(String contentEncoding) throws Exception { if ("gzip".equalsIgnoreCase(contentEncoding) || "x-gzip".equalsIgnoreCase(contentEncoding)) { - return new EmbeddedByteChannel(new ZlibDecoder(ZlibWrapper.GZIP)); + return new EmbeddedByteChannel(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP)); } else if ("deflate".equalsIgnoreCase(contentEncoding) || "x-deflate".equalsIgnoreCase(contentEncoding)) { // To be strict, 'deflate' means ZLIB, but some servers were not implemented correctly. - return new EmbeddedByteChannel(new ZlibDecoder(ZlibWrapper.ZLIB_OR_NONE)); + return new EmbeddedByteChannel(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.ZLIB_OR_NONE)); } // 'identity' or unsupported diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JZlibDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JZlibDecoder.java new file mode 100644 index 0000000000..ddd12a527a --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/compression/JZlibDecoder.java @@ -0,0 +1,178 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.internal.jzlib.JZlib; +import io.netty.util.internal.jzlib.ZStream; + +public class JZlibDecoder extends ZlibDecoder { + + private final ZStream z = new ZStream(); + private byte[] dictionary; + private volatile boolean finished; + + /** + * Creates a new instance with the default wrapper ({@link ZlibWrapper#ZLIB}). + * + * @throws CompressionException if failed to initialize zlib + */ + public JZlibDecoder() { + this(ZlibWrapper.ZLIB); + } + + /** + * Creates a new instance with the specified wrapper. + * + * @throws CompressionException if failed to initialize zlib + */ + public JZlibDecoder(ZlibWrapper wrapper) { + if (wrapper == null) { + throw new NullPointerException("wrapper"); + } + + int resultCode = z.inflateInit(ZlibUtil.convertWrapperType(wrapper)); + if (resultCode != JZlib.Z_OK) { + ZlibUtil.fail(z, "initialization failure", resultCode); + } + } + + /** + * Creates a new instance with the specified preset dictionary. The wrapper + * is always {@link ZlibWrapper#ZLIB} because it is the only format that + * supports the preset dictionary. + * + * @throws CompressionException if failed to initialize zlib + */ + public JZlibDecoder(byte[] dictionary) { + if (dictionary == null) { + throw new NullPointerException("dictionary"); + } + this.dictionary = dictionary; + + int resultCode; + resultCode = z.inflateInit(JZlib.W_ZLIB); + if (resultCode != JZlib.Z_OK) { + ZlibUtil.fail(z, "initialization failure", resultCode); + } + } + + /** + * Returns {@code true} if and only if the end of the compressed stream + * has been reached. + */ + @Override + public boolean isClosed() { + return finished; + } + + @Override + public void decode( + ChannelHandlerContext ctx, + ByteBuf in, ByteBuf out) throws Exception { + + if (!in.readable()) { + return; + } + + try { + // Configure input. + int inputLength = in.readableBytes(); + boolean inHasArray = in.hasArray(); + z.avail_in = inputLength; + if (inHasArray) { + z.next_in = in.array(); + z.next_in_index = in.arrayOffset() + in.readerIndex(); + } else { + byte[] array = new byte[inputLength]; + in.readBytes(array); + z.next_in = array; + z.next_in_index = 0; + } + int oldNextInIndex = z.next_in_index; + + // Configure output. + int maxOutputLength = inputLength << 1; + boolean outHasArray = out.hasArray(); + if (!outHasArray) { + z.next_out = new byte[maxOutputLength]; + } + + try { + loop: for (;;) { + z.avail_out = maxOutputLength; + if (outHasArray) { + out.ensureWritableBytes(maxOutputLength); + z.next_out = out.array(); + z.next_out_index = out.arrayOffset() + out.writerIndex(); + } else { + z.next_out_index = 0; + } + int oldNextOutIndex = z.next_out_index; + + // Decompress 'in' into 'out' + int resultCode = z.inflate(JZlib.Z_SYNC_FLUSH); + int outputLength = z.next_out_index - oldNextOutIndex; + if (outputLength > 0) { + if (outHasArray) { + out.writerIndex(out.writerIndex() + outputLength); + } else { + out.writeBytes(z.next_out, 0, outputLength); + } + } + + switch (resultCode) { + case JZlib.Z_NEED_DICT: + if (dictionary == null) { + ZlibUtil.fail(z, "decompression failure", resultCode); + } else { + resultCode = z.inflateSetDictionary(dictionary, dictionary.length); + if (resultCode != JZlib.Z_OK) { + ZlibUtil.fail(z, "failed to set the dictionary", resultCode); + } + } + break; + case JZlib.Z_STREAM_END: + finished = true; // Do not decode anymore. + z.inflateEnd(); + break loop; + case JZlib.Z_OK: + break; + case JZlib.Z_BUF_ERROR: + if (z.avail_in <= 0) { + break loop; + } + break; + default: + ZlibUtil.fail(z, "decompression failure", resultCode); + } + } + } finally { + if (inHasArray) { + in.skipBytes(z.next_in_index - oldNextInIndex); + } + } + } finally { + // Deference the external references explicitly to tell the VM that + // the allocated byte arrays are temporary so that the call stack + // can be utilized. + // I'm not sure if the modern VMs do this optimization though. + z.next_in = null; + z.next_out = null; + } + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java new file mode 100644 index 0000000000..a5701630ff --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java @@ -0,0 +1,434 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.internal.jzlib.JZlib; +import io.netty.util.internal.jzlib.ZStream; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * Compresses a {@link ByteBuf} using the deflate algorithm. + * @apiviz.landmark + * @apiviz.has io.netty.handler.codec.compression.ZlibWrapper + */ +public class JZlibEncoder extends ZlibEncoder { + + private static final byte[] EMPTY_ARRAY = new byte[0]; + + private final ZStream z = new ZStream(); + private final AtomicBoolean finished = new AtomicBoolean(); + private volatile ChannelHandlerContext ctx; + + /** + * Creates a new zlib encoder with the default compression level ({@code 6}), + * default window bits ({@code 15}), default memory level ({@code 8}), + * and the default wrapper ({@link ZlibWrapper#ZLIB}). + * + * @throws CompressionException if failed to initialize zlib + */ + public JZlibEncoder() { + this(6); + } + + /** + * Creates a new zlib encoder with the specified {@code compressionLevel}, + * default window bits ({@code 15}), default memory level ({@code 8}), + * and the default wrapper ({@link ZlibWrapper#ZLIB}). + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * + * @throws CompressionException if failed to initialize zlib + */ + public JZlibEncoder(int compressionLevel) { + this(ZlibWrapper.ZLIB, compressionLevel); + } + + /** + * Creates a new zlib encoder with the default compression level ({@code 6}), + * default window bits ({@code 15}), default memory level ({@code 8}), + * and the specified wrapper. + * + * @throws CompressionException if failed to initialize zlib + */ + public JZlibEncoder(ZlibWrapper wrapper) { + this(wrapper, 6); + } + + /** + * Creates a new zlib encoder with the specified {@code compressionLevel}, + * default window bits ({@code 15}), default memory level ({@code 8}), + * and the specified wrapper. + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * + * @throws CompressionException if failed to initialize zlib + */ + public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel) { + this(wrapper, compressionLevel, 15, 8); + } + + /** + * Creates a new zlib encoder with the specified {@code compressionLevel}, + * the specified {@code windowBits}, the specified {@code memLevel}, and + * the specified wrapper. + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * @param windowBits + * The base two logarithm of the size of the history buffer. The + * value should be in the range {@code 9} to {@code 15} inclusive. + * Larger values result in better compression at the expense of + * memory usage. The default value is {@code 15}. + * @param memLevel + * How much memory should be allocated for the internal compression + * state. {@code 1} uses minimum memory and {@code 9} uses maximum + * memory. Larger values result in better and faster compression + * at the expense of memory usage. The default value is {@code 8} + * + * @throws CompressionException if failed to initialize zlib + */ + public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) { + + if (compressionLevel < 0 || compressionLevel > 9) { + throw new IllegalArgumentException( + "compressionLevel: " + compressionLevel + + " (expected: 0-9)"); + } + if (windowBits < 9 || windowBits > 15) { + throw new IllegalArgumentException( + "windowBits: " + windowBits + " (expected: 9-15)"); + } + if (memLevel < 1 || memLevel > 9) { + throw new IllegalArgumentException( + "memLevel: " + memLevel + " (expected: 1-9)"); + } + if (wrapper == null) { + throw new NullPointerException("wrapper"); + } + if (wrapper == ZlibWrapper.ZLIB_OR_NONE) { + throw new IllegalArgumentException( + "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " + + "allowed for compression."); + } + + synchronized (z) { + int resultCode = z.deflateInit( + compressionLevel, windowBits, memLevel, + ZlibUtil.convertWrapperType(wrapper)); + if (resultCode != JZlib.Z_OK) { + ZlibUtil.fail(z, "initialization failure", resultCode); + } + } + } + + /** + * Creates a new zlib encoder with the default compression level ({@code 6}), + * default window bits ({@code 15}), default memory level ({@code 8}), + * and the specified preset dictionary. The wrapper is always + * {@link ZlibWrapper#ZLIB} because it is the only format that supports + * the preset dictionary. + * + * @param dictionary the preset dictionary + * + * @throws CompressionException if failed to initialize zlib + */ + public JZlibEncoder(byte[] dictionary) { + this(6, dictionary); + } + + /** + * Creates a new zlib encoder with the specified {@code compressionLevel}, + * default window bits ({@code 15}), default memory level ({@code 8}), + * and the specified preset dictionary. The wrapper is always + * {@link ZlibWrapper#ZLIB} because it is the only format that supports + * the preset dictionary. + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * @param dictionary the preset dictionary + * + * @throws CompressionException if failed to initialize zlib + */ + public JZlibEncoder(int compressionLevel, byte[] dictionary) { + this(compressionLevel, 15, 8, dictionary); + } + + /** + * Creates a new zlib encoder with the specified {@code compressionLevel}, + * the specified {@code windowBits}, the specified {@code memLevel}, + * and the specified preset dictionary. The wrapper is always + * {@link ZlibWrapper#ZLIB} because it is the only format that supports + * the preset dictionary. + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * @param windowBits + * The base two logarithm of the size of the history buffer. The + * value should be in the range {@code 9} to {@code 15} inclusive. + * Larger values result in better compression at the expense of + * memory usage. The default value is {@code 15}. + * @param memLevel + * How much memory should be allocated for the internal compression + * state. {@code 1} uses minimum memory and {@code 9} uses maximum + * memory. Larger values result in better and faster compression + * at the expense of memory usage. The default value is {@code 8} + * @param dictionary the preset dictionary + * + * @throws CompressionException if failed to initialize zlib + */ + public JZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) { + if (compressionLevel < 0 || compressionLevel > 9) { + throw new IllegalArgumentException("compressionLevel: " + compressionLevel + " (expected: 0-9)"); + } + if (windowBits < 9 || windowBits > 15) { + throw new IllegalArgumentException( + "windowBits: " + windowBits + " (expected: 9-15)"); + } + if (memLevel < 1 || memLevel > 9) { + throw new IllegalArgumentException( + "memLevel: " + memLevel + " (expected: 1-9)"); + } + if (dictionary == null) { + throw new NullPointerException("dictionary"); + } + + synchronized (z) { + int resultCode; + resultCode = z.deflateInit( + compressionLevel, windowBits, memLevel, + JZlib.W_ZLIB); // Default: ZLIB format + if (resultCode != JZlib.Z_OK) { + ZlibUtil.fail(z, "initialization failure", resultCode); + } else { + resultCode = z.deflateSetDictionary(dictionary, dictionary.length); + if (resultCode != JZlib.Z_OK) { + ZlibUtil.fail(z, "failed to set the dictionary", resultCode); + } + } + } + } + + @Override + public ChannelFuture close() { + return close(ctx().channel().newFuture()); + } + + @Override + public ChannelFuture close(ChannelFuture future) { + return finishEncode(ctx(), future); + } + + private ChannelHandlerContext ctx() { + ChannelHandlerContext ctx = this.ctx; + if (ctx == null) { + throw new IllegalStateException("not added to a pipeline"); + } + return ctx; + } + + @Override + public boolean isClosed() { + return finished.get(); + } + + @Override + public void encode(ChannelHandlerContext ctx, + ByteBuf in, ByteBuf out) throws Exception { + if (finished.get()) { + return; + } + + synchronized (z) { + try { + // Configure input. + int inputLength = in.readableBytes(); + boolean inHasArray = in.hasArray(); + z.avail_in = inputLength; + if (inHasArray) { + z.next_in = in.array(); + z.next_in_index = in.arrayOffset() + in.readerIndex(); + } else { + byte[] array = new byte[inputLength]; + in.readBytes(array); + z.next_in = array; + z.next_in_index = 0; + } + int oldNextInIndex = z.next_in_index; + + // Configure output. + int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12; + boolean outHasArray = out.hasArray(); + z.avail_out = maxOutputLength; + if (outHasArray) { + out.ensureWritableBytes(maxOutputLength); + z.next_out = out.array(); + z.next_out_index = out.arrayOffset() + out.writerIndex(); + } else { + z.next_out = new byte[maxOutputLength]; + z.next_out_index = 0; + } + int oldNextOutIndex = z.next_out_index; + + // Note that Z_PARTIAL_FLUSH has been deprecated. + int resultCode; + try { + resultCode = z.deflate(JZlib.Z_SYNC_FLUSH); + } finally { + if (inHasArray) { + in.skipBytes(z.next_in_index - oldNextInIndex); + } + } + + if (resultCode != JZlib.Z_OK) { + ZlibUtil.fail(z, "compression failure", resultCode); + } + + int outputLength = z.next_out_index - oldNextOutIndex; + if (outputLength > 0) { + if (outHasArray) { + out.writerIndex(out.writerIndex() + outputLength); + } else { + out.writeBytes(z.next_out, 0, outputLength); + } + } + } finally { + // Deference the external references explicitly to tell the VM that + // the allocated byte arrays are temporary so that the call stack + // can be utilized. + // I'm not sure if the modern VMs do this optimization though. + z.next_in = null; + z.next_out = null; + } + } + } + + @Override + public void disconnect( + final ChannelHandlerContext ctx, + final ChannelFuture future) throws Exception { + ChannelFuture f = finishEncode(ctx, ctx.newFuture()); + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) throws Exception { + ctx.disconnect(future); + } + }); + + if (!f.isDone()) { + // Ensure the channel is closed even if the write operation completes in time. + ctx.executor().schedule(new Runnable() { + @Override + public void run() { + ctx.disconnect(future); + } + }, 10, TimeUnit.SECONDS); // FIXME: Magic number + } + } + + @Override + public void close( + final ChannelHandlerContext ctx, + final ChannelFuture future) throws Exception { + ChannelFuture f = finishEncode(ctx, ctx.newFuture()); + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) throws Exception { + ctx.close(future); + } + }); + + if (!f.isDone()) { + // Ensure the channel is closed even if the write operation completes in time. + ctx.executor().schedule(new Runnable() { + @Override + public void run() { + ctx.close(future); + } + }, 10, TimeUnit.SECONDS); // FIXME: Magic number + } + } + + private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelFuture future) { + if (!finished.compareAndSet(false, true)) { + future.setSuccess(); + return future; + } + + ByteBuf footer; + synchronized (z) { + try { + // Configure input. + z.next_in = EMPTY_ARRAY; + z.next_in_index = 0; + z.avail_in = 0; + + // Configure output. + byte[] out = new byte[32]; // room for ADLER32 + ZLIB / CRC32 + GZIP header + z.next_out = out; + z.next_out_index = 0; + z.avail_out = out.length; + + // Write the ADLER32 checksum (stream footer). + int resultCode = z.deflate(JZlib.Z_FINISH); + if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) { + future.setFailure(ZlibUtil.exception(z, "compression failure", resultCode)); + return future; + } else if (z.next_out_index != 0) { + footer = Unpooled.wrappedBuffer(out, 0, z.next_out_index); + } else { + footer = Unpooled.EMPTY_BUFFER; + } + } finally { + z.deflateEnd(); + + // Deference the external references explicitly to tell the VM that + // the allocated byte arrays are temporary so that the call stack + // can be utilized. + // I'm not sure if the modern VMs do this optimization though. + z.next_in = null; + z.next_out = null; + } + } + + ctx.write(footer, future); + return future; + } + + @Override + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + this.ctx = ctx; + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java new file mode 100644 index 0000000000..ba4810df6d --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java @@ -0,0 +1,296 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.CRC32; +import java.util.zip.Deflater; + + +/** + * Compresses a {@link ByteBuf} using the deflate algorithm. + * @apiviz.landmark + * @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper + */ +public class JdkZlibEncoder extends ZlibEncoder { + + private final byte[] encodeBuf = new byte[8192]; + private final Deflater deflater; + private final AtomicBoolean finished = new AtomicBoolean(); + private volatile ChannelHandlerContext ctx; + + /* + * GZIP support + */ + private final boolean gzip; + private final CRC32 crc = new CRC32(); + private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0}; + private boolean writeHeader = true; + + /** + * Creates a new zlib encoder with the default compression level ({@code 6}) + * and the default wrapper ({@link ZlibWrapper#ZLIB}). + * + * @throws CompressionException if failed to initialize zlib + */ + public JdkZlibEncoder() { + this(6); + } + + /** + * Creates a new zlib encoder with the specified {@code compressionLevel} + * and the default wrapper ({@link ZlibWrapper#ZLIB}). + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * + * @throws CompressionException if failed to initialize zlib + */ + public JdkZlibEncoder(int compressionLevel) { + this(ZlibWrapper.ZLIB, compressionLevel); + } + + /** + * Creates a new zlib encoder with the default compression level ({@code 6}) + * and the specified wrapper. + * + * @throws CompressionException if failed to initialize zlib + */ + public JdkZlibEncoder(ZlibWrapper wrapper) { + this(wrapper, 6); + } + + /** + * Creates a new zlib encoder with the specified {@code compressionLevel} + * and the specified wrapper. + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * + * @throws CompressionException if failed to initialize zlib + */ + public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) { + if (compressionLevel < 0 || compressionLevel > 9) { + throw new IllegalArgumentException( + "compressionLevel: " + compressionLevel + " (expected: 0-9)"); + } + if (wrapper == null) { + throw new NullPointerException("wrapper"); + } + if (wrapper == ZlibWrapper.ZLIB_OR_NONE) { + throw new IllegalArgumentException( + "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " + + "allowed for compression."); + } + + gzip = wrapper == ZlibWrapper.GZIP; + deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB); + } + + /** + * Creates a new zlib encoder with the default compression level ({@code 6}) + * and the specified preset dictionary. The wrapper is always + * {@link ZlibWrapper#ZLIB} because it is the only format that supports + * the preset dictionary. + * + * @param dictionary the preset dictionary + * + * @throws CompressionException if failed to initialize zlib + */ + public JdkZlibEncoder(byte[] dictionary) { + this(6, dictionary); + } + + /** + * Creates a new zlib encoder with the specified {@code compressionLevel} + * and the specified preset dictionary. The wrapper is always + * {@link ZlibWrapper#ZLIB} because it is the only format that supports + * the preset dictionary. + * + * @param compressionLevel + * {@code 1} yields the fastest compression and {@code 9} yields the + * best compression. {@code 0} means no compression. The default + * compression level is {@code 6}. + * @param dictionary the preset dictionary + * + * @throws CompressionException if failed to initialize zlib + */ + public JdkZlibEncoder(int compressionLevel, byte[] dictionary) { + if (compressionLevel < 0 || compressionLevel > 9) { + throw new IllegalArgumentException( + "compressionLevel: " + compressionLevel + " (expected: 0-9)"); + } + if (dictionary == null) { + throw new NullPointerException("dictionary"); + } + + gzip = false; + deflater = new Deflater(compressionLevel); + deflater.setDictionary(dictionary); + } + + @Override + public ChannelFuture close() { + return close(ctx().newFuture()); + } + + @Override + public ChannelFuture close(ChannelFuture future) { + return finishEncode(ctx(), future); + } + + private ChannelHandlerContext ctx() { + ChannelHandlerContext ctx = this.ctx; + if (ctx == null) { + throw new IllegalStateException("not added to a pipeline"); + } + return ctx; + } + + @Override + public boolean isClosed() { + return finished.get(); + } + + @Override + public void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { + if (finished.get()) { + out.writeBytes(in); + in.discardReadBytes(); + return; + } + + ByteBuf uncompressed = in; + byte[] inAry = new byte[uncompressed.readableBytes()]; + uncompressed.readBytes(inAry); + + int sizeEstimate = (int) Math.ceil(inAry.length * 1.001) + 12; + out.ensureWritableBytes(sizeEstimate); + + synchronized (deflater) { + if (gzip) { + crc.update(inAry); + if (writeHeader) { + out.writeBytes(gzipHeader); + writeHeader = false; + } + } + + deflater.setInput(inAry); + while (!deflater.needsInput()) { + int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length, Deflater.SYNC_FLUSH); + out.writeBytes(encodeBuf, 0, numBytes); + } + } + } + + @Override + public void disconnect(final ChannelHandlerContext ctx, final ChannelFuture future) throws Exception { + ChannelFuture f = finishEncode(ctx, ctx.newFuture()); + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) throws Exception { + ctx.disconnect(future); + } + }); + + if (!f.isDone()) { + // Ensure the channel is closed even if the write operation completes in time. + ctx.executor().schedule(new Runnable() { + @Override + public void run() { + ctx.disconnect(future); + } + }, 10, TimeUnit.SECONDS); // FIXME: Magic number + } + } + + @Override + public void close(final ChannelHandlerContext ctx, final ChannelFuture future) throws Exception { + ChannelFuture f = finishEncode(ctx, ctx.newFuture()); + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) throws Exception { + ctx.close(future); + } + }); + + if (!f.isDone()) { + // Ensure the channel is closed even if the write operation completes in time. + ctx.executor().schedule(new Runnable() { + @Override + public void run() { + ctx.close(future); + } + }, 10, TimeUnit.SECONDS); // FIXME: Magic number + } + } + + private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelFuture future) { + if (!finished.compareAndSet(false, true)) { + future.setSuccess(); + return future; + } + + ByteBuf footer = Unpooled.EMPTY_BUFFER; + synchronized (deflater) { + int numBytes = 0; + deflater.finish(); + if (!deflater.finished()) { + numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length); + } + int footerSize = gzip ? numBytes + 8 : numBytes; + if (footerSize > 0) { + footer = Unpooled.buffer(footerSize); + footer.writeBytes(encodeBuf, 0, numBytes); + if (gzip) { + int crcValue = (int) crc.getValue(); + int uncBytes = deflater.getTotalIn(); + footer.writeByte(crcValue); + footer.writeByte(crcValue >>> 8); + footer.writeByte(crcValue >>> 16); + footer.writeByte(crcValue >>> 24); + footer.writeByte(uncBytes); + footer.writeByte(uncBytes >>> 8); + footer.writeByte(uncBytes >>> 16); + footer.writeByte(uncBytes >>> 24); + } + } + deflater.end(); + } + + ctx.nextOutboundByteBuffer().writeBytes(footer); + ctx.flush(future); + + return future; + } + + @Override + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + this.ctx = ctx; + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/compression/ZlibCodecFactory.java b/codec/src/main/java/io/netty/handler/codec/compression/ZlibCodecFactory.java new file mode 100644 index 0000000000..5d8609ce0a --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/compression/ZlibCodecFactory.java @@ -0,0 +1,96 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.util.internal.DetectionUtil; + +/** + * Creates a new {@link ZlibEncoder} and a new {@link ZlibDecoder}. + */ +public final class ZlibCodecFactory { + + public static ZlibEncoder newZlibEncoder(int compressionLevel) { + if (DetectionUtil.javaVersion() < 7) { + return new JZlibEncoder(compressionLevel); + } else { + return new JdkZlibEncoder(compressionLevel); + } + } + + public static ZlibEncoder newZlibEncoder(ZlibWrapper wrapper) { + if (DetectionUtil.javaVersion() < 7) { + return new JZlibEncoder(wrapper); + } else { + return new JdkZlibEncoder(wrapper); + } + } + + public static ZlibEncoder newZlibEncoder(ZlibWrapper wrapper, int compressionLevel) { + if (DetectionUtil.javaVersion() < 7) { + return new JZlibEncoder(wrapper, compressionLevel); + } else { + return new JdkZlibEncoder(wrapper, compressionLevel); + } + } + + public static ZlibEncoder newZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) { + if (DetectionUtil.javaVersion() < 7) { + return new JZlibEncoder(wrapper, compressionLevel, windowBits, memLevel); + } else { + return new JdkZlibEncoder(wrapper, compressionLevel); + } + } + + public static ZlibEncoder newZlibEncoder(byte[] dictionary) { + if (DetectionUtil.javaVersion() < 7) { + return new JZlibEncoder(dictionary); + } else { + return new JdkZlibEncoder(dictionary); + } + } + + public static ZlibEncoder newZlibEncoder(int compressionLevel, byte[] dictionary) { + if (DetectionUtil.javaVersion() < 7) { + return new JZlibEncoder(compressionLevel, dictionary); + } else { + return new JdkZlibEncoder(compressionLevel, dictionary); + } + } + + public static ZlibEncoder newZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) { + if (DetectionUtil.javaVersion() < 7) { + return new JZlibEncoder(compressionLevel, windowBits, memLevel, dictionary); + } else { + return new JdkZlibEncoder(compressionLevel, dictionary); + } + } + + public static ZlibDecoder newZlibDecoder() { + return new JZlibDecoder(); + } + + public static ZlibDecoder newZlibDecoder(ZlibWrapper wrapper) { + return new JZlibDecoder(wrapper); + } + + public static ZlibDecoder newZlibDecoder(byte[] dictionary) { + return new JZlibDecoder(dictionary); + } + + private ZlibCodecFactory() { + // Unused + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/compression/ZlibDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/ZlibDecoder.java index 3fe5a08bd8..a3a68e17e3 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/ZlibDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/ZlibDecoder.java @@ -16,169 +16,19 @@ package io.netty.handler.codec.compression; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToByteDecoder; -import io.netty.util.internal.jzlib.JZlib; -import io.netty.util.internal.jzlib.ZStream; - /** * Decompresses a {@link ByteBuf} using the deflate algorithm. + * * @apiviz.landmark * @apiviz.has io.netty.handler.codec.compression.ZlibWrapper */ -public class ZlibDecoder extends ByteToByteDecoder { - - private final ZStream z = new ZStream(); - private byte[] dictionary; - private volatile boolean finished; - - /** - * Creates a new instance with the default wrapper ({@link ZlibWrapper#ZLIB}). - * - * @throws CompressionException if failed to initialize zlib - */ - public ZlibDecoder() { - this(ZlibWrapper.ZLIB); - } - - /** - * Creates a new instance with the specified wrapper. - * - * @throws CompressionException if failed to initialize zlib - */ - public ZlibDecoder(ZlibWrapper wrapper) { - if (wrapper == null) { - throw new NullPointerException("wrapper"); - } - - int resultCode = z.inflateInit(ZlibUtil.convertWrapperType(wrapper)); - if (resultCode != JZlib.Z_OK) { - ZlibUtil.fail(z, "initialization failure", resultCode); - } - } - - /** - * Creates a new instance with the specified preset dictionary. The wrapper - * is always {@link ZlibWrapper#ZLIB} because it is the only format that - * supports the preset dictionary. - * - * @throws CompressionException if failed to initialize zlib - */ - public ZlibDecoder(byte[] dictionary) { - if (dictionary == null) { - throw new NullPointerException("dictionary"); - } - this.dictionary = dictionary; - - int resultCode; - resultCode = z.inflateInit(JZlib.W_ZLIB); - if (resultCode != JZlib.Z_OK) { - ZlibUtil.fail(z, "initialization failure", resultCode); - } - } +public abstract class ZlibDecoder extends ByteToByteDecoder { /** * Returns {@code true} if and only if the end of the compressed stream * has been reached. */ - public boolean isClosed() { - return finished; - } - - @Override - public void decode( - ChannelHandlerContext ctx, - ByteBuf in, ByteBuf out) throws Exception { - - if (!in.readable()) { - return; - } - - try { - // Configure input. - int inputLength = in.readableBytes(); - boolean inHasArray = in.hasArray(); - z.avail_in = inputLength; - if (inHasArray) { - z.next_in = in.array(); - z.next_in_index = in.arrayOffset() + in.readerIndex(); - } else { - byte[] array = new byte[inputLength]; - in.readBytes(array); - z.next_in = array; - z.next_in_index = 0; - } - int oldNextInIndex = z.next_in_index; - - // Configure output. - int maxOutputLength = inputLength << 1; - boolean outHasArray = out.hasArray(); - if (!outHasArray) { - z.next_out = new byte[maxOutputLength]; - } - - try { - loop: for (;;) { - z.avail_out = maxOutputLength; - if (outHasArray) { - out.ensureWritableBytes(maxOutputLength); - z.next_out = out.array(); - z.next_out_index = out.arrayOffset() + out.writerIndex(); - } else { - z.next_out_index = 0; - } - int oldNextOutIndex = z.next_out_index; - - // Decompress 'in' into 'out' - int resultCode = z.inflate(JZlib.Z_SYNC_FLUSH); - int outputLength = z.next_out_index - oldNextOutIndex; - if (outputLength > 0) { - if (outHasArray) { - out.writerIndex(out.writerIndex() + outputLength); - } else { - out.writeBytes(z.next_out, 0, outputLength); - } - } - - switch (resultCode) { - case JZlib.Z_NEED_DICT: - if (dictionary == null) { - ZlibUtil.fail(z, "decompression failure", resultCode); - } else { - resultCode = z.inflateSetDictionary(dictionary, dictionary.length); - if (resultCode != JZlib.Z_OK) { - ZlibUtil.fail(z, "failed to set the dictionary", resultCode); - } - } - break; - case JZlib.Z_STREAM_END: - finished = true; // Do not decode anymore. - z.inflateEnd(); - break loop; - case JZlib.Z_OK: - break; - case JZlib.Z_BUF_ERROR: - if (z.avail_in <= 0) { - break loop; - } - break; - default: - ZlibUtil.fail(z, "decompression failure", resultCode); - } - } - } finally { - if (inHasArray) { - in.skipBytes(z.next_in_index - oldNextInIndex); - } - } - } finally { - // Deference the external references explicitly to tell the VM that - // the allocated byte arrays are temporary so that the call stack - // can be utilized. - // I'm not sure if the modern VMs do this optimization though. - z.next_in = null; - z.next_out = null; - } - } + public abstract boolean isClosed(); } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/ZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/ZlibEncoder.java index b5e559d811..e81a0a0413 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/ZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/ZlibEncoder.java @@ -16,394 +16,25 @@ package io.netty.handler.codec.compression; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToByteEncoder; -import io.netty.util.internal.jzlib.JZlib; -import io.netty.util.internal.jzlib.ZStream; - -import java.util.concurrent.atomic.AtomicBoolean; - /** - * Compresses a {@link ByteBuf} using the deflate algorithm. + * Decompresses a {@link ByteBuf} using the deflate algorithm. + * * @apiviz.landmark * @apiviz.has io.netty.handler.codec.compression.ZlibWrapper */ -public class ZlibEncoder extends ByteToByteEncoder { - - private static final byte[] EMPTY_ARRAY = new byte[0]; - - private final ZStream z = new ZStream(); - private final AtomicBoolean finished = new AtomicBoolean(); - private volatile ChannelHandlerContext ctx; +public abstract class ZlibEncoder extends ByteToByteEncoder { /** - * Creates a new zlib encoder with the default compression level ({@code 6}), - * default window bits ({@code 15}), default memory level ({@code 8}), - * and the default wrapper ({@link ZlibWrapper#ZLIB}). - * - * @throws CompressionException if failed to initialize zlib + * Returns {@code true} if and only if the end of the compressed stream + * has been reached. */ - public ZlibEncoder() { - this(6); - } + public abstract boolean isClosed(); - /** - * Creates a new zlib encoder with the specified {@code compressionLevel}, - * default window bits ({@code 15}), default memory level ({@code 8}), - * and the default wrapper ({@link ZlibWrapper#ZLIB}). - * - * @param compressionLevel - * {@code 1} yields the fastest compression and {@code 9} yields the - * best compression. {@code 0} means no compression. The default - * compression level is {@code 6}. - * - * @throws CompressionException if failed to initialize zlib - */ - public ZlibEncoder(int compressionLevel) { - this(ZlibWrapper.ZLIB, compressionLevel); - } + public abstract ChannelFuture close(); - /** - * Creates a new zlib encoder with the default compression level ({@code 6}), - * default window bits ({@code 15}), default memory level ({@code 8}), - * and the specified wrapper. - * - * @throws CompressionException if failed to initialize zlib - */ - public ZlibEncoder(ZlibWrapper wrapper) { - this(wrapper, 6); - } + public abstract ChannelFuture close(ChannelFuture future); - /** - * Creates a new zlib encoder with the specified {@code compressionLevel}, - * default window bits ({@code 15}), default memory level ({@code 8}), - * and the specified wrapper. - * - * @param compressionLevel - * {@code 1} yields the fastest compression and {@code 9} yields the - * best compression. {@code 0} means no compression. The default - * compression level is {@code 6}. - * - * @throws CompressionException if failed to initialize zlib - */ - public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel) { - this(wrapper, compressionLevel, 15, 8); - } - - /** - * Creates a new zlib encoder with the specified {@code compressionLevel}, - * the specified {@code windowBits}, the specified {@code memLevel}, and - * the specified wrapper. - * - * @param compressionLevel - * {@code 1} yields the fastest compression and {@code 9} yields the - * best compression. {@code 0} means no compression. The default - * compression level is {@code 6}. - * @param windowBits - * The base two logarithm of the size of the history buffer. The - * value should be in the range {@code 9} to {@code 15} inclusive. - * Larger values result in better compression at the expense of - * memory usage. The default value is {@code 15}. - * @param memLevel - * How much memory should be allocated for the internal compression - * state. {@code 1} uses minimum memory and {@code 9} uses maximum - * memory. Larger values result in better and faster compression - * at the expense of memory usage. The default value is {@code 8} - * - * @throws CompressionException if failed to initialize zlib - */ - public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) { - - if (compressionLevel < 0 || compressionLevel > 9) { - throw new IllegalArgumentException( - "compressionLevel: " + compressionLevel + - " (expected: 0-9)"); - } - if (windowBits < 9 || windowBits > 15) { - throw new IllegalArgumentException( - "windowBits: " + windowBits + " (expected: 9-15)"); - } - if (memLevel < 1 || memLevel > 9) { - throw new IllegalArgumentException( - "memLevel: " + memLevel + " (expected: 1-9)"); - } - if (wrapper == null) { - throw new NullPointerException("wrapper"); - } - if (wrapper == ZlibWrapper.ZLIB_OR_NONE) { - throw new IllegalArgumentException( - "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " + - "allowed for compression."); - } - - synchronized (z) { - int resultCode = z.deflateInit( - compressionLevel, windowBits, memLevel, - ZlibUtil.convertWrapperType(wrapper)); - if (resultCode != JZlib.Z_OK) { - ZlibUtil.fail(z, "initialization failure", resultCode); - } - } - } - - /** - * Creates a new zlib encoder with the default compression level ({@code 6}), - * default window bits ({@code 15}), default memory level ({@code 8}), - * and the specified preset dictionary. The wrapper is always - * {@link ZlibWrapper#ZLIB} because it is the only format that supports - * the preset dictionary. - * - * @param dictionary the preset dictionary - * - * @throws CompressionException if failed to initialize zlib - */ - public ZlibEncoder(byte[] dictionary) { - this(6, dictionary); - } - - /** - * Creates a new zlib encoder with the specified {@code compressionLevel}, - * default window bits ({@code 15}), default memory level ({@code 8}), - * and the specified preset dictionary. The wrapper is always - * {@link ZlibWrapper#ZLIB} because it is the only format that supports - * the preset dictionary. - * - * @param compressionLevel - * {@code 1} yields the fastest compression and {@code 9} yields the - * best compression. {@code 0} means no compression. The default - * compression level is {@code 6}. - * @param dictionary the preset dictionary - * - * @throws CompressionException if failed to initialize zlib - */ - public ZlibEncoder(int compressionLevel, byte[] dictionary) { - this(compressionLevel, 15, 8, dictionary); - } - - /** - * Creates a new zlib encoder with the specified {@code compressionLevel}, - * the specified {@code windowBits}, the specified {@code memLevel}, - * and the specified preset dictionary. The wrapper is always - * {@link ZlibWrapper#ZLIB} because it is the only format that supports - * the preset dictionary. - * - * @param compressionLevel - * {@code 1} yields the fastest compression and {@code 9} yields the - * best compression. {@code 0} means no compression. The default - * compression level is {@code 6}. - * @param windowBits - * The base two logarithm of the size of the history buffer. The - * value should be in the range {@code 9} to {@code 15} inclusive. - * Larger values result in better compression at the expense of - * memory usage. The default value is {@code 15}. - * @param memLevel - * How much memory should be allocated for the internal compression - * state. {@code 1} uses minimum memory and {@code 9} uses maximum - * memory. Larger values result in better and faster compression - * at the expense of memory usage. The default value is {@code 8} - * @param dictionary the preset dictionary - * - * @throws CompressionException if failed to initialize zlib - */ - public ZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) { - if (compressionLevel < 0 || compressionLevel > 9) { - throw new IllegalArgumentException("compressionLevel: " + compressionLevel + " (expected: 0-9)"); - } - if (windowBits < 9 || windowBits > 15) { - throw new IllegalArgumentException( - "windowBits: " + windowBits + " (expected: 9-15)"); - } - if (memLevel < 1 || memLevel > 9) { - throw new IllegalArgumentException( - "memLevel: " + memLevel + " (expected: 1-9)"); - } - if (dictionary == null) { - throw new NullPointerException("dictionary"); - } - - synchronized (z) { - int resultCode; - resultCode = z.deflateInit( - compressionLevel, windowBits, memLevel, - JZlib.W_ZLIB); // Default: ZLIB format - if (resultCode != JZlib.Z_OK) { - ZlibUtil.fail(z, "initialization failure", resultCode); - } else { - resultCode = z.deflateSetDictionary(dictionary, dictionary.length); - if (resultCode != JZlib.Z_OK) { - ZlibUtil.fail(z, "failed to set the dictionary", resultCode); - } - } - } - } - - public ChannelFuture close() { - return close(ctx().channel().newFuture()); - } - - public ChannelFuture close(ChannelFuture future) { - return finishEncode(ctx(), future); - } - - private ChannelHandlerContext ctx() { - ChannelHandlerContext ctx = this.ctx; - if (ctx == null) { - throw new IllegalStateException("not added to a pipeline"); - } - return ctx; - } - - public boolean isClosed() { - return finished.get(); - } - - @Override - public void encode(ChannelHandlerContext ctx, - ByteBuf in, ByteBuf out) throws Exception { - if (finished.get()) { - return; - } - - synchronized (z) { - try { - // Configure input. - int inputLength = in.readableBytes(); - boolean inHasArray = in.hasArray(); - z.avail_in = inputLength; - if (inHasArray) { - z.next_in = in.array(); - z.next_in_index = in.arrayOffset() + in.readerIndex(); - } else { - byte[] array = new byte[inputLength]; - in.readBytes(array); - z.next_in = array; - z.next_in_index = 0; - } - int oldNextInIndex = z.next_in_index; - - // Configure output. - int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12; - boolean outHasArray = out.hasArray(); - z.avail_out = maxOutputLength; - if (outHasArray) { - out.ensureWritableBytes(maxOutputLength); - z.next_out = out.array(); - z.next_out_index = out.arrayOffset() + out.writerIndex(); - } else { - z.next_out = new byte[maxOutputLength]; - z.next_out_index = 0; - } - int oldNextOutIndex = z.next_out_index; - - // Note that Z_PARTIAL_FLUSH has been deprecated. - int resultCode; - try { - resultCode = z.deflate(JZlib.Z_SYNC_FLUSH); - } finally { - if (inHasArray) { - in.skipBytes(z.next_in_index - oldNextInIndex); - } - } - - if (resultCode != JZlib.Z_OK) { - ZlibUtil.fail(z, "compression failure", resultCode); - } - - int outputLength = z.next_out_index - oldNextOutIndex; - if (outputLength > 0) { - if (outHasArray) { - out.writerIndex(out.writerIndex() + outputLength); - } else { - out.writeBytes(z.next_out, 0, outputLength); - } - } - } finally { - // Deference the external references explicitly to tell the VM that - // the allocated byte arrays are temporary so that the call stack - // can be utilized. - // I'm not sure if the modern VMs do this optimization though. - z.next_in = null; - z.next_out = null; - } - } - } - - @Override - public void disconnect( - final ChannelHandlerContext ctx, - final ChannelFuture future) throws Exception { - finishEncode(ctx, ctx.newFuture()).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture f) throws Exception { - ctx.disconnect(future); - } - }); - } - - @Override - public void close( - final ChannelHandlerContext ctx, - final ChannelFuture future) throws Exception { - finishEncode(ctx, ctx.newFuture()).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture f) throws Exception { - ctx.close(future); - } - }); - } - - private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelFuture future) { - if (!finished.compareAndSet(false, true)) { - future.setSuccess(); - return future; - } - - ByteBuf footer; - synchronized (z) { - try { - // Configure input. - z.next_in = EMPTY_ARRAY; - z.next_in_index = 0; - z.avail_in = 0; - - // Configure output. - byte[] out = new byte[32]; // room for ADLER32 + ZLIB / CRC32 + GZIP header - z.next_out = out; - z.next_out_index = 0; - z.avail_out = out.length; - - // Write the ADLER32 checksum (stream footer). - int resultCode = z.deflate(JZlib.Z_FINISH); - if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) { - future.setFailure(ZlibUtil.exception(z, "compression failure", resultCode)); - return future; - } else if (z.next_out_index != 0) { - footer = Unpooled.wrappedBuffer(out, 0, z.next_out_index); - } else { - footer = Unpooled.EMPTY_BUFFER; - } - } finally { - z.deflateEnd(); - - // Deference the external references explicitly to tell the VM that - // the allocated byte arrays are temporary so that the call stack - // can be utilized. - // I'm not sure if the modern VMs do this optimization though. - z.next_in = null; - z.next_out = null; - } - } - - ctx.write(footer, future); - return future; - } - - @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { - this.ctx = ctx; - } } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/ZlibUtil.java b/codec/src/main/java/io/netty/handler/codec/compression/ZlibUtil.java index fd3520dd77..0eaebd356c 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/ZlibUtil.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/ZlibUtil.java @@ -19,7 +19,7 @@ import io.netty.util.internal.jzlib.JZlib; import io.netty.util.internal.jzlib.ZStream; /** - * Utility methods used by {@link ZlibEncoder} and {@link ZlibDecoder}. + * Utility methods used by {@link JZlibEncoder} and {@link JZlibDecoder}. */ final class ZlibUtil { diff --git a/example/src/main/java/io/netty/example/factorial/FactorialClientInitializer.java b/example/src/main/java/io/netty/example/factorial/FactorialClientInitializer.java index 47e242e921..959c1d6ffc 100644 --- a/example/src/main/java/io/netty/example/factorial/FactorialClientInitializer.java +++ b/example/src/main/java/io/netty/example/factorial/FactorialClientInitializer.java @@ -18,8 +18,7 @@ package io.netty.example.factorial; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.compression.ZlibDecoder; -import io.netty.handler.codec.compression.ZlibEncoder; +import io.netty.handler.codec.compression.ZlibCodecFactory; import io.netty.handler.codec.compression.ZlibWrapper; /** @@ -38,8 +37,8 @@ public class FactorialClientInitializer extends ChannelInitializer