Merge pull request #37 from normanmaurer/master

Safe construction via factory method
This commit is contained in:
Norman Maurer 2011-11-03 07:36:20 -07:00
commit 220d95fe0d
26 changed files with 154 additions and 62 deletions

View File

@ -59,7 +59,16 @@ final class DefaultLocalChannel extends AbstractChannel implements LocalChannel
volatile LocalAddress localAddress; volatile LocalAddress localAddress;
volatile LocalAddress remoteAddress; volatile LocalAddress remoteAddress;
DefaultLocalChannel(LocalServerChannel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, DefaultLocalChannel pairedChannel) { static DefaultLocalChannel create(LocalServerChannel parent,
ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink,
DefaultLocalChannel pairedChannel) {
DefaultLocalChannel instance = new DefaultLocalChannel(parent, factory, pipeline, sink,
pairedChannel);
fireChannelOpen(instance);
return instance;
}
private DefaultLocalChannel(LocalServerChannel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, DefaultLocalChannel pairedChannel) {
super(parent, factory, pipeline, sink); super(parent, factory, pipeline, sink);
this.pairedChannel = pairedChannel; this.pairedChannel = pairedChannel;
config = new DefaultChannelConfig(); config = new DefaultChannelConfig();
@ -73,7 +82,6 @@ final class DefaultLocalChannel extends AbstractChannel implements LocalChannel
} }
}); });
fireChannelOpen(this);
} }
@Override @Override

View File

@ -41,7 +41,7 @@ public class DefaultLocalClientChannelFactory implements LocalClientChannelFacto
@Override @Override
public LocalChannel newChannel(ChannelPipeline pipeline) { public LocalChannel newChannel(ChannelPipeline pipeline) {
return new DefaultLocalChannel(null, this, pipeline, sink, null); return DefaultLocalChannel.create(null, this, pipeline, sink, null);
} }
/** /**

View File

@ -32,17 +32,27 @@ import org.jboss.netty.channel.DefaultServerChannelConfig;
* @author <a href="http://gleamynode.net/">Trustin Lee</a> * @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*/ */
final class DefaultLocalServerChannel extends AbstractServerChannel final class DefaultLocalServerChannel extends AbstractServerChannel implements
implements LocalServerChannel { LocalServerChannel {
final ChannelConfig channelConfig; final ChannelConfig channelConfig;
final AtomicBoolean bound = new AtomicBoolean(); final AtomicBoolean bound = new AtomicBoolean();
volatile LocalAddress localAddress; volatile LocalAddress localAddress;
DefaultLocalServerChannel(ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink) { static DefaultLocalServerChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
DefaultLocalServerChannel instance =
new DefaultLocalServerChannel(factory, pipeline, sink);
fireChannelOpen(instance);
return instance;
}
private DefaultLocalServerChannel(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
super(factory, pipeline, sink); super(factory, pipeline, sink);
channelConfig = new DefaultServerChannelConfig(); channelConfig = new DefaultServerChannelConfig();
fireChannelOpen(this);
} }
@Override @Override
@ -57,7 +67,7 @@ final class DefaultLocalServerChannel extends AbstractServerChannel
@Override @Override
public LocalAddress getLocalAddress() { public LocalAddress getLocalAddress() {
return isBound()? localAddress : null; return isBound() ? localAddress : null;
} }
@Override @Override

View File

@ -41,7 +41,7 @@ public class DefaultLocalServerChannelFactory implements LocalServerChannelFacto
@Override @Override
public LocalServerChannel newChannel(ChannelPipeline pipeline) { public LocalServerChannel newChannel(ChannelPipeline pipeline) {
return new DefaultLocalServerChannel(this, pipeline, sink); return DefaultLocalServerChannel.create(this, pipeline, sink);
} }
/** /**

View File

@ -128,8 +128,7 @@ final class LocalClientChannelSink extends AbstractChannelSink {
} }
future.setSuccess(); future.setSuccess();
DefaultLocalChannel acceptedChannel = new DefaultLocalChannel( DefaultLocalChannel acceptedChannel = DefaultLocalChannel.create(serverChannel, serverChannel.getFactory(), pipeline, this, channel);
serverChannel, serverChannel.getFactory(), pipeline, this, channel);
channel.pairedChannel = acceptedChannel; channel.pairedChannel = acceptedChannel;
bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL)); bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL));

View File

@ -42,13 +42,27 @@ import org.jboss.netty.channel.socket.SocketChannelConfig;
*/ */
class HttpTunnelAcceptedChannel extends AbstractChannel implements class HttpTunnelAcceptedChannel extends AbstractChannel implements
SocketChannel, HttpTunnelAcceptedChannelReceiver { SocketChannel, HttpTunnelAcceptedChannelReceiver {
private final HttpTunnelAcceptedChannelConfig config; private final HttpTunnelAcceptedChannelConfig config;
private final HttpTunnelAcceptedChannelSink sink; private final HttpTunnelAcceptedChannelSink sink;
private final InetSocketAddress remoteAddress; private final InetSocketAddress remoteAddress;
protected HttpTunnelAcceptedChannel(HttpTunnelServerChannel parent, protected static HttpTunnelAcceptedChannel create(
HttpTunnelServerChannel parent, ChannelFactory factory,
ChannelPipeline pipeline, HttpTunnelAcceptedChannelSink sink,
InetSocketAddress remoteAddress,
HttpTunnelAcceptedChannelConfig config) {
HttpTunnelAcceptedChannel instance = new HttpTunnelAcceptedChannel(parent, factory, pipeline, sink,
remoteAddress, config);
fireChannelOpen(instance);
fireChannelBound(instance, instance.getLocalAddress());
fireChannelConnected(instance, instance.getRemoteAddress());
return instance;
}
private HttpTunnelAcceptedChannel(HttpTunnelServerChannel parent,
ChannelFactory factory, ChannelPipeline pipeline, ChannelFactory factory, ChannelPipeline pipeline,
HttpTunnelAcceptedChannelSink sink, HttpTunnelAcceptedChannelSink sink,
InetSocketAddress remoteAddress, InetSocketAddress remoteAddress,
@ -57,9 +71,6 @@ class HttpTunnelAcceptedChannel extends AbstractChannel implements
this.config = config; this.config = config;
this.sink = sink; this.sink = sink;
this.remoteAddress = remoteAddress; this.remoteAddress = remoteAddress;
fireChannelOpen(this);
fireChannelBound(this, getLocalAddress());
fireChannelConnected(this, getRemoteAddress());
} }
@Override @Override

View File

@ -73,10 +73,20 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
private final SaturationManager saturationManager; private final SaturationManager saturationManager;
protected static HttpTunnelClientChannel create(ChannelFactory factory,
ChannelPipeline pipeline, HttpTunnelClientChannelSink sink,
ClientSocketChannelFactory outboundFactory,
ChannelGroup realConnections) {
HttpTunnelClientChannel instance = new HttpTunnelClientChannel(factory, pipeline, sink,
outboundFactory, realConnections);
Channels.fireChannelOpen(instance);
return instance;
}
/** /**
* @see HttpTunnelClientChannelFactory#newChannel(ChannelPipeline) * @see HttpTunnelClientChannelFactory#newChannel(ChannelPipeline)
*/ */
protected HttpTunnelClientChannel(ChannelFactory factory, private HttpTunnelClientChannel(ChannelFactory factory,
ChannelPipeline pipeline, HttpTunnelClientChannelSink sink, ChannelPipeline pipeline, HttpTunnelClientChannelSink sink,
ClientSocketChannelFactory outboundFactory, ClientSocketChannelFactory outboundFactory,
ChannelGroup realConnections) { ChannelGroup realConnections) {
@ -97,7 +107,6 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
realConnections.add(sendChannel); realConnections.add(sendChannel);
realConnections.add(pollChannel); realConnections.add(pollChannel);
Channels.fireChannelOpen(this);
} }
@Override @Override

View File

@ -43,8 +43,8 @@ public class HttpTunnelClientChannelFactory implements
@Override @Override
public HttpTunnelClientChannel newChannel(ChannelPipeline pipeline) { public HttpTunnelClientChannel newChannel(ChannelPipeline pipeline) {
return new HttpTunnelClientChannel(this, pipeline, return HttpTunnelClientChannel.create(this, pipeline, new HttpTunnelClientChannelSink(), factory,
new HttpTunnelClientChannelSink(), factory, realConnections); realConnections);
} }
@Override @Override

View File

@ -49,18 +49,25 @@ public class HttpTunnelServerChannel extends AbstractServerChannel implements
} }
}; };
protected HttpTunnelServerChannel(HttpTunnelServerChannelFactory factory, protected static HttpTunnelServerChannel create(
HttpTunnelServerChannelFactory factory, ChannelPipeline pipeline) {
HttpTunnelServerChannel instance = new HttpTunnelServerChannel(factory, pipeline);
Channels.fireChannelOpen(instance);
return instance;
}
private HttpTunnelServerChannel(HttpTunnelServerChannelFactory factory,
ChannelPipeline pipeline) { ChannelPipeline pipeline) {
super(factory, pipeline, new HttpTunnelServerChannelSink()); super(factory, pipeline, new HttpTunnelServerChannelSink());
messageSwitch = new ServerMessageSwitch(new TunnelCreator()); messageSwitch = new ServerMessageSwitch(new TunnelCreator());
realChannel = factory.createRealChannel(this, messageSwitch); realChannel = factory.createRealChannel(this, messageSwitch);
// TODO fix calling of overrideable getPipeline() from constructor
HttpTunnelServerChannelSink sink = HttpTunnelServerChannelSink sink =
(HttpTunnelServerChannelSink) getPipeline().getSink(); (HttpTunnelServerChannelSink) getPipeline().getSink();
sink.setRealChannel(realChannel); sink.setRealChannel(realChannel);
sink.setCloseListener(CLOSE_FUTURE_PROXY); sink.setCloseListener(CLOSE_FUTURE_PROXY);
config = new HttpTunnelServerChannelConfig(realChannel); config = new HttpTunnelServerChannelConfig(realChannel);
Channels.fireChannelOpen(this);
} }
@Override @Override
@ -114,8 +121,8 @@ public class HttpTunnelServerChannel extends AbstractServerChannel implements
HttpTunnelAcceptedChannelSink sink = HttpTunnelAcceptedChannelSink sink =
new HttpTunnelAcceptedChannelSink(messageSwitch, new HttpTunnelAcceptedChannelSink(messageSwitch,
newTunnelId, config); newTunnelId, config);
return new HttpTunnelAcceptedChannel(HttpTunnelServerChannel.this, return HttpTunnelAcceptedChannel.create(HttpTunnelServerChannel.this, getFactory(), childPipeline, sink,
getFactory(), childPipeline, sink, remoteAddress, config); remoteAddress, config);
} }
@Override @Override

View File

@ -42,7 +42,7 @@ public class HttpTunnelServerChannelFactory implements
@Override @Override
public HttpTunnelServerChannel newChannel(ChannelPipeline pipeline) { public HttpTunnelServerChannel newChannel(ChannelPipeline pipeline) {
return new HttpTunnelServerChannel(this, pipeline); return HttpTunnelServerChannel.create(this, pipeline);
} }
ServerSocketChannel createRealChannel(HttpTunnelServerChannel channel, ServerSocketChannel createRealChannel(HttpTunnelServerChannel channel,

View File

@ -36,7 +36,17 @@ final class NioAcceptedSocketChannel extends NioSocketChannel {
final Thread bossThread; final Thread bossThread;
NioAcceptedSocketChannel( static NioAcceptedSocketChannel create(ChannelFactory factory,
ChannelPipeline pipeline, Channel parent, ChannelSink sink,
SocketChannel socket, NioWorker worker, Thread bossThread) {
NioAcceptedSocketChannel instance = new NioAcceptedSocketChannel(
factory, pipeline, parent, sink, socket, worker, bossThread);
instance.setConnected();
fireChannelOpen(instance);
return instance;
}
private NioAcceptedSocketChannel(
ChannelFactory factory, ChannelPipeline pipeline, ChannelFactory factory, ChannelPipeline pipeline,
Channel parent, ChannelSink sink, Channel parent, ChannelSink sink,
SocketChannel socket, NioWorker worker, Thread bossThread) { SocketChannel socket, NioWorker worker, Thread bossThread) {
@ -44,8 +54,5 @@ final class NioAcceptedSocketChannel extends NioSocketChannel {
super(parent, factory, pipeline, sink, socket, worker); super(parent, factory, pipeline, sink, socket, worker);
this.bossThread = bossThread; this.bossThread = bossThread;
setConnected();
fireChannelOpen(this);
} }
} }

View File

@ -76,11 +76,18 @@ final class NioClientSocketChannel extends NioSocketChannel {
// Does not need to be volatile as it's accessed by only one thread. // Does not need to be volatile as it's accessed by only one thread.
long connectDeadlineNanos; long connectDeadlineNanos;
NioClientSocketChannel( static NioClientSocketChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, NioWorker worker) {
NioClientSocketChannel instance =
new NioClientSocketChannel(factory, pipeline, sink, worker);
fireChannelOpen(instance);
return instance;
}
private NioClientSocketChannel(
ChannelFactory factory, ChannelPipeline pipeline, ChannelFactory factory, ChannelPipeline pipeline,
ChannelSink sink, NioWorker worker) { ChannelSink sink, NioWorker worker) {
super(null, factory, pipeline, sink, newSocket(), worker); super(null, factory, pipeline, sink, newSocket(), worker);
fireChannelOpen(this);
} }
} }

View File

@ -137,7 +137,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
@Override @Override
public SocketChannel newChannel(ChannelPipeline pipeline) { public SocketChannel newChannel(ChannelPipeline pipeline) {
return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker()); return NioClientSocketChannel.create(this, pipeline, sink, sink.nextWorker());
} }
@Override @Override

View File

@ -119,15 +119,21 @@ class NioDatagramChannel extends AbstractChannel
private volatile InetSocketAddress localAddress; private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress; volatile InetSocketAddress remoteAddress;
NioDatagramChannel(final ChannelFactory factory, static NioDatagramChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink, NioDatagramWorker worker) {
NioDatagramChannel instance =
new NioDatagramChannel(factory, pipeline, sink, worker);
fireChannelOpen(instance);
return instance;
}
private NioDatagramChannel(final ChannelFactory factory,
final ChannelPipeline pipeline, final ChannelSink sink, final ChannelPipeline pipeline, final ChannelSink sink,
final NioDatagramWorker worker) { final NioDatagramWorker worker) {
super(null, factory, pipeline, sink); super(null, factory, pipeline, sink);
this.worker = worker; this.worker = worker;
datagramChannel = openNonBlockingChannel(); datagramChannel = openNonBlockingChannel();
config = new DefaultNioDatagramChannelConfig(datagramChannel.socket()); config = new DefaultNioDatagramChannelConfig(datagramChannel.socket());
fireChannelOpen(this);
} }
private DatagramChannel openNonBlockingChannel() { private DatagramChannel openNonBlockingChannel() {

View File

@ -125,7 +125,7 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory {
@Override @Override
public DatagramChannel newChannel(final ChannelPipeline pipeline) { public DatagramChannel newChannel(final ChannelPipeline pipeline) {
return new NioDatagramChannel(this, pipeline, sink, sink.nextWorker()); return NioDatagramChannel.create(this, pipeline, sink, sink.nextWorker());
} }
@Override @Override

View File

@ -53,7 +53,15 @@ class NioServerSocketChannel extends AbstractServerChannel
volatile Selector selector; volatile Selector selector;
private final ServerSocketChannelConfig config; private final ServerSocketChannelConfig config;
NioServerSocketChannel( static NioServerSocketChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
NioServerSocketChannel instance =
new NioServerSocketChannel(factory, pipeline, sink);
fireChannelOpen(instance);
return instance;
}
private NioServerSocketChannel(
ChannelFactory factory, ChannelFactory factory,
ChannelPipeline pipeline, ChannelPipeline pipeline,
ChannelSink sink) { ChannelSink sink) {
@ -81,8 +89,6 @@ class NioServerSocketChannel extends AbstractServerChannel
} }
config = new DefaultServerSocketChannelConfig(socket.socket()); config = new DefaultServerSocketChannelConfig(socket.socket());
fireChannelOpen(this);
} }
@Override @Override

View File

@ -140,7 +140,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
@Override @Override
public ServerSocketChannel newChannel(ChannelPipeline pipeline) { public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return new NioServerSocketChannel(this, pipeline, sink); return NioServerSocketChannel.create(this, pipeline, sink);
} }
@Override @Override

View File

@ -271,10 +271,8 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
ChannelPipeline pipeline = ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline(); channel.getConfig().getPipelineFactory().getPipeline();
NioWorker worker = nextWorker(); NioWorker worker = nextWorker();
worker.register(new NioAcceptedSocketChannel( worker.register(NioAcceptedSocketChannel.create(channel.getFactory(), pipeline, channel,
channel.getFactory(), pipeline, channel, NioServerSocketPipelineSink.this, acceptedSocket, worker, currentThread), null);
NioServerSocketPipelineSink.this, acceptedSocket,
worker, currentThread), null);
} catch (Exception e) { } catch (Exception e) {
logger.warn( logger.warn(
"Failed to initialize an accepted socket.", e); "Failed to initialize an accepted socket.", e);

View File

@ -41,7 +41,17 @@ class OioAcceptedSocketChannel extends OioSocketChannel {
private final PushbackInputStream in; private final PushbackInputStream in;
private final OutputStream out; private final OutputStream out;
OioAcceptedSocketChannel( static OioAcceptedSocketChannel create(Channel parent,
ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink,
Socket socket) {
OioAcceptedSocketChannel instance = new OioAcceptedSocketChannel(
parent, factory, pipeline, sink, socket);
fireChannelOpen(instance);
fireChannelBound(instance, instance.getLocalAddress());
return instance;
}
private OioAcceptedSocketChannel(
Channel parent, Channel parent,
ChannelFactory factory, ChannelFactory factory,
ChannelPipeline pipeline, ChannelPipeline pipeline,

View File

@ -38,14 +38,20 @@ class OioClientSocketChannel extends OioSocketChannel {
volatile PushbackInputStream in; volatile PushbackInputStream in;
volatile OutputStream out; volatile OutputStream out;
OioClientSocketChannel( static OioClientSocketChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
OioClientSocketChannel instance =
new OioClientSocketChannel(factory, pipeline, sink);
fireChannelOpen(instance);
return instance;
}
private OioClientSocketChannel(
ChannelFactory factory, ChannelFactory factory,
ChannelPipeline pipeline, ChannelPipeline pipeline,
ChannelSink sink) { ChannelSink sink) {
super(null, factory, pipeline, sink, new Socket()); super(null, factory, pipeline, sink, new Socket());
fireChannelOpen(this);
} }
@Override @Override

View File

@ -97,7 +97,7 @@ public class OioClientSocketChannelFactory implements ClientSocketChannelFactory
@Override @Override
public SocketChannel newChannel(ChannelPipeline pipeline) { public SocketChannel newChannel(ChannelPipeline pipeline) {
return new OioClientSocketChannel(this, pipeline, sink); return OioClientSocketChannel.create(this, pipeline, sink);
} }
@Override @Override

View File

@ -53,7 +53,15 @@ final class OioDatagramChannel extends AbstractChannel
private volatile InetSocketAddress localAddress; private volatile InetSocketAddress localAddress;
volatile InetSocketAddress remoteAddress; volatile InetSocketAddress remoteAddress;
OioDatagramChannel( static OioDatagramChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
OioDatagramChannel instance =
new OioDatagramChannel(factory, pipeline, sink);
fireChannelOpen(instance);
return instance;
}
private OioDatagramChannel(
ChannelFactory factory, ChannelFactory factory,
ChannelPipeline pipeline, ChannelPipeline pipeline,
ChannelSink sink) { ChannelSink sink) {
@ -74,8 +82,6 @@ final class OioDatagramChannel extends AbstractChannel
"Failed to configure the datagram socket timeout.", e); "Failed to configure the datagram socket timeout.", e);
} }
config = new DefaultDatagramChannelConfig(socket); config = new DefaultDatagramChannelConfig(socket);
fireChannelOpen(this);
} }
@Override @Override

View File

@ -96,7 +96,7 @@ public class OioDatagramChannelFactory implements DatagramChannelFactory {
@Override @Override
public DatagramChannel newChannel(ChannelPipeline pipeline) { public DatagramChannel newChannel(ChannelPipeline pipeline) {
return new OioDatagramChannel(this, pipeline, sink); return OioDatagramChannel.create(this, pipeline, sink);
} }
@Override @Override

View File

@ -52,7 +52,15 @@ class OioServerSocketChannel extends AbstractServerChannel
final Lock shutdownLock = new ReentrantLock(); final Lock shutdownLock = new ReentrantLock();
private final ServerSocketChannelConfig config; private final ServerSocketChannelConfig config;
OioServerSocketChannel( static OioServerSocketChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
OioServerSocketChannel instance =
new OioServerSocketChannel(factory, pipeline, sink);
fireChannelOpen(instance);
return instance;
}
private OioServerSocketChannel(
ChannelFactory factory, ChannelFactory factory,
ChannelPipeline pipeline, ChannelPipeline pipeline,
ChannelSink sink) { ChannelSink sink) {
@ -80,8 +88,6 @@ class OioServerSocketChannel extends AbstractServerChannel
} }
config = new DefaultServerSocketChannelConfig(socket); config = new DefaultServerSocketChannelConfig(socket);
fireChannelOpen(this);
} }
@Override @Override

View File

@ -117,7 +117,7 @@ public class OioServerSocketChannelFactory implements ServerSocketChannelFactory
@Override @Override
public ServerSocketChannel newChannel(ChannelPipeline pipeline) { public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return new OioServerSocketChannel(this, pipeline, sink); return OioServerSocketChannel.create(this, pipeline, sink);
} }
@Override @Override

View File

@ -205,12 +205,8 @@ class OioServerSocketPipelineSink extends AbstractChannelSink {
ChannelPipeline pipeline = ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline(); channel.getConfig().getPipelineFactory().getPipeline();
final OioAcceptedSocketChannel acceptedChannel = final OioAcceptedSocketChannel acceptedChannel =
new OioAcceptedSocketChannel( OioAcceptedSocketChannel.create(channel, channel.getFactory(),
channel, pipeline, OioServerSocketPipelineSink.this, acceptedSocket);
channel.getFactory(),
pipeline,
OioServerSocketPipelineSink.this,
acceptedSocket);
DeadLockProofWorker.start( DeadLockProofWorker.start(
workerExecutor, workerExecutor,
new OioWorker(acceptedChannel)); new OioWorker(acceptedChannel));