Resolved NETTY-82 (Provide a convenient way to shut down a service)

* Added ChannelFactoryResource
* Added ChannelFactory.getExternalResource()
* Updated all client examples to use getExternalResource().release()
This commit is contained in:
Trustin Lee 2008-11-26 10:18:29 +00:00
parent 0886c11645
commit 5a4e0e4d47
11 changed files with 185 additions and 17 deletions

View File

@ -58,4 +58,14 @@ public interface ChannelFactory {
* @throws ChannelException if failed to create and open a new channel
*/
Channel newChannel(ChannelPipeline pipeline);
/**
* Returns the external resources that this factory depends on to function.
* Please note that {@link ChannelFactoryResource#release()} can be called
* to release all external resources conveniently when the resources are not
* used by this factory or any other part of your application.
* An unexpected behavior will be resulted in if the returned resources are
* released when there's an open channel which is managed by this factory.
*/
ChannelFactoryResource getExternalResource();
}

View File

@ -0,0 +1,81 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* A {@link ChannelFactoryResource} whose underlying resource is a list of
* {@link Executor}s. {@link #release()} will shut down all specified
* {@link ExecutorService}s immediately and wait for their termination.
* An {@link Executor} which is not an {@link ExecutorService} will be ignored.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public class ChannelFactoryExecutorResource implements
ChannelFactoryResource {
private final Executor[] executors;
/**
* Creates a new instance with the specified executors.
*
* @param executors a list of {@link Executor}s
*/
public ChannelFactoryExecutorResource(Executor... executors) {
if (executors == null) {
throw new NullPointerException("executors");
}
this.executors = new Executor[executors.length];
for (int i = 0; i < executors.length; i ++) {
if (executors[i] == null) {
throw new NullPointerException("executors[" + i + "]");
}
this.executors[i] = executors[i];
}
}
public void release() {
for (Executor e: executors) {
if (!(e instanceof ExecutorService)) {
continue;
}
ExecutorService es = (ExecutorService) e;
for (;;) {
es.shutdownNow();
try {
if (es.awaitTermination(1, TimeUnit.SECONDS)) {
break;
}
} catch (InterruptedException ex) {
// Ignore.
}
}
}
}
}

View File

@ -0,0 +1,45 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel;
import java.util.concurrent.ExecutorService;
/**
* The abstract representation of external resources that a
* {@link ChannelFactory} depends on. A user can release the external
* resources such as {@link ExecutorService}s conveniently.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
*
* @version $Rev$, $Date$
*
* @apiviz.exclude
*/
public interface ChannelFactoryResource {
/**
* Releases this resource.
*/
void release();
}

View File

@ -30,8 +30,10 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactoryResource;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.ChannelFactoryExecutorResource;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.SocketChannel;
@ -95,6 +97,7 @@ import org.jboss.netty.channel.socket.SocketChannel;
*/
public class NioClientSocketChannelFactory implements ClientSocketChannelFactory {
private final ChannelFactoryResource externalResource;
private final ChannelSink sink;
/**
@ -137,10 +140,16 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
"workerCount (" + workerCount + ") " +
"must be a positive integer.");
}
externalResource = new ChannelFactoryExecutorResource(bossExecutor, workerExecutor);
sink = new NioClientSocketPipelineSink(bossExecutor, workerExecutor, workerCount);
}
public SocketChannel newChannel(ChannelPipeline pipeline) {
return new NioClientSocketChannel(this, pipeline, sink);
}
public ChannelFactoryResource getExternalResource() {
return externalResource;
}
}

View File

@ -30,8 +30,10 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactoryResource;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.ChannelFactoryExecutorResource;
import org.jboss.netty.channel.socket.ServerSocketChannel;
import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
@ -99,6 +101,7 @@ import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
public class NioServerSocketChannelFactory implements ServerSocketChannelFactory {
final Executor bossExecutor;
private final ChannelFactoryResource externalResource;
private final ChannelSink sink;
/**
@ -142,6 +145,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
"must be a positive integer.");
}
this.bossExecutor = bossExecutor;
externalResource = new ChannelFactoryExecutorResource(bossExecutor, workerExecutor);
sink = new NioServerSocketPipelineSink(workerExecutor, workerCount);
}
@ -149,4 +153,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
return new NioServerSocketChannel(this, pipeline, sink);
}
public ChannelFactoryResource getExternalResource() {
return externalResource;
}
}

View File

@ -28,7 +28,9 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactoryResource;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelFactoryExecutorResource;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.SocketChannel;
@ -87,6 +89,7 @@ import org.jboss.netty.channel.socket.SocketChannel;
*/
public class OioClientSocketChannelFactory implements ClientSocketChannelFactory {
private final ChannelFactoryResource externalResource;
final OioClientSocketPipelineSink sink;
/**
@ -99,10 +102,15 @@ public class OioClientSocketChannelFactory implements ClientSocketChannelFactory
if (workerExecutor == null) {
throw new NullPointerException("workerExecutor");
}
externalResource = new ChannelFactoryExecutorResource(workerExecutor);
sink = new OioClientSocketPipelineSink(workerExecutor);
}
public SocketChannel newChannel(ChannelPipeline pipeline) {
return new OioClientSocketChannel(this, pipeline, sink);
}
public ChannelFactoryResource getExternalResource() {
return externalResource;
}
}

View File

@ -28,8 +28,10 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactoryResource;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.ChannelFactoryExecutorResource;
import org.jboss.netty.channel.socket.ServerSocketChannel;
import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
@ -100,6 +102,7 @@ import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
public class OioServerSocketChannelFactory implements ServerSocketChannelFactory {
final Executor bossExecutor;
private final ChannelFactoryResource externalResource;
private final ChannelSink sink;
/**
@ -119,10 +122,15 @@ public class OioServerSocketChannelFactory implements ServerSocketChannelFactory
throw new NullPointerException("workerExecutor");
}
this.bossExecutor = bossExecutor;
externalResource = new ChannelFactoryExecutorResource(bossExecutor, workerExecutor);
sink = new OioServerSocketPipelineSink(workerExecutor);
}
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return new OioServerSocketChannel(this, pipeline, sink);
}
public ChannelFactoryResource getExternalResource() {
return externalResource;
}
}

View File

@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
@ -75,17 +76,17 @@ public class FactorialClient {
bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection is made successfully.
Channel channel = connectFuture.awaitUninterruptibly().getChannel();
// Get the handler instance to retrieve the answer.
FactorialClientHandler handler = (FactorialClientHandler)
connectFuture.await().getChannel().getPipeline().getLast();
FactorialClientHandler handler =
(FactorialClientHandler) channel.getPipeline().getLast();
// Print out the answer.
System.out.format(
"Factorial of %,d is: %,d", count, handler.getFactorial());
// We should shut down all thread pools here to exit normally.
// However, it is just fine to call System.exit(0) because we are
// finished with the business.
System.exit(0);
// Shut down all thread pools to exit.
factory.getExternalResource().release();
}
}

View File

@ -80,7 +80,8 @@ public class HttpClient {
Channel channel = future.awaitUninterruptibly().getChannel();
if (!future.isSuccess()) {
future.getCause().printStackTrace();
System.exit(0);
factory.getExternalResource().release();
return;
}
// Send the HTTP request.

View File

@ -78,7 +78,8 @@ public class SecureChatClient {
Channel channel = future.awaitUninterruptibly().getChannel();
if (!future.isSuccess()) {
future.getCause().printStackTrace();
System.exit(0);
factory.getExternalResource().release();
return;
}
// Read commands from the stdin.
@ -101,9 +102,7 @@ public class SecureChatClient {
// all I/O operations are asynchronous in Netty.
channel.close().awaitUninterruptibly();
// We should shut down all thread pools here to exit normally.
// However, it is just fine to call System.exit(0) because we are
// finished with the business.
System.exit(0);
// Shut down all thread pools to exit.
factory.getExternalResource().release();
}
}

View File

@ -76,7 +76,8 @@ public class TelnetClient {
Channel channel = future.awaitUninterruptibly().getChannel();
if (!future.isSuccess()) {
future.getCause().printStackTrace();
System.exit(0);
factory.getExternalResource();
return;
}
// Read commands from the stdin.
@ -101,9 +102,7 @@ public class TelnetClient {
// all I/O operations are asynchronous in Netty.
channel.close().awaitUninterruptibly();
// We should shut down all thread pools here to exit normally.
// However, it is just fine to call System.exit(0) because we are
// finished with the business.
System.exit(0);
// Shut down all thread pools to exit.
factory.getExternalResource().release();
}
}