Merge pull request #30 from normanmaurer/3.2
Move channelConnected handling to worker thread + javadocs
This commit is contained in:
commit
ce1643c187
@ -71,6 +71,7 @@ import org.jboss.netty.channel.socket.ServerSocketChannel;
|
|||||||
* <td>{@code "channelOpen"}</td>
|
* <td>{@code "channelOpen"}</td>
|
||||||
* <td>{@link ChannelStateEvent}<br/>(state = {@link ChannelState#OPEN OPEN}, value = {@code true})</td>
|
* <td>{@link ChannelStateEvent}<br/>(state = {@link ChannelState#OPEN OPEN}, value = {@code true})</td>
|
||||||
* <td>a {@link Channel} is open, but not bound nor connected</td>
|
* <td>a {@link Channel} is open, but not bound nor connected</td>
|
||||||
|
* <td><strong>Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers!</strong></td>
|
||||||
* </tr>
|
* </tr>
|
||||||
* <tr>
|
* <tr>
|
||||||
* <td>{@code "channelClosed"}</td>
|
* <td>{@code "channelClosed"}</td>
|
||||||
@ -80,7 +81,8 @@ import org.jboss.netty.channel.socket.ServerSocketChannel;
|
|||||||
* <tr>
|
* <tr>
|
||||||
* <td>{@code "channelBound"}</td>
|
* <td>{@code "channelBound"}</td>
|
||||||
* <td>{@link ChannelStateEvent}<br/>(state = {@link ChannelState#BOUND BOUND}, value = {@link SocketAddress})</td>
|
* <td>{@link ChannelStateEvent}<br/>(state = {@link ChannelState#BOUND BOUND}, value = {@link SocketAddress})</td>
|
||||||
* <td>a {@link Channel} is open and bound to a local address, but not connected</td>
|
* <td>a {@link Channel} is open and bound to a local address, but not connected.</td>
|
||||||
|
* <td><strong>Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers!</strong></td>
|
||||||
* </tr>
|
* </tr>
|
||||||
* <tr>
|
* <tr>
|
||||||
* <td>{@code "channelUnbound"}</td>
|
* <td>{@code "channelUnbound"}</td>
|
||||||
@ -91,6 +93,7 @@ import org.jboss.netty.channel.socket.ServerSocketChannel;
|
|||||||
* <td>{@code "channelConnected"}</td>
|
* <td>{@code "channelConnected"}</td>
|
||||||
* <td>{@link ChannelStateEvent}<br/>(state = {@link ChannelState#CONNECTED CONNECTED}, value = {@link SocketAddress})</td>
|
* <td>{@link ChannelStateEvent}<br/>(state = {@link ChannelState#CONNECTED CONNECTED}, value = {@link SocketAddress})</td>
|
||||||
* <td>a {@link Channel} is open, bound to a local address, and connected to a remote address</td>
|
* <td>a {@link Channel} is open, bound to a local address, and connected to a remote address</td>
|
||||||
|
* <td><strong>Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers!</strong></td>
|
||||||
* </tr>
|
* </tr>
|
||||||
* <tr>
|
* <tr>
|
||||||
* <td>{@code "writeComplete"}</td>
|
* <td>{@code "writeComplete"}</td>
|
||||||
|
@ -51,7 +51,7 @@ public class ChannelLocal<T> {
|
|||||||
* Returns the initial value of the variable. By default, it returns
|
* Returns the initial value of the variable. By default, it returns
|
||||||
* {@code null}. Override it to change the initial value.
|
* {@code null}. Override it to change the initial value.
|
||||||
*/
|
*/
|
||||||
protected T initialValue(@SuppressWarnings("unused") Channel channel) {
|
protected T initialValue(Channel channel) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,6 +150,9 @@ public class SimpleChannelUpstreamHandler implements ChannelUpstreamHandler {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Invoked when a {@link Channel} is open, but not bound nor connected.
|
* Invoked when a {@link Channel} is open, but not bound nor connected.
|
||||||
|
* <br/>
|
||||||
|
*
|
||||||
|
* <strong>Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers!</strong>
|
||||||
*/
|
*/
|
||||||
public void channelOpen(
|
public void channelOpen(
|
||||||
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
||||||
@ -159,6 +162,9 @@ public class SimpleChannelUpstreamHandler implements ChannelUpstreamHandler {
|
|||||||
/**
|
/**
|
||||||
* Invoked when a {@link Channel} is open and bound to a local address,
|
* Invoked when a {@link Channel} is open and bound to a local address,
|
||||||
* but not connected.
|
* but not connected.
|
||||||
|
* <br/>
|
||||||
|
*
|
||||||
|
* <strong>Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers!</strong>
|
||||||
*/
|
*/
|
||||||
public void channelBound(
|
public void channelBound(
|
||||||
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
||||||
@ -168,6 +174,9 @@ public class SimpleChannelUpstreamHandler implements ChannelUpstreamHandler {
|
|||||||
/**
|
/**
|
||||||
* Invoked when a {@link Channel} is open, bound to a local address, and
|
* Invoked when a {@link Channel} is open, bound to a local address, and
|
||||||
* connected to a remote address.
|
* connected to a remote address.
|
||||||
|
* <br/>
|
||||||
|
*
|
||||||
|
* <strong>Be aware that this event is fired from within the Boss-Thread so you should not execute any heavy operation in there as it will block the dispatching to other workers!</strong>
|
||||||
*/
|
*/
|
||||||
public void channelConnected(
|
public void channelConnected(
|
||||||
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
||||||
|
@ -15,7 +15,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.jboss.netty.channel.socket.nio;
|
package org.jboss.netty.channel.socket.nio;
|
||||||
|
|
||||||
import static org.jboss.netty.channel.Channels.*;
|
import static org.jboss.netty.channel.Channels.fireChannelBound;
|
||||||
|
import static org.jboss.netty.channel.Channels.fireChannelOpen;
|
||||||
|
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
|
|
||||||
@ -46,8 +47,8 @@ final class NioAcceptedSocketChannel extends NioSocketChannel {
|
|||||||
this.bossThread = bossThread;
|
this.bossThread = bossThread;
|
||||||
|
|
||||||
setConnected();
|
setConnected();
|
||||||
|
|
||||||
fireChannelOpen(this);
|
fireChannelOpen(this);
|
||||||
fireChannelBound(this, getLocalAddress());
|
fireChannelBound(this, getLocalAddress());
|
||||||
fireChannelConnected(this, getRemoteAddress());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,6 @@ import org.jboss.netty.channel.AbstractChannel;
|
|||||||
import org.jboss.netty.channel.Channel;
|
import org.jboss.netty.channel.Channel;
|
||||||
import org.jboss.netty.channel.ChannelFactory;
|
import org.jboss.netty.channel.ChannelFactory;
|
||||||
import org.jboss.netty.channel.ChannelFuture;
|
import org.jboss.netty.channel.ChannelFuture;
|
||||||
import org.jboss.netty.channel.ChannelFutureListener;
|
|
||||||
import org.jboss.netty.channel.ChannelPipeline;
|
import org.jboss.netty.channel.ChannelPipeline;
|
||||||
import org.jboss.netty.channel.ChannelSink;
|
import org.jboss.netty.channel.ChannelSink;
|
||||||
import org.jboss.netty.channel.MessageEvent;
|
import org.jboss.netty.channel.MessageEvent;
|
||||||
|
@ -82,7 +82,7 @@ class NioWorker implements Runnable {
|
|||||||
|
|
||||||
private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool();
|
private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool();
|
||||||
private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
|
private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
|
||||||
|
|
||||||
NioWorker(int bossId, int id, Executor executor) {
|
NioWorker(int bossId, int id, Executor executor) {
|
||||||
this.bossId = bossId;
|
this.bossId = bossId;
|
||||||
this.id = id;
|
this.id = id;
|
||||||
@ -96,6 +96,7 @@ class NioWorker implements Runnable {
|
|||||||
Selector selector;
|
Selector selector;
|
||||||
|
|
||||||
synchronized (startStopLock) {
|
synchronized (startStopLock) {
|
||||||
|
|
||||||
if (!started) {
|
if (!started) {
|
||||||
// Open a selector if this worker didn't start yet.
|
// Open a selector if this worker didn't start yet.
|
||||||
try {
|
try {
|
||||||
@ -160,6 +161,7 @@ class NioWorker implements Runnable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
SelectorUtil.select(selector);
|
SelectorUtil.select(selector);
|
||||||
|
|
||||||
// 'wakenUp.compareAndSet(false, true)' is always evaluated
|
// 'wakenUp.compareAndSet(false, true)' is always evaluated
|
||||||
@ -790,6 +792,12 @@ class NioWorker implements Runnable {
|
|||||||
}
|
}
|
||||||
fireChannelConnected(channel, remoteAddress);
|
fireChannelConnected(channel, remoteAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle the channelConnected in the worker thread
|
||||||
|
if (channel instanceof NioAcceptedSocketChannel) {
|
||||||
|
fireChannelConnected(channel, channel.getRemoteAddress());
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -288,7 +288,8 @@ final class SocketSendBufferPool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void release() {
|
public void release() {
|
||||||
// Unpooled.
|
// Make sure the FileRegion resource are released otherwise it may cause a FD leak or something similar
|
||||||
|
file.releaseExternalResources();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -15,7 +15,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.jboss.netty.channel.socket.oio;
|
package org.jboss.netty.channel.socket.oio;
|
||||||
|
|
||||||
import static org.jboss.netty.channel.Channels.*;
|
import static org.jboss.netty.channel.Channels.fireChannelBound;
|
||||||
|
import static org.jboss.netty.channel.Channels.fireChannelOpen;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
@ -60,10 +61,9 @@ class OioAcceptedSocketChannel extends OioSocketChannel {
|
|||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ChannelException("Failed to obtain an OutputStream.", e);
|
throw new ChannelException("Failed to obtain an OutputStream.", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
fireChannelOpen(this);
|
fireChannelOpen(this);
|
||||||
fireChannelBound(this, getLocalAddress());
|
fireChannelBound(this, getLocalAddress());
|
||||||
fireChannelConnected(this, getRemoteAddress());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -20,12 +20,15 @@ import static org.jboss.netty.channel.Channels.*;
|
|||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.PushbackInputStream;
|
import java.io.PushbackInputStream;
|
||||||
import java.net.SocketException;
|
import java.net.SocketException;
|
||||||
|
import java.nio.channels.Channels;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
|
import java.nio.channels.WritableByteChannel;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
import org.jboss.netty.channel.Channel;
|
import org.jboss.netty.channel.Channel;
|
||||||
import org.jboss.netty.channel.ChannelFuture;
|
import org.jboss.netty.channel.ChannelFuture;
|
||||||
|
import org.jboss.netty.channel.FileRegion;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
@ -50,7 +53,13 @@ class OioWorker implements Runnable {
|
|||||||
channel.workerThread = Thread.currentThread();
|
channel.workerThread = Thread.currentThread();
|
||||||
final PushbackInputStream in = channel.getInputStream();
|
final PushbackInputStream in = channel.getInputStream();
|
||||||
|
|
||||||
|
boolean fireConnected = channel instanceof OioAcceptedSocketChannel;
|
||||||
|
|
||||||
while (channel.isOpen()) {
|
while (channel.isOpen()) {
|
||||||
|
if (fireConnected) {
|
||||||
|
fireConnected = false;
|
||||||
|
fireChannelConnected(channel, channel.getRemoteAddress());
|
||||||
|
}
|
||||||
synchronized (channel.interestOpsLock) {
|
synchronized (channel.interestOpsLock) {
|
||||||
while (!channel.isReadable()) {
|
while (!channel.isReadable()) {
|
||||||
try {
|
try {
|
||||||
@ -113,13 +122,39 @@ class OioWorker implements Runnable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ChannelBuffer a = (ChannelBuffer) message;
|
int length = 0;
|
||||||
int length = a.readableBytes();
|
|
||||||
synchronized (out) {
|
// Add support to write a FileRegion. This in fact will not give any performance gain but at least it not fail and
|
||||||
a.getBytes(a.readerIndex(), out, length);
|
// we did the best to emulate it
|
||||||
|
if (message instanceof FileRegion) {
|
||||||
|
FileRegion fr = (FileRegion) message;
|
||||||
|
try {
|
||||||
|
synchronized (out) {
|
||||||
|
WritableByteChannel bchannel = Channels.newChannel(out);
|
||||||
|
|
||||||
|
long i = 0;
|
||||||
|
while ((i = fr.transferTo(bchannel, length)) > 0) {
|
||||||
|
length += i;
|
||||||
|
if (length >= fr.getCount()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
fr.releaseExternalResources();
|
||||||
|
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ChannelBuffer a = (ChannelBuffer) message;
|
||||||
|
length = a.readableBytes();
|
||||||
|
synchronized (out) {
|
||||||
|
a.getBytes(a.readerIndex(), out, length);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fireWriteComplete(channel, length);
|
fireWriteComplete(channel, length);
|
||||||
future.setSuccess();
|
future.setSuccess();
|
||||||
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
// Convert 'SocketException: Socket closed' to
|
// Convert 'SocketException: Socket closed' to
|
||||||
// ClosedChannelException.
|
// ClosedChannelException.
|
||||||
|
@ -370,7 +370,7 @@ public class SslHandler extends FrameDecoder
|
|||||||
* @deprecated Use {@link #handshake()} instead.
|
* @deprecated Use {@link #handshake()} instead.
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public ChannelFuture handshake(@SuppressWarnings("unused") Channel channel) {
|
public ChannelFuture handshake(Channel channel) {
|
||||||
return handshake();
|
return handshake();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -394,7 +394,7 @@ public class SslHandler extends FrameDecoder
|
|||||||
* @deprecated Use {@link #close()} instead.
|
* @deprecated Use {@link #close()} instead.
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public ChannelFuture close(@SuppressWarnings("unused") Channel channel) {
|
public ChannelFuture close(Channel channel) {
|
||||||
return close();
|
return close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,7 +135,7 @@ public class WriteTimeoutHandler extends SimpleChannelDownstreamHandler
|
|||||||
timer.stop();
|
timer.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected long getTimeoutMillis(@SuppressWarnings("unused") MessageEvent e) {
|
protected long getTimeoutMillis(MessageEvent e) {
|
||||||
return timeoutMillis;
|
return timeoutMillis;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user