Merge pull request #32 from normanmaurer/master

Push fixes from 3.2 branch to master
This commit is contained in:
Norman Maurer 2011-10-21 09:18:42 -07:00
commit 8fb34a4717
7 changed files with 52 additions and 8 deletions

View File

@ -71,6 +71,7 @@ import org.jboss.netty.channel.socket.ServerSocketChannel;
* <td>{@code "channelOpen"}</td> * <td>{@code "channelOpen"}</td>
* <td>{@link ChannelStateEvent}<br/>(state = {@link ChannelState#OPEN OPEN}, value = {@code true})</td> * <td>{@link ChannelStateEvent}<br/>(state = {@link ChannelState#OPEN OPEN}, value = {@code true})</td>
* <td>a {@link Channel} is open, but not bound nor connected</td> * <td>a {@link Channel} is open, but not bound nor connected</td>
* <td><strong>Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers!</strong></td>
* </tr> * </tr>
* <tr> * <tr>
* <td>{@code "channelClosed"}</td> * <td>{@code "channelClosed"}</td>
@ -81,6 +82,7 @@ import org.jboss.netty.channel.socket.ServerSocketChannel;
* <td>{@code "channelBound"}</td> * <td>{@code "channelBound"}</td>
* <td>{@link ChannelStateEvent}<br/>(state = {@link ChannelState#BOUND BOUND}, value = {@link SocketAddress})</td> * <td>{@link ChannelStateEvent}<br/>(state = {@link ChannelState#BOUND BOUND}, value = {@link SocketAddress})</td>
* <td>a {@link Channel} is open and bound to a local address, but not connected</td> * <td>a {@link Channel} is open and bound to a local address, but not connected</td>
* <td><strong>Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers!</strong></td>
* </tr> * </tr>
* <tr> * <tr>
* <td>{@code "channelUnbound"}</td> * <td>{@code "channelUnbound"}</td>

View File

@ -151,7 +151,9 @@ public class SimpleChannelUpstreamHandler implements ChannelUpstreamHandler {
/** /**
* Invoked when a {@link Channel} is open, but not bound nor connected. * Invoked when a {@link Channel} is open, but not bound nor connected.
*/ * <br/>
* <strong>Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers!</strong>
*/
public void channelOpen( public void channelOpen(
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
ctx.sendUpstream(e); ctx.sendUpstream(e);
@ -160,6 +162,8 @@ public class SimpleChannelUpstreamHandler implements ChannelUpstreamHandler {
/** /**
* Invoked when a {@link Channel} is open and bound to a local address, * Invoked when a {@link Channel} is open and bound to a local address,
* but not connected. * but not connected.
* <br/>
* <strong>Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers!</strong>
*/ */
public void channelBound( public void channelBound(
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

View File

@ -48,6 +48,5 @@ final class NioAcceptedSocketChannel extends NioSocketChannel {
setConnected(); setConnected();
fireChannelOpen(this); fireChannelOpen(this);
fireChannelBound(this, getLocalAddress()); fireChannelBound(this, getLocalAddress());
fireChannelConnected(this, getRemoteAddress());
} }
} }

View File

@ -782,6 +782,11 @@ class NioWorker implements Runnable {
} }
fireChannelConnected(channel, remoteAddress); fireChannelConnected(channel, remoteAddress);
} }
// Handle the channelConnected in the worker thread
if (channel instanceof NioAcceptedSocketChannel) {
fireChannelConnected(channel, channel.getRemoteAddress());
}
} }
} }
} }

View File

@ -306,7 +306,8 @@ final class SocketSendBufferPool {
@Override @Override
public void release() { public void release() {
// Unpooled. // Make sure the FileRegion resource are released otherwise it may cause a FD leak or something similar
file.releaseExternalResources();
} }
} }

View File

@ -63,7 +63,6 @@ class OioAcceptedSocketChannel extends OioSocketChannel {
fireChannelOpen(this); fireChannelOpen(this);
fireChannelBound(this, getLocalAddress()); fireChannelBound(this, getLocalAddress());
fireChannelConnected(this, getRemoteAddress());
} }
@Override @Override

View File

@ -20,12 +20,15 @@ import static org.jboss.netty.channel.Channels.*;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PushbackInputStream; import java.io.PushbackInputStream;
import java.net.SocketException; import java.net.SocketException;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.WritableByteChannel;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.FileRegion;
/** /**
* *
@ -50,8 +53,13 @@ class OioWorker implements Runnable {
public void run() { public void run() {
channel.workerThread = Thread.currentThread(); channel.workerThread = Thread.currentThread();
final PushbackInputStream in = channel.getInputStream(); final PushbackInputStream in = channel.getInputStream();
boolean fireOpen = channel instanceof OioAcceptedSocketChannel;
while (channel.isOpen()) { while (channel.isOpen()) {
if (fireOpen) {
fireOpen = false;
fireChannelConnected(channel, channel.getRemoteAddress());
}
synchronized (channel.interestOpsLock) { synchronized (channel.interestOpsLock) {
while (!channel.isReadable()) { while (!channel.isReadable()) {
try { try {
@ -114,13 +122,39 @@ class OioWorker implements Runnable {
} }
try { try {
ChannelBuffer a = (ChannelBuffer) message; int length = 0;
int length = a.readableBytes();
synchronized (out) { // Add support to write a FileRegion. This in fact will not give any performance gain but at least it not fail and
a.getBytes(a.readerIndex(), out, length); // we did the best to emulate it
if (message instanceof FileRegion) {
FileRegion fr = (FileRegion) message;
try {
synchronized (out) {
WritableByteChannel bchannel = Channels.newChannel(out);
long i = 0;
while ((i = fr.transferTo(bchannel, length)) > 0) {
length += i;
if (length >= fr.getCount()) {
break;
}
}
}
} finally {
fr.releaseExternalResources();
}
} else {
ChannelBuffer a = (ChannelBuffer) message;
length = a.readableBytes();
synchronized (out) {
a.getBytes(a.readerIndex(), out, length);
}
} }
fireWriteComplete(channel, length); fireWriteComplete(channel, length);
future.setSuccess(); future.setSuccess();
} catch (Throwable t) { } catch (Throwable t) {
// Convert 'SocketException: Socket closed' to // Convert 'SocketException: Socket closed' to
// ClosedChannelException. // ClosedChannelException.