diff --git a/src/main/java/org/jboss/netty/channel/ChannelEvent.java b/src/main/java/org/jboss/netty/channel/ChannelEvent.java index 020cf47094..cd5d175c0a 100644 --- a/src/main/java/org/jboss/netty/channel/ChannelEvent.java +++ b/src/main/java/org/jboss/netty/channel/ChannelEvent.java @@ -71,6 +71,7 @@ import org.jboss.netty.channel.socket.ServerSocketChannel; * {@code "channelOpen"} * {@link ChannelStateEvent}
(state = {@link ChannelState#OPEN OPEN}, value = {@code true}) * a {@link Channel} is open, but not bound nor connected + * Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers! * * * {@code "channelClosed"} @@ -80,7 +81,8 @@ import org.jboss.netty.channel.socket.ServerSocketChannel; * * {@code "channelBound"} * {@link ChannelStateEvent}
(state = {@link ChannelState#BOUND BOUND}, value = {@link SocketAddress}) - * a {@link Channel} is open and bound to a local address, but not connected + * a {@link Channel} is open and bound to a local address, but not connected. + * Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers! * * * {@code "channelUnbound"} @@ -91,6 +93,7 @@ import org.jboss.netty.channel.socket.ServerSocketChannel; * {@code "channelConnected"} * {@link ChannelStateEvent}
(state = {@link ChannelState#CONNECTED CONNECTED}, value = {@link SocketAddress}) * a {@link Channel} is open, bound to a local address, and connected to a remote address + * Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers! * * * {@code "writeComplete"} diff --git a/src/main/java/org/jboss/netty/channel/ChannelLocal.java b/src/main/java/org/jboss/netty/channel/ChannelLocal.java index 407700ee3f..32abb6ae6a 100644 --- a/src/main/java/org/jboss/netty/channel/ChannelLocal.java +++ b/src/main/java/org/jboss/netty/channel/ChannelLocal.java @@ -51,7 +51,7 @@ public class ChannelLocal { * Returns the initial value of the variable. By default, it returns * {@code null}. Override it to change the initial value. */ - protected T initialValue(@SuppressWarnings("unused") Channel channel) { + protected T initialValue(Channel channel) { return null; } diff --git a/src/main/java/org/jboss/netty/channel/SimpleChannelUpstreamHandler.java b/src/main/java/org/jboss/netty/channel/SimpleChannelUpstreamHandler.java index 590a04a7c6..8fc3c286bf 100644 --- a/src/main/java/org/jboss/netty/channel/SimpleChannelUpstreamHandler.java +++ b/src/main/java/org/jboss/netty/channel/SimpleChannelUpstreamHandler.java @@ -150,6 +150,9 @@ public class SimpleChannelUpstreamHandler implements ChannelUpstreamHandler { /** * Invoked when a {@link Channel} is open, but not bound nor connected. + *
+ * + * Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers! */ public void channelOpen( ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { @@ -159,6 +162,9 @@ public class SimpleChannelUpstreamHandler implements ChannelUpstreamHandler { /** * Invoked when a {@link Channel} is open and bound to a local address, * but not connected. + *
+ * + * Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers! */ public void channelBound( ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { @@ -168,6 +174,9 @@ public class SimpleChannelUpstreamHandler implements ChannelUpstreamHandler { /** * Invoked when a {@link Channel} is open, bound to a local address, and * connected to a remote address. + *
+ * + * Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers! */ public void channelConnected( ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioAcceptedSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioAcceptedSocketChannel.java index d9ee05ded5..187b727d3b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioAcceptedSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioAcceptedSocketChannel.java @@ -15,7 +15,8 @@ */ package org.jboss.netty.channel.socket.nio; -import static org.jboss.netty.channel.Channels.*; +import static org.jboss.netty.channel.Channels.fireChannelBound; +import static org.jboss.netty.channel.Channels.fireChannelOpen; import java.nio.channels.SocketChannel; @@ -46,8 +47,8 @@ final class NioAcceptedSocketChannel extends NioSocketChannel { this.bossThread = bossThread; setConnected(); + fireChannelOpen(this); fireChannelBound(this, getLocalAddress()); - fireChannelConnected(this, getRemoteAddress()); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java index 7b547b7b8a..6dc97527a5 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java @@ -29,7 +29,6 @@ import org.jboss.netty.channel.AbstractChannel; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.MessageEvent; diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 8618ab3c4e..0618dec97f 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -82,7 +82,7 @@ class NioWorker implements Runnable { private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool(); private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); - + NioWorker(int bossId, int id, Executor executor) { this.bossId = bossId; this.id = id; @@ -96,6 +96,7 @@ class NioWorker implements Runnable { Selector selector; synchronized (startStopLock) { + if (!started) { // Open a selector if this worker didn't start yet. try { @@ -160,6 +161,7 @@ class NioWorker implements Runnable { } try { + SelectorUtil.select(selector); // 'wakenUp.compareAndSet(false, true)' is always evaluated @@ -790,6 +792,12 @@ class NioWorker implements Runnable { } fireChannelConnected(channel, remoteAddress); } + + // Handle the channelConnected in the worker thread + if (channel instanceof NioAcceptedSocketChannel) { + fireChannelConnected(channel, channel.getRemoteAddress()); + + } } } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java index 70826d0300..d4f898ab8c 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/SocketSendBufferPool.java @@ -288,7 +288,8 @@ final class SocketSendBufferPool { } public void release() { - // Unpooled. + // Make sure the FileRegion resource are released otherwise it may cause a FD leak or something similar + file.releaseExternalResources(); } } diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioAcceptedSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioAcceptedSocketChannel.java index 0454b2f6a3..c8728df6db 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioAcceptedSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioAcceptedSocketChannel.java @@ -15,7 +15,8 @@ */ package org.jboss.netty.channel.socket.oio; -import static org.jboss.netty.channel.Channels.*; +import static org.jboss.netty.channel.Channels.fireChannelBound; +import static org.jboss.netty.channel.Channels.fireChannelOpen; import java.io.IOException; import java.io.OutputStream; @@ -60,10 +61,9 @@ class OioAcceptedSocketChannel extends OioSocketChannel { } catch (IOException e) { throw new ChannelException("Failed to obtain an OutputStream.", e); } - + fireChannelOpen(this); fireChannelBound(this, getLocalAddress()); - fireChannelConnected(this, getRemoteAddress()); } @Override diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java index 11ba4001d1..eaee7fbdb8 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioWorker.java @@ -20,12 +20,15 @@ import static org.jboss.netty.channel.Channels.*; import java.io.OutputStream; import java.io.PushbackInputStream; import java.net.SocketException; +import java.nio.channels.Channels; import java.nio.channels.ClosedChannelException; +import java.nio.channels.WritableByteChannel; import java.util.regex.Pattern; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.FileRegion; /** * @@ -50,7 +53,13 @@ class OioWorker implements Runnable { channel.workerThread = Thread.currentThread(); final PushbackInputStream in = channel.getInputStream(); + boolean fireConnected = channel instanceof OioAcceptedSocketChannel; + while (channel.isOpen()) { + if (fireConnected) { + fireConnected = false; + fireChannelConnected(channel, channel.getRemoteAddress()); + } synchronized (channel.interestOpsLock) { while (!channel.isReadable()) { try { @@ -113,13 +122,39 @@ class OioWorker implements Runnable { } try { - ChannelBuffer a = (ChannelBuffer) message; - int length = a.readableBytes(); - synchronized (out) { - a.getBytes(a.readerIndex(), out, length); + int length = 0; + + // Add support to write a FileRegion. This in fact will not give any performance gain but at least it not fail and + // we did the best to emulate it + if (message instanceof FileRegion) { + FileRegion fr = (FileRegion) message; + try { + synchronized (out) { + WritableByteChannel bchannel = Channels.newChannel(out); + + long i = 0; + while ((i = fr.transferTo(bchannel, length)) > 0) { + length += i; + if (length >= fr.getCount()) { + break; + } + } + } + } finally { + fr.releaseExternalResources(); + + } + } else { + ChannelBuffer a = (ChannelBuffer) message; + length = a.readableBytes(); + synchronized (out) { + a.getBytes(a.readerIndex(), out, length); + } } + fireWriteComplete(channel, length); future.setSuccess(); + } catch (Throwable t) { // Convert 'SocketException: Socket closed' to // ClosedChannelException. diff --git a/src/main/java/org/jboss/netty/handler/ssl/SslHandler.java b/src/main/java/org/jboss/netty/handler/ssl/SslHandler.java index 940953248e..c22bded121 100644 --- a/src/main/java/org/jboss/netty/handler/ssl/SslHandler.java +++ b/src/main/java/org/jboss/netty/handler/ssl/SslHandler.java @@ -370,7 +370,7 @@ public class SslHandler extends FrameDecoder * @deprecated Use {@link #handshake()} instead. */ @Deprecated - public ChannelFuture handshake(@SuppressWarnings("unused") Channel channel) { + public ChannelFuture handshake(Channel channel) { return handshake(); } @@ -394,7 +394,7 @@ public class SslHandler extends FrameDecoder * @deprecated Use {@link #close()} instead. */ @Deprecated - public ChannelFuture close(@SuppressWarnings("unused") Channel channel) { + public ChannelFuture close(Channel channel) { return close(); } diff --git a/src/main/java/org/jboss/netty/handler/timeout/WriteTimeoutHandler.java b/src/main/java/org/jboss/netty/handler/timeout/WriteTimeoutHandler.java index 3fe62cf319..63c15f904d 100644 --- a/src/main/java/org/jboss/netty/handler/timeout/WriteTimeoutHandler.java +++ b/src/main/java/org/jboss/netty/handler/timeout/WriteTimeoutHandler.java @@ -135,7 +135,7 @@ public class WriteTimeoutHandler extends SimpleChannelDownstreamHandler timer.stop(); } - protected long getTimeoutMillis(@SuppressWarnings("unused") MessageEvent e) { + protected long getTimeoutMillis(MessageEvent e) { return timeoutMillis; }