From 4b1721af17ec1a4e52d96e83d1fbfaa851b1de0b Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sun, 6 May 2012 21:50:15 +0200 Subject: [PATCH] Fix regression in ChunkedWriteHandler. See #310 --- .../handler/stream/ChunkedWriteHandler.java | 29 +++-- .../stream/ChunkedWriteHandlerTest.java | 122 ++++++++++++++++++ 2 files changed, 137 insertions(+), 14 deletions(-) create mode 100644 handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 017b996c5f..3329fd0578 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -114,12 +114,11 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns assert offered; final Channel channel = ctx.getChannel(); - if (channel.isWritable()) { + // call flush if the channel is writable or not connected. flush(..) will take care of the rest + + if (channel.isWritable() || !channel.isConnected()) { this.ctx = ctx; flush(ctx, false); - } else if (!channel.isConnected()) { - this.ctx = ctx; - discard(ctx, false); } } @@ -146,7 +145,6 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns private void discard(ChannelHandlerContext ctx, boolean fireNow) { ClosedChannelException cause = null; - boolean fireExceptionCaught = false; for (;;) { MessageEvent currentEvent = this.currentEvent; @@ -172,15 +170,16 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns cause = new ClosedChannelException(); } currentEvent.getFuture().setFailure(cause); - fireExceptionCaught = true; + + currentEvent = null; } - if (fireExceptionCaught) { + if (cause != null) { if (fireNow) { - fireExceptionCaught(ctx, cause); + Channels.fireExceptionCaught(ctx.getChannel(), cause); } else { - fireExceptionCaughtLater(ctx, cause); + Channels.fireExceptionCaughtLater(ctx.getChannel(), cause); } } } @@ -195,6 +194,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns if (!channel.isConnected()) { discard(ctx, fireNow); + return; } while (channel.isWritable()) { @@ -244,8 +244,8 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns if (suspend) { // ChunkedInput.nextChunk() returned null and it has - // not reached at the end of input. Let's wait until - // more chunks arrive. Nothing to write or notify. + // not reached at the end of input. Let's wait until + // more chunks arrive. Nothing to write or notify. break; } else { ChannelFuture writeFuture; @@ -253,7 +253,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns this.currentEvent = null; writeFuture = currentEvent.getFuture(); - // Register a listener which will close the input once the write is complete. This is needed because the Chunk may have + // Register a listener which will close the input once the write is complete. This is needed because the Chunk may have // some resource bound that can not be closed before its not written // // See https://github.com/netty/netty/issues/303 @@ -287,7 +287,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns if (!channel.isConnected()) { discard(ctx, fireNow); - break; + return; } } } finally { @@ -297,11 +297,12 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns } - if (acquired && !channel.isConnected() || (channel.isWritable() && !queue.isEmpty())) { + if (acquired && (!channel.isConnected() || (channel.isWritable() && !queue.isEmpty()))) { flush(ctx, fireNow); } } + static void closeInput(ChunkedInput chunks) { try { chunks.close(); diff --git a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java new file mode 100644 index 0000000000..b9e66b8864 --- /dev/null +++ b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java @@ -0,0 +1,122 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */package io.netty.handler.stream; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; + +import junit.framework.Assert; + +import io.netty.buffer.ChannelBuffer; +import io.netty.handler.codec.embedder.EncoderEmbedder; +import io.netty.handler.stream.ChunkedFile; +import io.netty.handler.stream.ChunkedInput; +import io.netty.handler.stream.ChunkedNioFile; +import io.netty.handler.stream.ChunkedNioStream; +import io.netty.handler.stream.ChunkedStream; +import io.netty.handler.stream.ChunkedWriteHandler; +import org.junit.Test; + +public class ChunkedWriteHandlerTest { + private static final byte[] BYTES = new byte[1024 * 64]; + private static final File TMP; + + static { + for (int i = 0; i < BYTES.length; i++) { + BYTES[i] = (byte) i; + } + + FileOutputStream out = null; + try { + TMP = File.createTempFile("netty-chunk-", ".tmp"); + TMP.deleteOnExit(); + out = new FileOutputStream(TMP); + out.write(BYTES); + out.flush(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + // ignore + } + } + } + } + + // See #310 + @Test + public void testChunkedStream() { + check(new ChunkedStream(new ByteArrayInputStream(BYTES))); + + check(new ChunkedStream(new ByteArrayInputStream(BYTES)), new ChunkedStream(new ByteArrayInputStream(BYTES)), new ChunkedStream(new ByteArrayInputStream(BYTES))); + + } + + @Test + public void testChunkedNioStream() { + check(new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES)))); + + check(new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES))), new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES))), new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES)))); + + } + + + @Test + public void testChunkedFile() throws IOException { + check(new ChunkedFile(TMP)); + + check(new ChunkedFile(TMP), new ChunkedFile(TMP), new ChunkedFile(TMP)); + } + + @Test + public void testChunkedNioFile() throws IOException { + check(new ChunkedNioFile(TMP)); + + check(new ChunkedNioFile(TMP), new ChunkedNioFile(TMP), new ChunkedNioFile(TMP)); + } + + private void check(ChunkedInput... inputs) { + EncoderEmbedder embedder = new EncoderEmbedder(new ChunkedWriteHandler()); + for (ChunkedInput input: inputs) { + embedder.offer(input); + } + + Assert.assertTrue(embedder.finish()); + + int i = 0; + int read = 0; + for (;;) { + ChannelBuffer buffer = embedder.poll(); + if (buffer == null) { + break; + } + while (buffer.readable()) { + Assert.assertEquals(BYTES[i++], buffer.readByte()); + read++; + if (i == BYTES.length) { + i = 0; + } + } + } + + Assert.assertEquals(BYTES.length * inputs.length, read); + } +}