From 754cd9984335fd2691ef7351b8833d1eac1b7918 Mon Sep 17 00:00:00 2001
From: Trustin Lee
Date: Fri, 1 Jun 2012 00:36:12 -0700
Subject: [PATCH] Port ChunkedWriteHandler

---
 .../codec/embedder/EmbeddedChannel.java      |  11 +-
 handler/pom.xml                              |   5 +
 .../io/netty/handler/stream/ChunkedFile.java |   2 -
 .../netty/handler/stream/ChunkedNioFile.java |   2 -
 .../handler/stream/ChunkedWriteHandler.java  | 348 ++++++++----------
 .../stream/ChunkedWriteHandlerTest.java      |   5 +-
 6 files changed, 169 insertions(+), 204 deletions(-)

diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java b/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java
index a5e59eef76..89c0d21a0e 100644
--- a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java
+++ b/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java
@@ -16,7 +16,6 @@
 package io.netty.handler.codec.embedder;
 
 import io.netty.buffer.ChannelBuffer;
-import io.netty.buffer.ChannelBuffers;
 import io.netty.channel.AbstractChannel;
 import io.netty.channel.ChannelBufferHolder;
 import io.netty.channel.ChannelBufferHolders;
@@ -37,8 +36,7 @@ class EmbeddedChannel extends AbstractChannel {
     private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED
 
     EmbeddedChannel(Queue productQueue) {
-        super(null, null, ChannelBufferHolders.catchAllBuffer(
-                productQueue, ChannelBuffers.dynamicBuffer()));
+        super(null, null, ChannelBufferHolders.catchAllBuffer());
         this.productQueue = productQueue;
     }
 
@@ -105,7 +103,12 @@ class EmbeddedChannel extends AbstractChannel {
             productQueue.add(byteBuf.readBytes(byteBufLen));
             byteBuf.clear();
         }
-        // We do nothing for message buffer because it's actually productQueue.
+
+        Queue msgBuf = buf.messageBuffer();
+        if (!msgBuf.isEmpty()) {
+            productQueue.addAll(msgBuf);
+            msgBuf.clear();
+        }
     }
 
     @Override
diff --git a/handler/pom.xml b/handler/pom.xml
index 4f89f71406..c7d2646e26 100644
--- a/handler/pom.xml
+++ b/handler/pom.xml
@@ -39,6 +39,11 @@
       <artifactId>netty-transport</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>netty-codec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java b/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java
index 7aa0d06320..370f2ac086 100644
--- a/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java
+++ b/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java
@@ -21,8 +21,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import io.netty.channel.FileRegion;
-
 /**
  * A {@link ChunkedInput} that fetches data from a file chunk by chunk.
  * <p>
diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java b/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java index 8c3efa572b..3faf26bf4d 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java @@ -23,8 +23,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import io.netty.channel.FileRegion; - /** * A {@link ChunkedInput} that fetches data from a file chunk by chunk using * NIO {@link FileChannel}. diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 11e5ea82f2..2651a74065 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -17,20 +17,25 @@ package io.netty.handler.stream; import io.netty.buffer.ChannelBuffers; import io.netty.channel.Channel; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelOutboundHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.QueueFactory; -import java.io.IOException; -import java.nio.channels.Channels; import java.nio.channels.ClosedChannelException; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** * A {@link ChannelHandler} that adds support for writing a large data stream @@ -65,16 +70,36 @@ import java.util.concurrent.atomic.AtomicBoolean; * @apiviz.landmark * @apiviz.has io.netty.handler.stream.ChunkedInput oneway - - reads from */ -public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler, LifeCycleAwareChannelHandler { +public class ChunkedWriteHandler extends ChannelHandlerAdapter { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChunkedWriteHandler.class); - private final Queue queue = QueueFactory.createQueue(MessageEvent.class); + private static final int MAX_PENDING_WRITES = 4; + + private final Queue queue = QueueFactory.createQueue(); private volatile ChannelHandlerContext ctx; + private final AtomicInteger pendingWrites = new AtomicInteger(); private final AtomicBoolean flush = new AtomicBoolean(false); - private MessageEvent currentEvent; + private Object currentEvent; + + @Override + public ChannelBufferHolder newInboundBuffer( + ChannelInboundHandlerContext ctx) throws Exception { + this.ctx = ctx; + return ChannelBufferHolders.inboundBypassBuffer(ctx); + } + + @Override + public ChannelBufferHolder newOutboundBuffer( + ChannelOutboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.messageBuffer(queue); + } + + private boolean isWritable() { + return pendingWrites.get() < MAX_PENDING_WRITES; + } /** * Continues to fetch the chunks from the input. 
@@ -86,7 +111,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns } try { - flush(ctx, false); + doFlush(ctx, false); } catch (Exception e) { if (logger.isWarnEnabled()) { logger.warn("Unexpected exception while sending chunks.", e); @@ -94,50 +119,25 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns } } - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - if (!(e instanceof MessageEvent)) { - ctx.sendDownstream(e); - return; - } - - boolean offered = queue.offer((MessageEvent) e); - assert offered; - - final Channel channel = ctx.getChannel(); - // call flush if the channel is writable or not connected. flush(..) will take care of the rest - - if (channel.isWritable() || !channel.isConnected()) { - this.ctx = ctx; - flush(ctx, false); + @Override + public void flush(ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { + queue.add(future); + if (isWritable() || !ctx.channel().isActive()) { + doFlush(ctx, false); } } - public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - if (e instanceof ChannelStateEvent) { - ChannelStateEvent cse = (ChannelStateEvent) e; - switch (cse.getState()) { - case INTEREST_OPS: - // Continue writing when the channel becomes writable. - flush(ctx, true); - break; - case OPEN: - if (!Boolean.TRUE.equals(cse.getValue())) { - // Fail all pending writes - flush(ctx, true); - } - break; - } - } - ctx.sendUpstream(e); + @Override + public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception { + doFlush(ctx, true); + super.channelInactive(ctx); } - private void discard(ChannelHandlerContext ctx, boolean fireNow) { - ClosedChannelException cause = null; + private void discard(final ChannelHandlerContext ctx, final Throwable cause) { + boolean fireExceptionCaught = false; for (;;) { - MessageEvent currentEvent = this.currentEvent; + Object currentEvent = this.currentEvent; if (this.currentEvent == null) { currentEvent = queue.poll(); @@ -149,45 +149,39 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns break; } - - Object m = currentEvent.getMessage(); - if (m instanceof ChunkedInput) { - closeInput((ChunkedInput) m); + if (currentEvent instanceof ChunkedInput) { + closeInput((ChunkedInput) currentEvent); + } else if (currentEvent instanceof ChannelFuture) { + fireExceptionCaught = true; + ((ChannelFuture) currentEvent).setFailure(cause); } - - // Trigger a ClosedChannelException - if (cause == null) { - cause = new ClosedChannelException(); - } - currentEvent.getFuture().setFailure(cause); - - currentEvent = null; } - - if (cause != null) { - if (fireNow) { - Channels.fireExceptionCaught(ctx.getChannel(), cause); + if (fireExceptionCaught) { + if (ctx.eventLoop().inEventLoop()) { + ctx.fireExceptionCaught(cause); } else { - Channels.fireExceptionCaughtLater(ctx.getChannel(), cause); + ctx.eventLoop().execute(new Runnable() { + @Override + public void run() { + ctx.fireExceptionCaught(cause); + } + }); } } } - private void flush(ChannelHandlerContext ctx, boolean fireNow) throws Exception { + private void doFlush(final ChannelHandlerContext ctx, boolean fireNow) throws Exception { boolean acquired = false; - final Channel channel = ctx.getChannel(); - - // use CAS to see if the have flush already running, if so we don't need to take futher actions + Channel channel = ctx.channel(); + // use CAS to see if the have flush already running, if so we don't 
need to take further actions if (acquired = flush.compareAndSet(false, true)) { try { - - if (!channel.isConnected()) { - discard(ctx, fireNow); + if (!channel.isActive()) { + discard(ctx, new ClosedChannelException()); return; } - - while (channel.isWritable()) { + while (isWritable()) { if (currentEvent == null) { currentEvent = queue.poll(); } @@ -196,89 +190,96 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns break; } - if (currentEvent.getFuture().isDone()) { - // Skip the current request because the previous partial write - // attempt for the current request has been failed. - currentEvent = null; - } else { - final MessageEvent currentEvent = this.currentEvent; - Object m = currentEvent.getMessage(); - if (m instanceof ChunkedInput) { - final ChunkedInput chunks = (ChunkedInput) m; - Object chunk; - boolean endOfInput; - boolean suspend; - try { - chunk = chunks.nextChunk(); - endOfInput = chunks.isEndOfInput(); - if (chunk == null) { - chunk = ChannelBuffers.EMPTY_BUFFER; - // No need to suspend when reached at the end. - suspend = !endOfInput; - } else { - suspend = false; - } - } catch (Throwable t) { - this.currentEvent = null; - - currentEvent.getFuture().setFailure(t); - if (fireNow) { - fireExceptionCaught(ctx, t); - } else { - fireExceptionCaughtLater(ctx, t); - } - - closeInput(chunks); - break; - } - - if (suspend) { - // ChunkedInput.nextChunk() returned null and it has - // not reached at the end of input. Let's wait until - // more chunks arrive. Nothing to write or notify. - break; + final Object currentEvent = this.currentEvent; + if (currentEvent instanceof ChannelFuture) { + this.currentEvent = null; + ctx.flush((ChannelFuture) currentEvent); + } else if (currentEvent instanceof ChunkedInput) { + final ChunkedInput chunks = (ChunkedInput) currentEvent; + Object chunk; + boolean endOfInput; + boolean suspend; + try { + chunk = chunks.nextChunk(); + endOfInput = chunks.isEndOfInput(); + if (chunk == null) { + chunk = ChannelBuffers.EMPTY_BUFFER; + // No need to suspend when reached at the end. + suspend = !endOfInput; } else { - ChannelFuture writeFuture; - if (endOfInput) { - this.currentEvent = null; - writeFuture = currentEvent.getFuture(); - - // Register a listener which will close the input once the write is complete. 
This is needed because the Chunk may have - // some resource bound that can not be closed before its not written - // - // See https://github.com/netty/netty/issues/303 - writeFuture.addListener(new ChannelFutureListener() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - closeInput(chunks); - } - }); - } else { - writeFuture = future(channel); - writeFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - currentEvent.getFuture().setFailure(future.getCause()); - closeInput((ChunkedInput) currentEvent.getMessage()); - } - } - }); - } - - Channels.write( - ctx, writeFuture, chunk, - currentEvent.getRemoteAddress()); + suspend = false; } - } else { + } catch (final Throwable t) { this.currentEvent = null; - ctx.sendDownstream(currentEvent); + + if (ctx.eventLoop().inEventLoop()) { + ctx.fireExceptionCaught(t); + } else { + ctx.eventLoop().execute(new Runnable() { + @Override + public void run() { + ctx.fireExceptionCaught(t); + } + }); + } + + closeInput(chunks); + break; } + + if (suspend) { + // ChunkedInput.nextChunk() returned null and it has + // not reached at the end of input. Let's wait until + // more chunks arrive. Nothing to write or notify. + break; + } + + pendingWrites.incrementAndGet(); + ctx.nextOutboundMessageBuffer().add(chunk); + ChannelFuture f = ctx.flush(); + if (endOfInput) { + this.currentEvent = null; + + // Register a listener which will close the input once the write is complete. This is needed because the Chunk may have + // some resource bound that can not be closed before its not written + // + // See https://github.com/netty/netty/issues/303 + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + pendingWrites.decrementAndGet(); + closeInput(chunks); + } + }); + } else if (isWritable()) { + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + pendingWrites.decrementAndGet(); + if (!future.isSuccess()) { + closeInput((ChunkedInput) currentEvent); + } + } + }); + } else { + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + pendingWrites.decrementAndGet(); + if (!future.isSuccess()) { + closeInput((ChunkedInput) currentEvent); + } else if (isWritable()) { + resumeTransfer(); + } + } + }); + } + } else { + ctx.nextOutboundMessageBuffer().add(currentEvent); } - if (!channel.isConnected()) { - discard(ctx, fireNow); + if (!channel.isActive()) { + discard(ctx, new ClosedChannelException()); return; } } @@ -286,11 +287,10 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns // mark the flush as done flush.set(false); } - } - if (acquired && (!channel.isConnected() || channel.isWritable() && !queue.isEmpty())) { - flush(ctx, fireNow); + if (acquired && (!channel.isActive() || isWritable() && !queue.isEmpty())) { + doFlush(ctx, fireNow); } } @@ -304,61 +304,19 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns } } - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { - // nothing to do - - } - - public void afterAdd(ChannelHandlerContext ctx) throws Exception { - // nothing to do - - } - + @Override public void beforeRemove(ChannelHandlerContext ctx) throws Exception { // try to flush again a last time. 
         //
         // See #304
-        flush(ctx, false);
+        doFlush(ctx, false);
     }
 
     // This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events
+    @Override
     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
         // Fail all MessageEvent's that are left. This is needed because otherwise we would never notify the
         // ChannelFuture and the registered FutureListener. See #304
-        //
-        Throwable cause = null;
-        boolean fireExceptionCaught = false;
-
-        for (;;) {
-            MessageEvent currentEvent = this.currentEvent;
-
-            if (this.currentEvent == null) {
-                currentEvent = queue.poll();
-            } else {
-                this.currentEvent = null;
-            }
-
-            if (currentEvent == null) {
-                break;
-            }
-
-            Object m = currentEvent.getMessage();
-            if (m instanceof ChunkedInput) {
-                closeInput((ChunkedInput) m);
-            }
-
-            // Create exception
-            if (cause == null) {
-                cause = new IOException("Unable to flush event, discarding");
-            }
-            currentEvent.getFuture().setFailure(cause);
-            fireExceptionCaught = true;
-
-            currentEvent = null;
-        }
-
-        if (fireExceptionCaught) {
-            Channels.fireExceptionCaughtLater(ctx.getChannel(), cause);
-        }
+        discard(ctx, new ChannelException(ChunkedWriteHandler.class.getSimpleName() + " removed from pipeline."));
     }
 }
diff --git a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java
index e6151a03c3..74dba02bd7 100644
--- a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java
+++ b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java
@@ -16,6 +16,7 @@
 package io.netty.handler.stream;
 
 import io.netty.buffer.ChannelBuffer;
+import io.netty.handler.codec.embedder.EncoderEmbedder;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
@@ -89,7 +90,9 @@ public class ChunkedWriteHandlerTest {
     }
 
     private static void check(ChunkedInput... inputs) {
-        EncoderEmbedder embedder = new EncoderEmbedder(new ChunkedWriteHandler());
+        EncoderEmbedder embedder =
+                new EncoderEmbedder(new ChunkedWriteHandler());
+
         for (ChunkedInput input: inputs) {
            embedder.offer(input);
        }
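
For context, a minimal sketch of how the ported handler is typically installed and fed a ChunkedInput.
It is illustrative only and not part of the patch: the handler names and the commented-out application
handler are made up, and the Channel/ChannelPipeline calls assume the 4.0 snapshot API this patch targets.

import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.io.File;
import java.io.IOException;

public final class ChunkedWriteExample {

    // Install ChunkedWriteHandler in front of the application handler so that
    // ChunkedInput messages written further down the pipeline are fetched and
    // flushed chunk by chunk instead of being loaded into memory at once.
    static void configure(ChannelPipeline pipeline) {
        pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
        // pipeline.addLast("handler", new MyHandler()); // application handler (placeholder)
    }

    // Stream a large file: the handler pulls one chunk at a time via
    // ChunkedInput.nextChunk() and keeps only a few writes in flight
    // (MAX_PENDING_WRITES in this port) before fetching the next chunk.
    static void writeLargeFile(Channel channel, File file) throws IOException {
        channel.write(new ChunkedFile(file));
    }
}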