* Thread safety

* Proper destruction of deflater and inflater
This commit is contained in:
Trustin Lee 2009-10-21 04:15:56 +00:00
parent c3a88d0c48
commit 6022f207ab
2 changed files with 186 additions and 152 deletions

View File

@ -47,9 +47,11 @@ public class ZlibDecoder extends OneToOneDecoder {
* @throws ZStreamException if failed to initialize zlib
*/
public ZlibDecoder() throws ZStreamException {
int resultCode = z.inflateInit();
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "initialization failure", resultCode);
synchronized (z) {
int resultCode = z.inflateInit();
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "initialization failure", resultCode);
}
}
}
@ -63,14 +65,16 @@ public class ZlibDecoder extends OneToOneDecoder {
throw new NullPointerException("dictionary");
}
int resultCode;
resultCode = z.inflateInit();
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "initialization failure", resultCode);
} else {
resultCode = z.inflateSetDictionary(dictionary, dictionary.length);
synchronized (z) {
int resultCode;
resultCode = z.inflateInit();
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
ZlibUtil.fail(z, "initialization failure", resultCode);
} else {
resultCode = z.inflateSetDictionary(dictionary, dictionary.length);
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
}
}
}
}
@ -89,53 +93,58 @@ public class ZlibDecoder extends OneToOneDecoder {
return msg;
}
try {
// Configure input.
ChannelBuffer compressed = (ChannelBuffer) msg;
byte[] in = new byte[compressed.readableBytes()];
compressed.readBytes(in);
z.next_in = in;
z.next_in_index = 0;
z.avail_in = in.length;
synchronized (z) {
try {
// Configure input.
ChannelBuffer compressed = (ChannelBuffer) msg;
byte[] in = new byte[compressed.readableBytes()];
compressed.readBytes(in);
z.next_in = in;
z.next_in_index = 0;
z.avail_in = in.length;
// Configure output.
byte[] out = new byte[in.length << 1];
ChannelBuffer decompressed = ChannelBuffers.dynamicBuffer(
compressed.order(), out.length,
ctx.getChannel().getConfig().getBufferFactory());
z.next_out = out;
z.next_out_index = 0;
z.avail_out = out.length;
// Configure output.
byte[] out = new byte[in.length << 1];
ChannelBuffer decompressed = ChannelBuffers.dynamicBuffer(
compressed.order(), out.length,
ctx.getChannel().getConfig().getBufferFactory());
z.next_out = out;
z.next_out_index = 0;
z.avail_out = out.length;
do {
// Decompress 'in' into 'out'
int resultCode = z.inflate(JZlib.Z_SYNC_FLUSH);
switch (resultCode) {
case JZlib.Z_STREAM_END:
finished = true; // Do not decode anymore.
case JZlib.Z_OK:
case JZlib.Z_BUF_ERROR:
decompressed.writeBytes(out, 0, z.next_out_index);
z.next_out_index = 0;
z.avail_out = out.length;
break;
default:
ZlibUtil.fail(z, "decompression failure", resultCode);
do {
// Decompress 'in' into 'out'
int resultCode = z.inflate(JZlib.Z_SYNC_FLUSH);
switch (resultCode) {
case JZlib.Z_STREAM_END:
case JZlib.Z_OK:
case JZlib.Z_BUF_ERROR:
decompressed.writeBytes(out, 0, z.next_out_index);
z.next_out_index = 0;
z.avail_out = out.length;
if (resultCode == JZlib.Z_STREAM_END) {
finished = true; // Do not decode anymore.
z.inflateEnd();
}
break;
default:
ZlibUtil.fail(z, "decompression failure", resultCode);
}
} while (z.avail_in > 0);
if (decompressed.writerIndex() != 0) { // readerIndex is always 0
return decompressed;
} else {
return ChannelBuffers.EMPTY_BUFFER;
}
} while (z.avail_in > 0);
if (decompressed.writerIndex() != 0) { // readerIndex is always 0
return decompressed;
} else {
return ChannelBuffers.EMPTY_BUFFER;
} finally {
// Deference the external references explicitly to tell the VM that
// the allocated byte arrays are temporary so that the call stack
// can be utilized.
// I'm not sure if the modern VMs do this optimization though.
z.next_in = null;
z.next_out = null;
}
} finally {
// Deference the external references explicitly to tell the VM that
// the allocated byte arrays are temporary so that the call stack
// can be utilized.
// I'm not sure if the modern VMs do this optimization though.
z.next_in = null;
z.next_out = null;
}
}
}

View File

@ -43,6 +43,8 @@ import com.jcraft.jzlib.ZStreamException;
@ChannelPipelineCoverage("one")
public class ZlibEncoder extends OneToOneEncoder {
private static final byte[] EMPTY_ARRAY = new byte[0];
private final ZStream z = new ZStream();
private final AtomicBoolean finished = new AtomicBoolean();
@ -50,8 +52,6 @@ public class ZlibEncoder extends OneToOneEncoder {
// TODO support three wrappers - zlib (default), gzip (unsupported by jzlib, but easy to implement), nowrap
// TODO Disallow preset dictionary for gzip
// TODO add close() method
// FIXME thread safety
/**
* Creates a new zlib encoder with the default compression level
@ -77,9 +77,11 @@ public class ZlibEncoder extends OneToOneEncoder {
* @throws ZStreamException if failed to initialize zlib
*/
public ZlibEncoder(int compressionLevel) throws ZStreamException {
int resultCode = z.deflateInit(compressionLevel, false); // Default: ZLIB format
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "initialization failure", resultCode);
synchronized (z) {
int resultCode = z.deflateInit(compressionLevel, false); // Default: ZLIB format
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "initialization failure", resultCode);
}
}
}
@ -116,59 +118,74 @@ public class ZlibEncoder extends OneToOneEncoder {
throw new NullPointerException("dictionary");
}
int resultCode;
resultCode = z.deflateInit(compressionLevel, false); // Default: ZLIB format
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "initialization failure", resultCode);
} else {
resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
if (resultCode != JZlib.Z_OK){
ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
synchronized (z) {
int resultCode;
resultCode = z.deflateInit(compressionLevel, false); // Default: ZLIB format
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "initialization failure", resultCode);
} else {
resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
if (resultCode != JZlib.Z_OK){
ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
}
}
}
}
public ChannelFuture close(Channel channel) {
return finishEncode(channel.getPipeline().getContext(this), null);
}
public boolean isClosed() {
return finished.get();
}
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer)) {
if (!(msg instanceof ChannelBuffer) || finished.get()) {
return msg;
}
try {
// Configure input.
ChannelBuffer uncompressed = (ChannelBuffer) msg;
byte[] in = new byte[uncompressed.readableBytes()];
uncompressed.readBytes(in);
z.next_in = in;
z.next_in_index = 0;
z.avail_in = in.length;
ChannelBuffer result;
synchronized (z) {
try {
// Configure input.
ChannelBuffer uncompressed = (ChannelBuffer) msg;
byte[] in = new byte[uncompressed.readableBytes()];
uncompressed.readBytes(in);
z.next_in = in;
z.next_in_index = 0;
z.avail_in = in.length;
// Configure output.
byte[] out = new byte[(int) Math.ceil(in.length * 1.001) + 12];
z.next_out = out;
z.next_out_index = 0;
z.avail_out = out.length;
// Configure output.
byte[] out = new byte[(int) Math.ceil(in.length * 1.001) + 12];
z.next_out = out;
z.next_out_index = 0;
z.avail_out = out.length;
// Note that Z_PARTIAL_FLUSH has been deprecated.
int resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "compression failure", resultCode);
// Note that Z_PARTIAL_FLUSH has been deprecated.
int resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "compression failure", resultCode);
}
if (z.next_out_index != 0) {
result = ctx.getChannel().getConfig().getBufferFactory().getBuffer(
uncompressed.order(), out, 0, z.next_out_index);
} else {
result = ChannelBuffers.EMPTY_BUFFER;
}
} finally {
// Deference the external references explicitly to tell the VM that
// the allocated byte arrays are temporary so that the call stack
// can be utilized.
// I'm not sure if the modern VMs do this optimization though.
z.next_in = null;
z.next_out = null;
}
if (z.next_out_index != 0) {
return ctx.getChannel().getConfig().getBufferFactory().getBuffer(
uncompressed.order(), out, 0, z.next_out_index);
} else {
return ChannelBuffers.EMPTY_BUFFER;
}
} finally {
// Deference the external references explicitly to tell the VM that
// the allocated byte arrays are temporary so that the call stack
// can be utilized.
// I'm not sure if the modern VMs do this optimization though.
z.next_in = null;
z.next_out = null;
}
return result;
}
@Override
@ -192,62 +209,70 @@ public class ZlibEncoder extends OneToOneEncoder {
private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
if (!finished.compareAndSet(false, true)) {
return Channels.failedFuture(
ctx.getChannel(),
new ZStreamException("zlib stream closed already"));
}
try {
// Configure input.
z.next_in = new byte[0];
z.next_in_index = 0;
z.avail_in = 0;
// Configure output.
byte[] out = new byte[8]; // Minimum room for ADLER32 + ZLIB header
z.next_out = out;
z.next_out_index = 0;
z.avail_out = out.length;
ChannelFuture future;
// Write the ADLER32 checksum.
int resultCode = z.deflate(JZlib.Z_FINISH);
if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
future = Channels.failedFuture(
ctx.getChannel(),
ZlibUtil.exception(z, "compression failure", resultCode));
} else if (z.next_out_index != 0) {
future = Channels.future(ctx.getChannel());
Channels.write(
ctx, future,
ctx.getChannel().getConfig().getBufferFactory().getBuffer(
out, 0, z.next_out_index));
} else {
// Note that we don't return a SucceededChannelFuture
// just in case any downstream handler or a sink wants to
// notify a write error.
future = Channels.future(ctx.getChannel());
Channels.write(ctx, future, ChannelBuffers.EMPTY_BUFFER);
}
if (evt != null) {
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future)
throws Exception {
ctx.sendDownstream(evt);
}
});
ctx.sendDownstream(evt);
}
return future;
} finally {
// Deference the external references explicitly to tell the VM that
// the allocated byte arrays are temporary so that the call stack
// can be utilized.
// I'm not sure if the modern VMs do this optimization though.
z.next_in = null;
z.next_out = null;
return Channels.succeededFuture(evt.getChannel());
}
ChannelBuffer footer;
ChannelFuture future;
synchronized (z) {
try {
// Configure input.
z.next_in = EMPTY_ARRAY;
z.next_in_index = 0;
z.avail_in = 0;
// Configure output.
byte[] out = new byte[8]; // Minimum room for ADLER32 + ZLIB header
z.next_out = out;
z.next_out_index = 0;
z.avail_out = out.length;
// Write the ADLER32 checksum (stream footer).
int resultCode = z.deflate(JZlib.Z_FINISH);
if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
future = Channels.failedFuture(
ctx.getChannel(),
ZlibUtil.exception(z, "compression failure", resultCode));
footer = null;
} else if (z.next_out_index != 0) {
future = Channels.future(ctx.getChannel());
footer =
ctx.getChannel().getConfig().getBufferFactory().getBuffer(
out, 0, z.next_out_index);
} else {
// Note that we should never use a SucceededChannelFuture
// here just in case any downstream handler or a sink wants
// to notify a write error.
future = Channels.future(ctx.getChannel());
footer = ChannelBuffers.EMPTY_BUFFER;
}
} finally {
z.deflateEnd();
// Deference the external references explicitly to tell the VM that
// the allocated byte arrays are temporary so that the call stack
// can be utilized.
// I'm not sure if the modern VMs do this optimization though.
z.next_in = null;
z.next_out = null;
}
}
if (footer != null) {
Channels.write(ctx, future, footer);
}
if (evt != null) {
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
ctx.sendDownstream(evt);
}
});
}
return future;
}
}