[#1668] Remove synchronized usage in JZlibEncoder and JdkZlibEncoder

This commit is contained in:
Norman Maurer 2013-07-29 07:08:49 +02:00
parent 3922c518cd
commit 6ce8571df3
2 changed files with 162 additions and 146 deletions

View File

@ -23,10 +23,11 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.EmptyArrays;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
@ -35,7 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class JZlibEncoder extends ZlibEncoder { public class JZlibEncoder extends ZlibEncoder {
private final Deflater z = new Deflater(); private final Deflater z = new Deflater();
private final AtomicBoolean finished = new AtomicBoolean(); private volatile boolean finished;
private volatile ChannelHandlerContext ctx; private volatile ChannelHandlerContext ctx;
/** /**
@ -138,13 +139,11 @@ public class JZlibEncoder extends ZlibEncoder {
"allowed for compression."); "allowed for compression.");
} }
synchronized (z) { int resultCode = z.init(
int resultCode = z.init( compressionLevel, windowBits, memLevel,
compressionLevel, windowBits, memLevel, ZlibUtil.convertWrapperType(wrapper));
ZlibUtil.convertWrapperType(wrapper)); if (resultCode != JZlib.Z_OK) {
if (resultCode != JZlib.Z_OK) { ZlibUtil.fail(z, "initialization failure", resultCode);
ZlibUtil.fail(z, "initialization failure", resultCode);
}
} }
} }
@ -222,19 +221,16 @@ public class JZlibEncoder extends ZlibEncoder {
if (dictionary == null) { if (dictionary == null) {
throw new NullPointerException("dictionary"); throw new NullPointerException("dictionary");
} }
int resultCode;
synchronized (z) { resultCode = z.deflateInit(
int resultCode; compressionLevel, windowBits, memLevel,
resultCode = z.deflateInit( JZlib.W_ZLIB); // Default: ZLIB format
compressionLevel, windowBits, memLevel, if (resultCode != JZlib.Z_OK) {
JZlib.W_ZLIB); // Default: ZLIB format ZlibUtil.fail(z, "initialization failure", resultCode);
} else {
resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
if (resultCode != JZlib.Z_OK) { if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "initialization failure", resultCode); ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
} else {
resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
}
} }
} }
} }
@ -245,8 +241,22 @@ public class JZlibEncoder extends ZlibEncoder {
} }
@Override @Override
public ChannelFuture close(ChannelPromise promise) { public ChannelFuture close(final ChannelPromise promise) {
return finishEncode(ctx(), promise); ChannelHandlerContext ctx = ctx();
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
return finishEncode(ctx, promise);
} else {
final ChannelPromise p = ctx.newPromise();
executor.execute(new Runnable() {
@Override
public void run() {
ChannelFuture f = finishEncode(ctx(), p);
f.addListener(new ChannelPromiseNotifier(promise));
}
});
return p;
}
} }
private ChannelHandlerContext ctx() { private ChannelHandlerContext ctx() {
@ -259,71 +269,69 @@ public class JZlibEncoder extends ZlibEncoder {
@Override @Override
public boolean isClosed() { public boolean isClosed() {
return finished.get(); return finished;
} }
@Override @Override
protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
if (finished.get()) { if (finished) {
return; return;
} }
synchronized (z) { try {
try { // Configure input.
// Configure input. int inputLength = in.readableBytes();
int inputLength = in.readableBytes(); boolean inHasArray = in.hasArray();
boolean inHasArray = in.hasArray(); z.avail_in = inputLength;
z.avail_in = inputLength; if (inHasArray) {
if (inHasArray) { z.next_in = in.array();
z.next_in = in.array(); z.next_in_index = in.arrayOffset() + in.readerIndex();
z.next_in_index = in.arrayOffset() + in.readerIndex(); } else {
} else { byte[] array = new byte[inputLength];
byte[] array = new byte[inputLength]; in.getBytes(in.readerIndex(), array);
in.getBytes(in.readerIndex(), array); z.next_in = array;
z.next_in = array; z.next_in_index = 0;
z.next_in_index = 0;
}
int oldNextInIndex = z.next_in_index;
// Configure output.
int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12;
out.ensureWritable(maxOutputLength);
z.avail_out = maxOutputLength;
z.next_out = out.array();
z.next_out_index = out.arrayOffset() + out.writerIndex();
int oldNextOutIndex = z.next_out_index;
// Note that Z_PARTIAL_FLUSH has been deprecated.
int resultCode;
try {
resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
} finally {
in.skipBytes(z.next_in_index - oldNextInIndex);
}
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "compression failure", resultCode);
}
int outputLength = z.next_out_index - oldNextOutIndex;
if (outputLength > 0) {
out.writerIndex(out.writerIndex() + outputLength);
}
} finally {
// Deference the external references explicitly to tell the VM that
// the allocated byte arrays are temporary so that the call stack
// can be utilized.
// I'm not sure if the modern VMs do this optimization though.
z.next_in = null;
z.next_out = null;
} }
int oldNextInIndex = z.next_in_index;
// Configure output.
int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12;
out.ensureWritable(maxOutputLength);
z.avail_out = maxOutputLength;
z.next_out = out.array();
z.next_out_index = out.arrayOffset() + out.writerIndex();
int oldNextOutIndex = z.next_out_index;
// Note that Z_PARTIAL_FLUSH has been deprecated.
int resultCode;
try {
resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
} finally {
in.skipBytes(z.next_in_index - oldNextInIndex);
}
if (resultCode != JZlib.Z_OK) {
ZlibUtil.fail(z, "compression failure", resultCode);
}
int outputLength = z.next_out_index - oldNextOutIndex;
if (outputLength > 0) {
out.writerIndex(out.writerIndex() + outputLength);
}
} finally {
// Deference the external references explicitly to tell the VM that
// the allocated byte arrays are temporary so that the call stack
// can be utilized.
// I'm not sure if the modern VMs do this optimization though.
z.next_in = null;
z.next_out = null;
} }
} }
@Override @Override
public void close( public void close(
final ChannelHandlerContext ctx, final ChannelHandlerContext ctx,
final ChannelPromise promise) throws Exception { final ChannelPromise promise) {
ChannelFuture f = finishEncode(ctx, ctx.newPromise()); ChannelFuture f = finishEncode(ctx, ctx.newPromise());
f.addListener(new ChannelFutureListener() { f.addListener(new ChannelFutureListener() {
@Override @Override
@ -344,47 +352,45 @@ public class JZlibEncoder extends ZlibEncoder {
} }
private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) { private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) {
if (!finished.compareAndSet(false, true)) { if (finished) {
promise.setSuccess(); promise.setSuccess();
return promise; return promise;
} }
finished = true;
ByteBuf footer; ByteBuf footer;
synchronized (z) { try {
try { // Configure input.
// Configure input. z.next_in = EmptyArrays.EMPTY_BYTES;
z.next_in = EmptyArrays.EMPTY_BYTES; z.next_in_index = 0;
z.next_in_index = 0; z.avail_in = 0;
z.avail_in = 0;
// Configure output. // Configure output.
byte[] out = new byte[32]; // room for ADLER32 + ZLIB / CRC32 + GZIP header byte[] out = new byte[32]; // room for ADLER32 + ZLIB / CRC32 + GZIP header
z.next_out = out; z.next_out = out;
z.next_out_index = 0; z.next_out_index = 0;
z.avail_out = out.length; z.avail_out = out.length;
// Write the ADLER32 checksum (stream footer). // Write the ADLER32 checksum (stream footer).
int resultCode = z.deflate(JZlib.Z_FINISH); int resultCode = z.deflate(JZlib.Z_FINISH);
if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) { if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
promise.setFailure(ZlibUtil.deflaterException(z, "compression failure", resultCode)); promise.setFailure(ZlibUtil.deflaterException(z, "compression failure", resultCode));
return promise; return promise;
} else if (z.next_out_index != 0) { } else if (z.next_out_index != 0) {
footer = Unpooled.wrappedBuffer(out, 0, z.next_out_index); footer = Unpooled.wrappedBuffer(out, 0, z.next_out_index);
} else { } else {
footer = Unpooled.EMPTY_BUFFER; footer = Unpooled.EMPTY_BUFFER;
}
} finally {
z.deflateEnd();
// Deference the external references explicitly to tell the VM that
// the allocated byte arrays are temporary so that the call stack
// can be utilized.
// I'm not sure if the modern VMs do this optimization though.
z.next_in = null;
z.next_out = null;
} }
} } finally {
z.deflateEnd();
// Deference the external references explicitly to tell the VM that
// the allocated byte arrays are temporary so that the call stack
// can be utilized.
// I'm not sure if the modern VMs do this optimization though.
z.next_in = null;
z.next_out = null;
}
return ctx.writeAndFlush(footer, promise); return ctx.writeAndFlush(footer, promise);
} }

View File

@ -16,14 +16,14 @@
package io.netty.handler.codec.compression; package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.CRC32; import java.util.zip.CRC32;
import java.util.zip.Deflater; import java.util.zip.Deflater;
@ -35,7 +35,7 @@ public class JdkZlibEncoder extends ZlibEncoder {
private final byte[] encodeBuf = new byte[8192]; private final byte[] encodeBuf = new byte[8192];
private final Deflater deflater; private final Deflater deflater;
private final AtomicBoolean finished = new AtomicBoolean(); private volatile boolean finished;
private volatile ChannelHandlerContext ctx; private volatile ChannelHandlerContext ctx;
/* /*
@ -158,8 +158,22 @@ public class JdkZlibEncoder extends ZlibEncoder {
} }
@Override @Override
public ChannelFuture close(ChannelPromise future) { public ChannelFuture close(final ChannelPromise promise) {
return finishEncode(ctx(), future); ChannelHandlerContext ctx = ctx();
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
return finishEncode(ctx, promise);
} else {
final ChannelPromise p = ctx.newPromise();
executor.execute(new Runnable() {
@Override
public void run() {
ChannelFuture f = finishEncode(ctx(), p);
f.addListener(new ChannelPromiseNotifier(promise));
}
});
return p;
}
} }
private ChannelHandlerContext ctx() { private ChannelHandlerContext ctx() {
@ -172,12 +186,12 @@ public class JdkZlibEncoder extends ZlibEncoder {
@Override @Override
public boolean isClosed() { public boolean isClosed() {
return finished.get(); return finished;
} }
@Override @Override
protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
if (finished.get()) { if (finished) {
out.writeBytes(in); out.writeBytes(in);
return; return;
} }
@ -189,20 +203,18 @@ public class JdkZlibEncoder extends ZlibEncoder {
int sizeEstimate = (int) Math.ceil(inAry.length * 1.001) + 12; int sizeEstimate = (int) Math.ceil(inAry.length * 1.001) + 12;
out.ensureWritable(sizeEstimate); out.ensureWritable(sizeEstimate);
synchronized (deflater) { if (gzip) {
if (gzip) { crc.update(inAry);
crc.update(inAry); if (writeHeader) {
if (writeHeader) { out.writeBytes(gzipHeader);
out.writeBytes(gzipHeader); writeHeader = false;
writeHeader = false;
}
} }
}
deflater.setInput(inAry); deflater.setInput(inAry);
while (!deflater.needsInput()) { while (!deflater.needsInput()) {
int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length, Deflater.SYNC_FLUSH); int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length, Deflater.SYNC_FLUSH);
out.writeBytes(encodeBuf, 0, numBytes); out.writeBytes(encodeBuf, 0, numBytes);
}
} }
} }
@ -228,33 +240,31 @@ public class JdkZlibEncoder extends ZlibEncoder {
} }
private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) { private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
if (!finished.compareAndSet(false, true)) { if (finished) {
promise.setSuccess(); promise.setSuccess();
return promise; return promise;
} }
finished = true;
ByteBuf footer = ctx.alloc().buffer(); ByteBuf footer = ctx.alloc().buffer();
synchronized (deflater) { deflater.finish();
deflater.finish(); while (!deflater.finished()) {
while (!deflater.finished()) { int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length);
int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length); footer.writeBytes(encodeBuf, 0, numBytes);
footer.writeBytes(encodeBuf, 0, numBytes);
}
if (gzip) {
int crcValue = (int) crc.getValue();
int uncBytes = deflater.getTotalIn();
footer.writeByte(crcValue);
footer.writeByte(crcValue >>> 8);
footer.writeByte(crcValue >>> 16);
footer.writeByte(crcValue >>> 24);
footer.writeByte(uncBytes);
footer.writeByte(uncBytes >>> 8);
footer.writeByte(uncBytes >>> 16);
footer.writeByte(uncBytes >>> 24);
}
deflater.end();
} }
if (gzip) {
int crcValue = (int) crc.getValue();
int uncBytes = deflater.getTotalIn();
footer.writeByte(crcValue);
footer.writeByte(crcValue >>> 8);
footer.writeByte(crcValue >>> 16);
footer.writeByte(crcValue >>> 24);
footer.writeByte(uncBytes);
footer.writeByte(uncBytes >>> 8);
footer.writeByte(uncBytes >>> 16);
footer.writeByte(uncBytes >>> 24);
}
deflater.end();
return ctx.writeAndFlush(footer, promise); return ctx.writeAndFlush(footer, promise);
} }