diff --git a/src/main/java/org/jboss/netty/channel/Channels.java b/src/main/java/org/jboss/netty/channel/Channels.java index 43f1fcf83d..de17862a9e 100644 --- a/src/main/java/org/jboss/netty/channel/Channels.java +++ b/src/main/java/org/jboss/netty/channel/Channels.java @@ -408,6 +408,31 @@ public class Channels { fireMessageReceived(ctx, message, remoteAddress); } + /** + * Sends a {@code "writeComplete"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. + */ + public static void fireWriteComplete(Channel channel, int amount) { + if (amount == 0) { + return; + } + + channel.getPipeline().sendUpstream( + new DefaultWriteCompletionEvent( + channel, succeededFuture(channel), amount)); + } + + /** + * Sends a {@code "writeComplete"} event to the next + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. + */ + public static void fireWriteCompleted(ChannelHandlerContext ctx, int amount) { + ctx.sendUpstream(new DefaultWriteCompletionEvent( + ctx.getChannel(), succeededFuture(ctx.getChannel()), amount)); + } + /** * Sends a {@code "channelInterestChanged"} event to the first * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of diff --git a/src/main/java/org/jboss/netty/channel/DefaultWriteCompletionEvent.java b/src/main/java/org/jboss/netty/channel/DefaultWriteCompletionEvent.java new file mode 100644 index 0000000000..66f2f2d6e6 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/DefaultWriteCompletionEvent.java @@ -0,0 +1,67 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2008, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.channel; + +/** + * The default {@link WriteCompletionEvent} implementation. + * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * + * @version $Rev$, $Date$ + */ +public class DefaultWriteCompletionEvent extends DefaultChannelEvent implements + WriteCompletionEvent { + + private final int writtenAmount; + + /** + * Creates a new instance. + */ + public DefaultWriteCompletionEvent( + Channel channel, ChannelFuture future, int writtenAmount) { + + super(channel, future); + if (writtenAmount <= 0) { + throw new IllegalArgumentException( + "writtenAmount must be a positive integer: " + writtenAmount); + } + + this.writtenAmount = writtenAmount; + } + + public int getWrittenAmount() { + return writtenAmount; + } + + @Override + public String toString() { + String parentString = super.toString(); + StringBuilder buf = new StringBuilder(parentString.length() + 32); + buf.append(parentString); + buf.append(" - (writtenAmount: "); + buf.append(getWrittenAmount()); + buf.append(')'); + return buf.toString(); + } +} diff --git a/src/main/java/org/jboss/netty/channel/SimpleChannelHandler.java b/src/main/java/org/jboss/netty/channel/SimpleChannelHandler.java index 7ccdf5f0ec..8135cc80af 100644 --- a/src/main/java/org/jboss/netty/channel/SimpleChannelHandler.java +++ b/src/main/java/org/jboss/netty/channel/SimpleChannelHandler.java @@ -103,6 +103,9 @@ public class SimpleChannelHandler implements ChannelUpstreamHandler, ChannelDown if (e instanceof MessageEvent) { messageReceived(ctx, (MessageEvent) e); + } else if (e instanceof WriteCompletionEvent) { + WriteCompletionEvent evt = (WriteCompletionEvent) e; + writeComplete(ctx, evt); } else if (e instanceof ChildChannelStateEvent) { ChildChannelStateEvent evt = (ChildChannelStateEvent) e; if (evt.getChildChannel().isOpen()) { @@ -230,6 +233,14 @@ public class SimpleChannelHandler implements ChannelUpstreamHandler, ChannelDown ctx.sendUpstream(e); } + /** + * Invoked when something was written into a {@link Channel}. + */ + public void writeComplete( + ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception { + ctx.sendUpstream(e); + } + /** * Invoked when a child {@link Channel} was open. * (e.g. a server channel accepted a connection) diff --git a/src/main/java/org/jboss/netty/channel/SimpleChannelUpstreamHandler.java b/src/main/java/org/jboss/netty/channel/SimpleChannelUpstreamHandler.java index 843aabf1c1..4c092a6f33 100644 --- a/src/main/java/org/jboss/netty/channel/SimpleChannelUpstreamHandler.java +++ b/src/main/java/org/jboss/netty/channel/SimpleChannelUpstreamHandler.java @@ -85,6 +85,9 @@ public class SimpleChannelUpstreamHandler implements ChannelUpstreamHandler { if (e instanceof MessageEvent) { messageReceived(ctx, (MessageEvent) e); + } else if (e instanceof WriteCompletionEvent) { + WriteCompletionEvent evt = (WriteCompletionEvent) e; + writeComplete(ctx, evt); } else if (e instanceof ChildChannelStateEvent) { ChildChannelStateEvent evt = (ChildChannelStateEvent) e; if (evt.getChildChannel().isOpen()) { @@ -212,6 +215,14 @@ public class SimpleChannelUpstreamHandler implements ChannelUpstreamHandler { ctx.sendUpstream(e); } + /** + * Invoked when something was written into a {@link Channel}. + */ + public void writeComplete( + ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception { + ctx.sendUpstream(e); + } + /** * Invoked when a child {@link Channel} was open. * (e.g. a server channel accepted a connection) diff --git a/src/main/java/org/jboss/netty/channel/WriteCompletionEvent.java b/src/main/java/org/jboss/netty/channel/WriteCompletionEvent.java new file mode 100644 index 0000000000..8c08d994d3 --- /dev/null +++ b/src/main/java/org/jboss/netty/channel/WriteCompletionEvent.java @@ -0,0 +1,45 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2008, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.channel; + +/** + * A {@link ChannelEvent} which represents the notification of the completion + * of a write request on a {@link Channel}. This event is for going upstream + * only. Please refer to the {@link ChannelEvent} documentation to find out + * what an upstream event and a downstream event are and what fundamental + * differences they have. + * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * + * @version $Rev$, $Date$ + */ +public interface WriteCompletionEvent extends ChannelEvent { + /** + * Returns the amount of data written. + * + * @return the number of written bytes or messages, depending on the + * type of the transport + */ + int getWrittenAmount(); +} diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 9f0f353d60..3fdd509c4b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -417,7 +417,7 @@ class NioWorker implements Runnable { } } - //fireChannelWritten(channel, writtenBytes); + fireWriteComplete(channel, writtenBytes); if (open) { if (addOpWrite) { diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java index 40fadd1624..88488e0e8b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java @@ -118,7 +118,7 @@ class OioWorker implements Runnable { synchronized (out) { a.getBytes(a.readerIndex(), out, bytes); } - //fireChannelWritten(channel, bytes); + fireWriteComplete(channel, bytes); future.setSuccess(); } catch (Throwable t) { future.setFailure(t);