diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java index c451875b4c..8decd9b7d9 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java @@ -23,10 +23,11 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.channel.ChannelPromiseNotifier; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.internal.EmptyArrays; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; /** @@ -35,7 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class JZlibEncoder extends ZlibEncoder { private final Deflater z = new Deflater(); - private final AtomicBoolean finished = new AtomicBoolean(); + private volatile boolean finished; private volatile ChannelHandlerContext ctx; /** @@ -138,13 +139,11 @@ public class JZlibEncoder extends ZlibEncoder { "allowed for compression."); } - synchronized (z) { - int resultCode = z.init( - compressionLevel, windowBits, memLevel, - ZlibUtil.convertWrapperType(wrapper)); - if (resultCode != JZlib.Z_OK) { - ZlibUtil.fail(z, "initialization failure", resultCode); - } + int resultCode = z.init( + compressionLevel, windowBits, memLevel, + ZlibUtil.convertWrapperType(wrapper)); + if (resultCode != JZlib.Z_OK) { + ZlibUtil.fail(z, "initialization failure", resultCode); } } @@ -222,19 +221,16 @@ public class JZlibEncoder extends ZlibEncoder { if (dictionary == null) { throw new NullPointerException("dictionary"); } - - synchronized (z) { - int resultCode; - resultCode = z.deflateInit( - compressionLevel, windowBits, memLevel, - JZlib.W_ZLIB); // Default: ZLIB format + int resultCode; + resultCode = z.deflateInit( + compressionLevel, windowBits, memLevel, + JZlib.W_ZLIB); // Default: ZLIB format + if (resultCode != JZlib.Z_OK) { + ZlibUtil.fail(z, "initialization failure", resultCode); + } else { + resultCode = z.deflateSetDictionary(dictionary, dictionary.length); if (resultCode != JZlib.Z_OK) { - ZlibUtil.fail(z, "initialization failure", resultCode); - } else { - resultCode = z.deflateSetDictionary(dictionary, dictionary.length); - if (resultCode != JZlib.Z_OK) { - ZlibUtil.fail(z, "failed to set the dictionary", resultCode); - } + ZlibUtil.fail(z, "failed to set the dictionary", resultCode); } } } @@ -245,8 +241,22 @@ public class JZlibEncoder extends ZlibEncoder { } @Override - public ChannelFuture close(ChannelPromise promise) { - return finishEncode(ctx(), promise); + public ChannelFuture close(final ChannelPromise promise) { + ChannelHandlerContext ctx = ctx(); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + return finishEncode(ctx, promise); + } else { + final ChannelPromise p = ctx.newPromise(); + executor.execute(new Runnable() { + @Override + public void run() { + ChannelFuture f = finishEncode(ctx(), p); + f.addListener(new ChannelPromiseNotifier(promise)); + } + }); + return p; + } } private ChannelHandlerContext ctx() { @@ -259,71 +269,69 @@ public class JZlibEncoder extends ZlibEncoder { @Override public boolean isClosed() { - return finished.get(); + return finished; } @Override protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { - if (finished.get()) { + if (finished) { return; } - synchronized (z) { - try { - // Configure input. - int inputLength = in.readableBytes(); - boolean inHasArray = in.hasArray(); - z.avail_in = inputLength; - if (inHasArray) { - z.next_in = in.array(); - z.next_in_index = in.arrayOffset() + in.readerIndex(); - } else { - byte[] array = new byte[inputLength]; - in.getBytes(in.readerIndex(), array); - z.next_in = array; - z.next_in_index = 0; - } - int oldNextInIndex = z.next_in_index; - - // Configure output. - int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12; - out.ensureWritable(maxOutputLength); - z.avail_out = maxOutputLength; - z.next_out = out.array(); - z.next_out_index = out.arrayOffset() + out.writerIndex(); - int oldNextOutIndex = z.next_out_index; - - // Note that Z_PARTIAL_FLUSH has been deprecated. - int resultCode; - try { - resultCode = z.deflate(JZlib.Z_SYNC_FLUSH); - } finally { - in.skipBytes(z.next_in_index - oldNextInIndex); - } - - if (resultCode != JZlib.Z_OK) { - ZlibUtil.fail(z, "compression failure", resultCode); - } - - int outputLength = z.next_out_index - oldNextOutIndex; - if (outputLength > 0) { - out.writerIndex(out.writerIndex() + outputLength); - } - } finally { - // Deference the external references explicitly to tell the VM that - // the allocated byte arrays are temporary so that the call stack - // can be utilized. - // I'm not sure if the modern VMs do this optimization though. - z.next_in = null; - z.next_out = null; + try { + // Configure input. + int inputLength = in.readableBytes(); + boolean inHasArray = in.hasArray(); + z.avail_in = inputLength; + if (inHasArray) { + z.next_in = in.array(); + z.next_in_index = in.arrayOffset() + in.readerIndex(); + } else { + byte[] array = new byte[inputLength]; + in.getBytes(in.readerIndex(), array); + z.next_in = array; + z.next_in_index = 0; } + int oldNextInIndex = z.next_in_index; + + // Configure output. + int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12; + out.ensureWritable(maxOutputLength); + z.avail_out = maxOutputLength; + z.next_out = out.array(); + z.next_out_index = out.arrayOffset() + out.writerIndex(); + int oldNextOutIndex = z.next_out_index; + + // Note that Z_PARTIAL_FLUSH has been deprecated. + int resultCode; + try { + resultCode = z.deflate(JZlib.Z_SYNC_FLUSH); + } finally { + in.skipBytes(z.next_in_index - oldNextInIndex); + } + + if (resultCode != JZlib.Z_OK) { + ZlibUtil.fail(z, "compression failure", resultCode); + } + + int outputLength = z.next_out_index - oldNextOutIndex; + if (outputLength > 0) { + out.writerIndex(out.writerIndex() + outputLength); + } + } finally { + // Deference the external references explicitly to tell the VM that + // the allocated byte arrays are temporary so that the call stack + // can be utilized. + // I'm not sure if the modern VMs do this optimization though. + z.next_in = null; + z.next_out = null; } } @Override public void close( final ChannelHandlerContext ctx, - final ChannelPromise promise) throws Exception { + final ChannelPromise promise) { ChannelFuture f = finishEncode(ctx, ctx.newPromise()); f.addListener(new ChannelFutureListener() { @Override @@ -344,47 +352,45 @@ public class JZlibEncoder extends ZlibEncoder { } private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) { - if (!finished.compareAndSet(false, true)) { + if (finished) { promise.setSuccess(); return promise; } + finished = true; ByteBuf footer; - synchronized (z) { - try { - // Configure input. - z.next_in = EmptyArrays.EMPTY_BYTES; - z.next_in_index = 0; - z.avail_in = 0; + try { + // Configure input. + z.next_in = EmptyArrays.EMPTY_BYTES; + z.next_in_index = 0; + z.avail_in = 0; - // Configure output. - byte[] out = new byte[32]; // room for ADLER32 + ZLIB / CRC32 + GZIP header - z.next_out = out; - z.next_out_index = 0; - z.avail_out = out.length; + // Configure output. + byte[] out = new byte[32]; // room for ADLER32 + ZLIB / CRC32 + GZIP header + z.next_out = out; + z.next_out_index = 0; + z.avail_out = out.length; - // Write the ADLER32 checksum (stream footer). - int resultCode = z.deflate(JZlib.Z_FINISH); - if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) { - promise.setFailure(ZlibUtil.deflaterException(z, "compression failure", resultCode)); - return promise; - } else if (z.next_out_index != 0) { - footer = Unpooled.wrappedBuffer(out, 0, z.next_out_index); - } else { - footer = Unpooled.EMPTY_BUFFER; - } - } finally { - z.deflateEnd(); - - // Deference the external references explicitly to tell the VM that - // the allocated byte arrays are temporary so that the call stack - // can be utilized. - // I'm not sure if the modern VMs do this optimization though. - z.next_in = null; - z.next_out = null; + // Write the ADLER32 checksum (stream footer). + int resultCode = z.deflate(JZlib.Z_FINISH); + if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) { + promise.setFailure(ZlibUtil.deflaterException(z, "compression failure", resultCode)); + return promise; + } else if (z.next_out_index != 0) { + footer = Unpooled.wrappedBuffer(out, 0, z.next_out_index); + } else { + footer = Unpooled.EMPTY_BUFFER; } - } + } finally { + z.deflateEnd(); + // Deference the external references explicitly to tell the VM that + // the allocated byte arrays are temporary so that the call stack + // can be utilized. + // I'm not sure if the modern VMs do this optimization though. + z.next_in = null; + z.next_out = null; + } return ctx.writeAndFlush(footer, promise); } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java index 36b26b038d..518b06bbde 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java @@ -16,14 +16,14 @@ package io.netty.handler.codec.compression; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.channel.ChannelPromiseNotifier; +import io.netty.util.concurrent.EventExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.CRC32; import java.util.zip.Deflater; @@ -35,7 +35,7 @@ public class JdkZlibEncoder extends ZlibEncoder { private final byte[] encodeBuf = new byte[8192]; private final Deflater deflater; - private final AtomicBoolean finished = new AtomicBoolean(); + private volatile boolean finished; private volatile ChannelHandlerContext ctx; /* @@ -158,8 +158,22 @@ public class JdkZlibEncoder extends ZlibEncoder { } @Override - public ChannelFuture close(ChannelPromise future) { - return finishEncode(ctx(), future); + public ChannelFuture close(final ChannelPromise promise) { + ChannelHandlerContext ctx = ctx(); + EventExecutor executor = ctx.executor(); + if (executor.inEventLoop()) { + return finishEncode(ctx, promise); + } else { + final ChannelPromise p = ctx.newPromise(); + executor.execute(new Runnable() { + @Override + public void run() { + ChannelFuture f = finishEncode(ctx(), p); + f.addListener(new ChannelPromiseNotifier(promise)); + } + }); + return p; + } } private ChannelHandlerContext ctx() { @@ -172,12 +186,12 @@ public class JdkZlibEncoder extends ZlibEncoder { @Override public boolean isClosed() { - return finished.get(); + return finished; } @Override protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { - if (finished.get()) { + if (finished) { out.writeBytes(in); return; } @@ -189,20 +203,18 @@ public class JdkZlibEncoder extends ZlibEncoder { int sizeEstimate = (int) Math.ceil(inAry.length * 1.001) + 12; out.ensureWritable(sizeEstimate); - synchronized (deflater) { - if (gzip) { - crc.update(inAry); - if (writeHeader) { - out.writeBytes(gzipHeader); - writeHeader = false; - } + if (gzip) { + crc.update(inAry); + if (writeHeader) { + out.writeBytes(gzipHeader); + writeHeader = false; } + } - deflater.setInput(inAry); - while (!deflater.needsInput()) { - int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length, Deflater.SYNC_FLUSH); - out.writeBytes(encodeBuf, 0, numBytes); - } + deflater.setInput(inAry); + while (!deflater.needsInput()) { + int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length, Deflater.SYNC_FLUSH); + out.writeBytes(encodeBuf, 0, numBytes); } } @@ -228,33 +240,31 @@ public class JdkZlibEncoder extends ZlibEncoder { } private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) { - if (!finished.compareAndSet(false, true)) { + if (finished) { promise.setSuccess(); return promise; } + finished = true; ByteBuf footer = ctx.alloc().buffer(); - synchronized (deflater) { - deflater.finish(); - while (!deflater.finished()) { - int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length); - footer.writeBytes(encodeBuf, 0, numBytes); - } - if (gzip) { - int crcValue = (int) crc.getValue(); - int uncBytes = deflater.getTotalIn(); - footer.writeByte(crcValue); - footer.writeByte(crcValue >>> 8); - footer.writeByte(crcValue >>> 16); - footer.writeByte(crcValue >>> 24); - footer.writeByte(uncBytes); - footer.writeByte(uncBytes >>> 8); - footer.writeByte(uncBytes >>> 16); - footer.writeByte(uncBytes >>> 24); - } - deflater.end(); + deflater.finish(); + while (!deflater.finished()) { + int numBytes = deflater.deflate(encodeBuf, 0, encodeBuf.length); + footer.writeBytes(encodeBuf, 0, numBytes); } - + if (gzip) { + int crcValue = (int) crc.getValue(); + int uncBytes = deflater.getTotalIn(); + footer.writeByte(crcValue); + footer.writeByte(crcValue >>> 8); + footer.writeByte(crcValue >>> 16); + footer.writeByte(crcValue >>> 24); + footer.writeByte(uncBytes); + footer.writeByte(uncBytes >>> 8); + footer.writeByte(uncBytes >>> 16); + footer.writeByte(uncBytes >>> 24); + } + deflater.end(); return ctx.writeAndFlush(footer, promise); }