added TunnelAddress

This commit is contained in:
Andy Taylor 2009-02-16 11:40:37 +00:00
parent 2b7427de4f
commit 9e60db4078
5 changed files with 79 additions and 23 deletions

View File

@ -0,0 +1,40 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.socket.http;
import java.net.SocketAddress;
import java.net.URI;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class HttpTunnelAddress extends SocketAddress {
private final URI uri;
public HttpTunnelAddress(URI uri) {
this.uri = uri;
}
public URI getUri() {
return uri;
}
}

View File

@ -47,6 +47,7 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.URL; import java.net.URL;
import java.net.URI;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -72,8 +73,6 @@ class HttpTunnelClientSocketChannel extends AbstractChannel
LinkedTransferQueue<byte[]> messages = new LinkedTransferQueue<byte[]>(); LinkedTransferQueue<byte[]> messages = new LinkedTransferQueue<byte[]>();
private final URL url;
private ClientSocketChannelFactory clientSocketChannelFactory; private ClientSocketChannelFactory clientSocketChannelFactory;
private SocketChannel channel; private SocketChannel channel;
@ -82,13 +81,14 @@ class HttpTunnelClientSocketChannel extends AbstractChannel
private HttpTunnelClientSocketChannel.ServletChannelHandler servletHandler = new ServletChannelHandler(); private HttpTunnelClientSocketChannel.ServletChannelHandler servletHandler = new ServletChannelHandler();
private HttpTunnelAddress remoteAddress;
HttpTunnelClientSocketChannel( HttpTunnelClientSocketChannel(
ChannelFactory factory, ChannelFactory factory,
ChannelPipeline pipeline, ChannelPipeline pipeline,
ChannelSink sink, URL url, ClientSocketChannelFactory clientSocketChannelFactory) { ChannelSink sink, ClientSocketChannelFactory clientSocketChannelFactory) {
super(null, factory, pipeline, sink); super(null, factory, pipeline, sink);
this.url = url;
this.clientSocketChannelFactory = clientSocketChannelFactory; this.clientSocketChannelFactory = clientSocketChannelFactory;
DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline(); DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline();
@ -139,16 +139,19 @@ class HttpTunnelClientSocketChannel extends AbstractChannel
} }
} }
void connectAndSendHeaders(boolean reconnect, SocketAddress remoteAddress) throws IOException { void connectAndSendHeaders(boolean reconnect, HttpTunnelAddress remoteAddress) throws IOException {
this.remoteAddress = remoteAddress;
URI url = remoteAddress.getUri();
if (reconnect) { if (reconnect) {
DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline(); DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline();
channelPipeline.addLast("DelimiterBasedFrameDecoder", handler); channelPipeline.addLast("DelimiterBasedFrameDecoder", handler);
channelPipeline.addLast("servletHandler", servletHandler); channelPipeline.addLast("servletHandler", servletHandler);
channel = clientSocketChannelFactory.newChannel(channelPipeline); channel = clientSocketChannelFactory.newChannel(channelPipeline);
} }
channel.connect(remoteAddress); SocketAddress connectAddress = new InetSocketAddress(url.getHost(), url.getPort());
channel.connect(connectAddress);
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append("POST ").append(url.toExternalForm()).append(" HTTP/1.1").append(HttpTunnelClientSocketPipelineSink.LINE_TERMINATOR). builder.append("POST ").append(url.getRawPath()).append(" HTTP/1.1").append(HttpTunnelClientSocketPipelineSink.LINE_TERMINATOR).
append("HOST: ").append(url.getHost()).append(":").append(url.getPort()).append(HttpTunnelClientSocketPipelineSink.LINE_TERMINATOR). append("HOST: ").append(url.getHost()).append(":").append(url.getPort()).append(HttpTunnelClientSocketPipelineSink.LINE_TERMINATOR).
append("Content-Type: application/octet-stream").append(HttpTunnelClientSocketPipelineSink.LINE_TERMINATOR).append("Transfer-Encoding: chunked"). append("Content-Type: application/octet-stream").append(HttpTunnelClientSocketPipelineSink.LINE_TERMINATOR).append("Transfer-Encoding: chunked").
append(HttpTunnelClientSocketPipelineSink.LINE_TERMINATOR).append("Content-Transfer-Encoding: Binary").append(HttpTunnelClientSocketPipelineSink.LINE_TERMINATOR).append("Connection: Keep-Alive"). append(HttpTunnelClientSocketPipelineSink.LINE_TERMINATOR).append("Content-Transfer-Encoding: Binary").append(HttpTunnelClientSocketPipelineSink.LINE_TERMINATOR).append("Connection: Keep-Alive").
@ -193,7 +196,7 @@ class HttpTunnelClientSocketChannel extends AbstractChannel
try { try {
awaitingInitialResponse = true; awaitingInitialResponse = true;
connectAndSendHeaders(true, channel.getRemoteAddress()); connectAndSendHeaders(true, remoteAddress);
} }
finally { finally {
reconnectLock.unlock(); reconnectLock.unlock();

View File

@ -40,15 +40,14 @@ public class HttpTunnelClientSocketChannelFactory implements ClientSocketChannel
private final Executor workerExecutor; private final Executor workerExecutor;
private final ChannelSink sink; private final ChannelSink sink;
private final URL url;
ClientSocketChannelFactory clientSocketChannelFactory; ClientSocketChannelFactory clientSocketChannelFactory;
/** /**
* *
* @param workerExecutor * @param workerExecutor
*/ */
public HttpTunnelClientSocketChannelFactory(ClientSocketChannelFactory clientSocketChannelFactory, Executor workerExecutor, URL url) { public HttpTunnelClientSocketChannelFactory(ClientSocketChannelFactory clientSocketChannelFactory, Executor workerExecutor) {
this(url, workerExecutor, Runtime.getRuntime().availableProcessors()); this(workerExecutor, Runtime.getRuntime().availableProcessors());
this.clientSocketChannelFactory = clientSocketChannelFactory; this.clientSocketChannelFactory = clientSocketChannelFactory;
} }
@ -56,18 +55,13 @@ public class HttpTunnelClientSocketChannelFactory implements ClientSocketChannel
* Creates a new instance. * Creates a new instance.
* *
* the {@link java.util.concurrent.Executor} which will execute the boss thread * the {@link java.util.concurrent.Executor} which will execute the boss thread
* @param url
* @param workerExecutor * @param workerExecutor
* the {@link java.util.concurrent.Executor} which will execute the I/O worker threads * the {@link java.util.concurrent.Executor} which will execute the I/O worker threads
* @param workerCount * @param workerCount
*/ */
public HttpTunnelClientSocketChannelFactory( public HttpTunnelClientSocketChannelFactory(
URL url, Executor workerExecutor, Executor workerExecutor,
int workerCount) { int workerCount) {
if (url == null) {
throw new NullPointerException("Url is null");
}
this.url = url;
if (workerExecutor == null) { if (workerExecutor == null) {
throw new NullPointerException("workerExecutor"); throw new NullPointerException("workerExecutor");
} }
@ -82,7 +76,7 @@ public class HttpTunnelClientSocketChannelFactory implements ClientSocketChannel
} }
public SocketChannel newChannel(ChannelPipeline pipeline) { public SocketChannel newChannel(ChannelPipeline pipeline) {
return new HttpTunnelClientSocketChannel(this, pipeline, sink, url, clientSocketChannelFactory); return new HttpTunnelClientSocketChannel(this, pipeline, sink, clientSocketChannelFactory);
} }
public void releaseExternalResources() { public void releaseExternalResources() {

View File

@ -74,7 +74,7 @@ class HttpTunnelClientSocketPipelineSink extends AbstractChannelSink {
break; break;
case CONNECTED: case CONNECTED:
if (value != null) { if (value != null) {
connect(channel, future, (SocketAddress) value); connect(channel, future, (HttpTunnelAddress) value);
} else { } else {
HttpTunnelWorker.close(channel, future); HttpTunnelWorker.close(channel, future);
} }
@ -105,7 +105,7 @@ class HttpTunnelClientSocketPipelineSink extends AbstractChannelSink {
private void connect( private void connect(
HttpTunnelClientSocketChannel channel, ChannelFuture future, HttpTunnelClientSocketChannel channel, ChannelFuture future,
SocketAddress remoteAddress) { HttpTunnelAddress remoteAddress) {
boolean bound = channel.isBound(); boolean bound = channel.isBound();
boolean connected = false; boolean connected = false;

View File

@ -25,6 +25,7 @@ import java.io.BufferedReader;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URL; import java.net.URL;
import java.net.URI;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
@ -33,6 +34,7 @@ import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage; import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.socket.http.HttpTunnelClientSocketChannelFactory; import org.jboss.netty.channel.socket.http.HttpTunnelClientSocketChannelFactory;
import org.jboss.netty.channel.socket.http.HttpTunnelAddress;
import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory; import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.handler.codec.string.StringDecoder;
@ -86,13 +88,30 @@ import org.jboss.netty.handler.codec.string.StringEncoder;
*/ */
public class HttpTunnelClientExample { public class HttpTunnelClientExample {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
URL url = new URL("http", "localhost", 8080, "/netty/nettyServlet"); if (args.length != 1) {
HttpTunnelClientSocketChannelFactory factory = new HttpTunnelClientSocketChannelFactory(new OioClientSocketChannelFactory(Executors.newCachedThreadPool()), Executors.newCachedThreadPool(), url); System.err.println(
"Usage: " + HttpClient.class.getSimpleName() +
" <URL>");
return;
}
URI uri = new URI(args[0]);
String scheme = uri.getScheme() == null? "http" : uri.getScheme();
String host = uri.getHost() == null? "localhost" : uri.getHost();
int port = uri.getPort() == -1? 80 : uri.getPort();
if (!scheme.equals("http")) {
// We can actually support HTTPS fairly easily by inserting
// an SslHandler to the pipeline - left as an exercise.
System.err.println("Only HTTP is supported.");
return;
}
HttpTunnelClientSocketChannelFactory factory = new HttpTunnelClientSocketChannelFactory(new OioClientSocketChannelFactory(Executors.newCachedThreadPool()), Executors.newCachedThreadPool());
ClientBootstrap bootstrap = new ClientBootstrap(factory); ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.getPipeline().addLast("decoder", new StringDecoder()); bootstrap.getPipeline().addLast("decoder", new StringDecoder());
bootstrap.getPipeline().addLast("encoder", new StringEncoder()); bootstrap.getPipeline().addLast("encoder", new StringEncoder());
bootstrap.getPipeline().addLast("handler", new PrintHandler()); bootstrap.getPipeline().addLast("handler", new PrintHandler());
ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("localhost", 8080)); ChannelFuture channelFuture = bootstrap.connect(new HttpTunnelAddress(uri));
channelFuture.awaitUninterruptibly(); channelFuture.awaitUninterruptibly();
System.out.println("Enter text (quit to end)"); System.out.println("Enter text (quit to end)");
// Read commands from the stdin. // Read commands from the stdin.