* Fixed a bug where ZlibDecoder fails to recognize Z_STREAM_END result code

* Fixed a bug where ZlibEncoder does not finish the compressed stream with the ADLER32 checksum
This commit is contained in:
Trustin Lee 2009-10-21 03:34:23 +00:00
parent a7132ee08e
commit ab6a869825
2 changed files with 158 additions and 8 deletions

View File

@ -75,6 +75,8 @@ public class ZlibDecoder extends OneToOneDecoder {
// Decompress 'in' into 'out'
int resultCode = z.inflate(JZlib.Z_SYNC_FLUSH);
switch (resultCode) {
case JZlib.Z_STREAM_END:
// TODO: Remove myself from the pipeline
case JZlib.Z_OK:
case JZlib.Z_BUF_ERROR:
decompressed.writeBytes(out, 0, z.next_out_index);

View File

@ -15,11 +15,18 @@
*/
package org.jboss.netty.handler.codec.compression;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
import com.jcraft.jzlib.JZlib;
@ -37,20 +44,27 @@ import com.jcraft.jzlib.ZStreamException;
public class ZlibEncoder extends OneToOneEncoder {
private final ZStream z = new ZStream();
private final AtomicBoolean finished = new AtomicBoolean();
// TODO 'do not compress' once option
// TODO support three wrappers - zlib (default), gzip (unsupported by jzlib, but easy to implement), nowrap
// TODO Disallow preset dictionary for gzip
// TODO add close() method
// FIXME thread safety
/**
* Creates a new GZip encoder with the default compression level
* Creates a new zlib encoder with the default compression level
* ({@link JZlib#Z_DEFAULT_COMPRESSION}).
*
* @throws ZStreamException if failed to initialize zlib
*/
public ZlibEncoder() {
public ZlibEncoder() throws ZStreamException {
this(JZlib.Z_DEFAULT_COMPRESSION);
}
/**
* Creates a new GZip encoder with the specified {@code compressionLevel}.
* Creates a new zlib encoder with the specified {@code compressionLevel}.
*
* @param compressionLevel
* the compression level, as specified in {@link JZlib}.
@ -59,9 +73,56 @@ public class ZlibEncoder extends OneToOneEncoder {
* {@link JZlib#Z_BEST_SPEED},
* {@link JZlib#Z_DEFAULT_COMPRESSION}, and
* {@link JZlib#Z_NO_COMPRESSION}.
*
* @throws ZStreamException if failed to initialize zlib
*/
public ZlibEncoder(int compressionLevel) {
z.deflateInit(compressionLevel, false); // Default: ZLIB format
public ZlibEncoder(int compressionLevel) throws ZStreamException {
int resultCode = z.deflateInit(compressionLevel, false); // Default: ZLIB format
if (resultCode != JZlib.Z_OK) {
fail("initialization failure", resultCode);
}
}
/**
* Creates a new zlib encoder with the default compression level
* ({@link JZlib#Z_DEFAULT_COMPRESSION}) and the specified preset
* dictionary.
*
* @param dictionary the preset dictionary
*
* @throws ZStreamException if failed to initialize zlib
*/
public ZlibEncoder(byte[] dictionary) throws ZStreamException {
this(JZlib.Z_DEFAULT_COMPRESSION, dictionary);
}
/**
* Creates a new zlib encoder with the specified {@code compressionLevel}
* and the specified preset dictionary.
*
* @param compressionLevel
* the compression level, as specified in {@link JZlib}.
* The common values are
* {@link JZlib#Z_BEST_COMPRESSION},
* {@link JZlib#Z_BEST_SPEED},
* {@link JZlib#Z_DEFAULT_COMPRESSION}, and
* {@link JZlib#Z_NO_COMPRESSION}.
* @param dictionary the preset dictionary
*
* @throws ZStreamException if failed to initialize zlib
*/
public ZlibEncoder(int compressionLevel, byte[] dictionary) throws ZStreamException {
int resultCode;
resultCode = z.deflateInit(compressionLevel, false); // Default: ZLIB format
if (resultCode != JZlib.Z_OK) {
fail("initialization failure", resultCode);
} else {
resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
if (resultCode != JZlib.Z_OK){
fail("failed to set the dictionary", resultCode);
}
}
}
@Override
@ -88,9 +149,7 @@ public class ZlibEncoder extends OneToOneEncoder {
// Note that Z_PARTIAL_FLUSH has been deprecated.
int resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
if (resultCode != JZlib.Z_OK) {
throw new ZStreamException(
"compression failure (" + resultCode + ")" +
(z.msg != null? ": " + z.msg : ""));
fail("compression failure", resultCode);
}
if (z.next_out_index != 0) {
@ -108,4 +167,93 @@ public class ZlibEncoder extends OneToOneEncoder {
z.next_out = null;
}
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
throws Exception {
if (evt instanceof ChannelStateEvent) {
ChannelStateEvent e = (ChannelStateEvent) evt;
switch (e.getState()) {
case OPEN:
case CONNECTED:
case BOUND:
if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
finishEncode(ctx, evt);
return;
}
}
}
super.handleDownstream(ctx, evt);
}
private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
if (!finished.compareAndSet(false, true)) {
return Channels.failedFuture(
ctx.getChannel(),
new ZStreamException("zlib stream closed already"));
}
try {
// Configure input.
z.next_in = new byte[0];
z.next_in_index = 0;
z.avail_in = 0;
// Configure output.
byte[] out = new byte[8]; // Minimum room for ADLER32 + ZLIB header
z.next_out = out;
z.next_out_index = 0;
z.avail_out = out.length;
ChannelFuture future;
// Write the ADLER32 checksum.
int resultCode = z.deflate(JZlib.Z_FINISH);
if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
future = Channels.failedFuture(
ctx.getChannel(),
exception("compression failure", resultCode));
} else if (z.next_out_index != 0) {
future = Channels.future(ctx.getChannel());
Channels.write(
ctx, future,
ctx.getChannel().getConfig().getBufferFactory().getBuffer(
out, 0, z.next_out_index));
} else {
// Note that we don't return a SucceededChannelFuture
// just in case any downstream handler or a sink wants to
// notify a write error.
future = Channels.future(ctx.getChannel());
Channels.write(ctx, future, ChannelBuffers.EMPTY_BUFFER);
}
if (evt != null) {
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future)
throws Exception {
ctx.sendDownstream(evt);
}
});
}
return future;
} finally {
// Deference the external references explicitly to tell the VM that
// the allocated byte arrays are temporary so that the call stack
// can be utilized.
// I'm not sure if the modern VMs do this optimization though.
z.next_in = null;
z.next_out = null;
}
}
private void fail(String message, int resultCode) throws ZStreamException {
throw exception(message, resultCode);
}
private ZStreamException exception(String message, int resultCode) {
return new ZStreamException(message + " (" + resultCode + ")" +
(z.msg != null? ": " + z.msg : ""));
}
}