Fix regression in ChunkedWriteHandler. See #310

This commit is contained in:
Norman Maurer 2012-05-06 21:50:15 +02:00
parent c24eafed48
commit 4b1721af17
2 changed files with 137 additions and 14 deletions

View File

@ -114,12 +114,11 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
assert offered; assert offered;
final Channel channel = ctx.getChannel(); final Channel channel = ctx.getChannel();
if (channel.isWritable()) { // call flush if the channel is writable or not connected. flush(..) will take care of the rest
if (channel.isWritable() || !channel.isConnected()) {
this.ctx = ctx; this.ctx = ctx;
flush(ctx, false); flush(ctx, false);
} else if (!channel.isConnected()) {
this.ctx = ctx;
discard(ctx, false);
} }
} }
@ -146,7 +145,6 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
private void discard(ChannelHandlerContext ctx, boolean fireNow) { private void discard(ChannelHandlerContext ctx, boolean fireNow) {
ClosedChannelException cause = null; ClosedChannelException cause = null;
boolean fireExceptionCaught = false;
for (;;) { for (;;) {
MessageEvent currentEvent = this.currentEvent; MessageEvent currentEvent = this.currentEvent;
@ -172,15 +170,16 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
cause = new ClosedChannelException(); cause = new ClosedChannelException();
} }
currentEvent.getFuture().setFailure(cause); currentEvent.getFuture().setFailure(cause);
fireExceptionCaught = true;
currentEvent = null;
} }
if (fireExceptionCaught) { if (cause != null) {
if (fireNow) { if (fireNow) {
fireExceptionCaught(ctx, cause); Channels.fireExceptionCaught(ctx.getChannel(), cause);
} else { } else {
fireExceptionCaughtLater(ctx, cause); Channels.fireExceptionCaughtLater(ctx.getChannel(), cause);
} }
} }
} }
@ -195,6 +194,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
if (!channel.isConnected()) { if (!channel.isConnected()) {
discard(ctx, fireNow); discard(ctx, fireNow);
return;
} }
while (channel.isWritable()) { while (channel.isWritable()) {
@ -244,8 +244,8 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
if (suspend) { if (suspend) {
// ChunkedInput.nextChunk() returned null and it has // ChunkedInput.nextChunk() returned null and it has
// not reached at the end of input. Let's wait until // not reached at the end of input. Let's wait until
// more chunks arrive. Nothing to write or notify. // more chunks arrive. Nothing to write or notify.
break; break;
} else { } else {
ChannelFuture writeFuture; ChannelFuture writeFuture;
@ -287,7 +287,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
if (!channel.isConnected()) { if (!channel.isConnected()) {
discard(ctx, fireNow); discard(ctx, fireNow);
break; return;
} }
} }
} finally { } finally {
@ -297,11 +297,12 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
} }
if (acquired && !channel.isConnected() || (channel.isWritable() && !queue.isEmpty())) { if (acquired && (!channel.isConnected() || (channel.isWritable() && !queue.isEmpty()))) {
flush(ctx, fireNow); flush(ctx, fireNow);
} }
} }
static void closeInput(ChunkedInput chunks) { static void closeInput(ChunkedInput chunks) {
try { try {
chunks.close(); chunks.close();

View File

@ -0,0 +1,122 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/package io.netty.handler.stream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import junit.framework.Assert;
import io.netty.buffer.ChannelBuffer;
import io.netty.handler.codec.embedder.EncoderEmbedder;
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.stream.ChunkedInput;
import io.netty.handler.stream.ChunkedNioFile;
import io.netty.handler.stream.ChunkedNioStream;
import io.netty.handler.stream.ChunkedStream;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.junit.Test;
public class ChunkedWriteHandlerTest {
private static final byte[] BYTES = new byte[1024 * 64];
private static final File TMP;
static {
for (int i = 0; i < BYTES.length; i++) {
BYTES[i] = (byte) i;
}
FileOutputStream out = null;
try {
TMP = File.createTempFile("netty-chunk-", ".tmp");
TMP.deleteOnExit();
out = new FileOutputStream(TMP);
out.write(BYTES);
out.flush();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
if (out != null) {
try {
out.close();
} catch (IOException e) {
// ignore
}
}
}
}
// See #310
@Test
public void testChunkedStream() {
check(new ChunkedStream(new ByteArrayInputStream(BYTES)));
check(new ChunkedStream(new ByteArrayInputStream(BYTES)), new ChunkedStream(new ByteArrayInputStream(BYTES)), new ChunkedStream(new ByteArrayInputStream(BYTES)));
}
@Test
public void testChunkedNioStream() {
check(new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES))));
check(new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES))), new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES))), new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES))));
}
@Test
public void testChunkedFile() throws IOException {
check(new ChunkedFile(TMP));
check(new ChunkedFile(TMP), new ChunkedFile(TMP), new ChunkedFile(TMP));
}
@Test
public void testChunkedNioFile() throws IOException {
check(new ChunkedNioFile(TMP));
check(new ChunkedNioFile(TMP), new ChunkedNioFile(TMP), new ChunkedNioFile(TMP));
}
private void check(ChunkedInput... inputs) {
EncoderEmbedder<ChannelBuffer> embedder = new EncoderEmbedder<ChannelBuffer>(new ChunkedWriteHandler());
for (ChunkedInput input: inputs) {
embedder.offer(input);
}
Assert.assertTrue(embedder.finish());
int i = 0;
int read = 0;
for (;;) {
ChannelBuffer buffer = embedder.poll();
if (buffer == null) {
break;
}
while (buffer.readable()) {
Assert.assertEquals(BYTES[i++], buffer.readByte());
read++;
if (i == BYTES.length) {
i = 0;
}
}
}
Assert.assertEquals(BYTES.length * inputs.length, read);
}
}