* Revamped the HTTP tunneling transport

** One HTTP request/response corresponds to one socket connection now
** No more reconnection
** HTTP server should not disconnect the connection or close the request or response prematurely
** Added related documentation and updated the example
This commit is contained in:
Trustin Lee 2009-07-14 10:31:22 +00:00
parent f9c2f66f19
commit 25d5023267
11 changed files with 480 additions and 872 deletions

View File

@ -1,82 +0,0 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.socket.http;
import java.net.SocketAddress;
import java.net.URI;
/**
* The URI of {@link HttpTunnelingServlet} where
* {@link HttpTunnelingClientSocketChannelFactory} connects to.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Andy Taylor (andy.taylor@jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public class HttpTunnelAddress extends SocketAddress implements Comparable<HttpTunnelAddress> {
private static final long serialVersionUID = -7933609652910855887L;
private final URI uri;
/**
* Creates a new instance with the specified URI.
*/
public HttpTunnelAddress(URI uri) {
if (uri == null) {
throw new NullPointerException("uri");
}
this.uri = uri;
}
/**
* Returns the {@link URI} where {@link HttpTunnelingServlet} is bound.
*/
public URI getUri() {
return uri;
}
@Override
public int hashCode() {
return uri.hashCode();
}
@Override
public boolean equals(Object o) {
if (!(o instanceof HttpTunnelAddress)) {
return false;
}
return getUri().equals(((HttpTunnelAddress) o).getUri());
}
public int compareTo(HttpTunnelAddress o) {
return getUri().compareTo(o.getUri());
}
@Override
public String toString() {
return "htun:" + getUri();
}
}

View File

@ -1,212 +0,0 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.socket.http;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpSession;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
* A {@link ChannelHandler} that proxies received messages to the
* {@link OutputStream} of the {@link HttpTunnelingServlet}.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Andy Taylor (andy.taylor@jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
@ChannelPipelineCoverage("one")
class HttpTunnelingChannelHandler extends SimpleChannelUpstreamHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(HttpTunnelingChannelHandler.class);
private final List<MessageEvent> awaitingEvents = new ArrayList<MessageEvent>();
private final Lock reconnectLock = new ReentrantLock();
private final Condition reconnectCondition = reconnectLock.newCondition();
private final long reconnectTimeoutMillis;
private volatile boolean connected = false;
private final AtomicBoolean invalidated = new AtomicBoolean(false);
private volatile ServletOutputStream outputStream;
private final boolean stream;
private final HttpSession session;
public HttpTunnelingChannelHandler(
boolean stream, HttpSession session, long reconnectTimeoutMillis) {
this.stream = stream;
this.session = session;
this.reconnectTimeoutMillis = reconnectTimeoutMillis;
}
@Override
public synchronized void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
if (stream) {
boolean success = false;
Exception cause = null;
byte[] b = null;
reconnectLock.lock();
try {
if (outputStream == null) {
awaitingEvents.add(e);
return;
}
b = new byte[buffer.readableBytes()];
buffer.readBytes(b);
outputStream.write(b);
outputStream.flush();
success = true;
} catch (Exception ex) {
success = false;
cause = ex;
// If the inbound connection was closed before
// forwarding the received message, wait for
// a while (reconnectTimeoutMillis) so that the
// client can recover from the disconnection.
if (awaitReconnect()) {
// Write again if the client reconnected.
// XXX: What if failed consecutively?
// We need a loop instead of nested exception handling.
try {
outputStream.write(b);
outputStream.flush();
success = true;
} catch (Exception ex2) {
success = false;
cause = ex2;
}
} else {
// Client did not reconnect within the reconnect timeout.
// Close the outbound connection.
invalidateHttpSession();
e.getChannel().close();
}
} finally {
reconnectLock.unlock();
if (!success) {
assert cause != null;
throw cause;
}
}
} else {
awaitingEvents.add(e);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
logger.warn("Unexpected exception while HTTP tunneling", e.getCause());
invalidateHttpSession();
e.getChannel().close();
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
invalidateHttpSession();
}
private void invalidateHttpSession() {
if (invalidated.compareAndSet(false, true)) {
try {
session.invalidate();
} catch (Exception e) {
// Gulp - https://jira.jboss.org/jira/browse/JBWEB-139
logger.debug(
"Unexpected exception raised by the Servlet container; " +
"ignoring.", e);
}
}
}
synchronized List<MessageEvent> getAwaitingEvents() {
List<MessageEvent> list = new ArrayList<MessageEvent>();
list.addAll(awaitingEvents);
awaitingEvents.clear();
return list;
}
void setOutputStream(ServletOutputStream outputStream) throws IOException {
reconnectLock.lock();
try {
this.outputStream = outputStream;
connected = true;
for (MessageEvent awaitingEvent : awaitingEvents) {
ChannelBuffer buffer = (ChannelBuffer) awaitingEvent.getMessage();
byte[] b = new byte[buffer.readableBytes()];
buffer.readBytes(b);
outputStream.write(b);
outputStream.flush();
}
reconnectCondition.signalAll();
}
finally {
reconnectLock.unlock();
}
}
boolean isStreaming() {
return stream;
}
boolean awaitReconnect() {
reconnectLock.lock();
try {
connected = false;
reconnectCondition.await(reconnectTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// return with current state.
} finally {
reconnectLock.unlock();
}
return connected;
}
}

View File

@ -26,14 +26,16 @@ import static org.jboss.netty.channel.Channels.*;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.channels.NotYetConnectedException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
@ -48,9 +50,6 @@ import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.handler.codec.http.Cookie;
import org.jboss.netty.handler.codec.http.CookieDecoder;
import org.jboss.netty.handler.codec.http.CookieEncoder;
import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.HttpChunk;
@ -60,10 +59,9 @@ import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
@ -74,32 +72,16 @@ import org.jboss.netty.logging.InternalLoggerFactory;
class HttpTunnelingClientSocketChannel extends AbstractChannel
implements org.jboss.netty.channel.socket.SocketChannel {
static final InternalLogger logger =
InternalLoggerFactory.getInstance(HttpTunnelingClientSocketChannel.class);
private static final String JSESSIONID = "JSESSIONID";
final HttpTunnelingSocketChannelConfig config;
private final HttpTunnelingSocketChannelConfig config;
volatile boolean requestHeaderWritten;
volatile boolean awaitingInitialResponse = true;
private final Object writeLock = new Object();
final Object interestOpsLock = new Object();
volatile Thread workerThread;
volatile String sessionId;
volatile boolean closed = false;
private final ClientSocketChannelFactory clientSocketChannelFactory;
volatile SocketChannel channel;
final SocketChannel realChannel;
private final HttpTunnelingClientSocketChannel.ServletChannelHandler handler = new ServletChannelHandler();
volatile HttpTunnelAddress remoteAddress;
HttpTunnelingClientSocketChannel(
ChannelFactory factory,
ChannelPipeline pipeline,
@ -107,10 +89,13 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
super(null, factory, pipeline, sink);
this.clientSocketChannelFactory = clientSocketChannelFactory;
createSocketChannel();
config = new HttpTunnelingSocketChannelConfig(this);
DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline();
channelPipeline.addLast("decoder", new HttpResponseDecoder());
channelPipeline.addLast("encoder", new HttpRequestEncoder());
channelPipeline.addLast("handler", handler);
realChannel = clientSocketChannelFactory.newChannel(channelPipeline);
fireChannelOpen(this);
}
@ -119,51 +104,29 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
}
public InetSocketAddress getLocalAddress() {
// FIXME: NPE - Cache to avoid
return channel.getLocalAddress();
return realChannel.getLocalAddress();
}
public InetSocketAddress getRemoteAddress() {
// FIXME: NPE - Cache to avoid
return channel.getRemoteAddress();
return realChannel.getRemoteAddress();
}
public boolean isBound() {
// FIXME: Should not return false during reconnection.
return channel.isBound();
return realChannel.isBound();
}
public boolean isConnected() {
// FIXME: Should not return false during reconnection.
return channel.isConnected();
return realChannel.isConnected();
}
@Override
public int getInterestOps() {
return channel.getInterestOps();
return realChannel.getInterestOps();
}
@Override
public boolean isWritable() {
return channel.isWritable();
}
@Override
public ChannelFuture setInterestOps(int interestOps) {
final ChannelFuture future = future(this);
channel.setInterestOps(interestOps).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f)
throws Exception {
if (f.isSuccess()) {
future.setSuccess();
fireChannelInterestChanged(HttpTunnelingClientSocketChannel.this);
} else {
future.setFailure(f.getCause());
fireExceptionCaught(HttpTunnelingClientSocketChannel.this, f.getCause());
}
}
});
return future;
return realChannel.isWritable();
}
@Override
@ -181,29 +144,38 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
}
}
void connectAndSendHeaders(boolean reconnect, final HttpTunnelAddress remoteAddress, final ChannelFuture future) {
this.remoteAddress = remoteAddress;
final URI url = remoteAddress.getUri();
if (reconnect) {
closeSocket();
createSocketChannel();
}
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
SocketAddress connectAddress = new InetSocketAddress(url.getHost(), url.getPort());
// FIXME: bindAddress not respected.
channel.connect(connectAddress).addListener(new ChannelFutureListener() {
void bindReal(final SocketAddress localAddress, final ChannelFuture future) {
realChannel.bind(localAddress).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) {
if (f.isSuccess()) {
future.setSuccess();
} else {
future.setFailure(f.getCause());
}
}
});
}
void connectReal(final SocketAddress remoteAddress, final ChannelFuture future) {
final SocketChannel virtualChannel = this;
realChannel.connect(remoteAddress).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) {
final String serverName = config.getServerName();
final int serverPort = ((InetSocketAddress) remoteAddress).getPort();
final String serverPath = config.getServerPath();
if (f.isSuccess()) {
// Configure SSL
HttpTunnelingSocketChannelConfig config = getConfig();
SSLContext sslContext = config.getSslContext();
ChannelFuture sslHandshakeFuture = null;
if (sslContext != null) {
URI uri = remoteAddress.getUri();
SSLEngine engine = sslContext.createSSLEngine(
uri.getHost(), uri.getPort());
// Create a new SSLEngine from the specified SSLContext.
SSLEngine engine;
if (serverName != null) {
engine = sslContext.createSSLEngine(serverName, serverPort);
} else {
engine = sslContext.createSSLEngine();
}
// Configure the SSLEngine.
engine.setUseClientMode(true);
@ -218,139 +190,166 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
}
SslHandler sslHandler = new SslHandler(engine);
channel.getPipeline().addFirst("ssl", sslHandler);
realChannel.getPipeline().addFirst("ssl", sslHandler);
try {
sslHandshakeFuture = sslHandler.handshake(channel);
sslHandshakeFuture = sslHandler.handshake(realChannel);
} catch (SSLException e) {
future.setFailure(e);
fireExceptionCaught(channel, e);
fireExceptionCaught(virtualChannel, e);
return;
}
}
// Send the HTTP request.
final HttpRequest req = new DefaultHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.POST, url.getRawPath());
req.setHeader(HttpHeaders.Names.HOST, url.getHost());
HttpVersion.HTTP_1_1, HttpMethod.POST, serverPath);
if (serverName != null) {
req.setHeader(HttpHeaders.Names.HOST, serverName);
}
req.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
req.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
req.setHeader(HttpHeaders.Names.CONTENT_TRANSFER_ENCODING, HttpHeaders.Values.BINARY);
if (sessionId != null) {
CookieEncoder ce = new CookieEncoder(false);
ce.addCookie(JSESSIONID, sessionId);
String cookie = ce.encode();
//System.out.println("COOKIE: " + cookie);
req.setHeader(HttpHeaders.Names.COOKIE, cookie);
}
req.setHeader(HttpHeaders.Names.USER_AGENT, HttpTunnelingClientSocketChannel.class.getName());
if (sslHandshakeFuture == null) {
channel.write(req);
realChannel.write(req);
requestHeaderWritten = true;
future.setSuccess();
if (!channel.isBound()) {
fireChannelBound(HttpTunnelingClientSocketChannel.this, channel.getLocalAddress());
}
fireChannelConnected(HttpTunnelingClientSocketChannel.this, channel.getRemoteAddress());
fireChannelConnected(virtualChannel, remoteAddress);
} else {
sslHandshakeFuture.addListener(new ChannelFutureListener() {
public void operationComplete(
ChannelFuture f)
throws Exception {
public void operationComplete(ChannelFuture f) {
if (f.isSuccess()) {
channel.write(req);
future.setSuccess();
if (!isBound()) {
// FIXME: channelBound is not fired - needs own state flag
fireChannelBound(HttpTunnelingClientSocketChannel.this, channel.getLocalAddress());
}
fireChannelConnected(HttpTunnelingClientSocketChannel.this, channel.getRemoteAddress());
fireChannelConnected(virtualChannel, remoteAddress);
} else {
future.setFailure(f.getCause());
fireExceptionCaught(HttpTunnelingClientSocketChannel.this, f.getCause());
fireExceptionCaught(virtualChannel, f.getCause());
}
}
});
}
} else {
future.setFailure(f.getCause());
fireExceptionCaught(channel, f.getCause());
fireExceptionCaught(virtualChannel, f.getCause());
}
}
});
}
private void createSocketChannel() {
DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline();
// TODO Expose the codec options via HttpTunnelingSocketChannelConfig
channelPipeline.addLast("decoder", new HttpResponseDecoder());
channelPipeline.addLast("encoder", new HttpRequestEncoder());
channelPipeline.addLast("handler", handler);
channel = clientSocketChannelFactory.newChannel(channelPipeline);
}
void writeReal(final ChannelBuffer a, final ChannelFuture future) {
if (!requestHeaderWritten) {
throw new NotYetConnectedException();
}
void sendChunk(ChannelBuffer a, final ChannelFuture future) {
// XXX: Investigate race condition during reconnection
final int size = a.readableBytes();
channel.write(new DefaultHttpChunk(a)).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f)
throws Exception {
final ChannelFuture f;
if (size == 0) {
f = realChannel.write(ChannelBuffers.EMPTY_BUFFER);
} else {
f = realChannel.write(new DefaultHttpChunk(a));
}
f.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) {
if (f.isSuccess()) {
future.setSuccess();
fireWriteComplete(HttpTunnelingClientSocketChannel.this, size);
} else {
future.setFailure(f.getCause());
fireExceptionCaught(HttpTunnelingClientSocketChannel.this, f.getCause());
}
}
});
}
void closeSocket() {
if (setClosed()) {
// Send the end of chunk.
// XXX: Investigate race condition during reconnection
synchronized (writeLock) {
channel.write(HttpChunk.LAST_CHUNK).addListener(ChannelFutureListener.CLOSE);
}
closed = true;
private ChannelFuture writeLastChunk() {
if (!requestHeaderWritten) {
throw new NotYetConnectedException();
} else {
return realChannel.write(HttpChunk.LAST_CHUNK);
}
}
void bindSocket(SocketAddress localAddress) {
channel.bind(localAddress);
void setInterestOpsReal(final int interestOps, final ChannelFuture future) {
realChannel.setInterestOps(interestOps).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) {
if (f.isSuccess()) {
future.setSuccess();
} else {
future.setFailure(f.getCause());
}
}
});
}
void disconnectReal(final ChannelFuture future) {
writeLastChunk().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) {
realChannel.disconnect().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) {
if (f.isSuccess()) {
future.setSuccess();
} else {
future.setFailure(f.getCause());
}
}
});
}
});
}
void unbindReal(final ChannelFuture future) {
writeLastChunk().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) {
realChannel.unbind().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) {
if (f.isSuccess()) {
future.setSuccess();
} else {
future.setFailure(f.getCause());
}
}
});
}
});
}
void closeReal(final ChannelFuture future) {
writeLastChunk().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) {
realChannel.close().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) {
if (f.isSuccess()) {
future.setSuccess();
} else {
future.setFailure(f.getCause());
}
}
});
}
});
}
@ChannelPipelineCoverage("one")
class ServletChannelHandler extends SimpleChannelUpstreamHandler {
final class ServletChannelHandler extends SimpleChannelUpstreamHandler {
private volatile boolean readingChunks;
final SocketChannel virtualChannel = HttpTunnelingClientSocketChannel.this;
@Override
public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
fireChannelBound(virtualChannel, (SocketAddress) e.getValue());
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if (!readingChunks) {
HttpResponse res = (HttpResponse) e.getMessage();
String newSessionId = null;
newSessionId = getSessionId(res, HttpHeaders.Names.SET_COOKIE);
if (newSessionId == null) {
newSessionId = getSessionId(res, HttpHeaders.Names.SET_COOKIE2);
if (res.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
throw new ChannelException("Unexpected HTTP response status: " + res.getStatus());
}
//System.out.println("NEW_SESSION_ID: " + newSessionId);
// XXX: Utilize keep-alive if possible to reduce reconnection overhead.
// XXX: Consider non-200 status code.
// If the status code is not 200, no more reconnection attempt
// should be made.
// If the session ID has been changed, it means the session has
// been timed out and a new session has been created. If so,
// channel must be closed.
if (sessionId != null && !sessionId.equals(newSessionId)) {
closeSocket();
return;
}
sessionId = newSessionId;
if (res.isChunked()) {
readingChunks = true;
} else {
@ -358,6 +357,8 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
if (content.readable()) {
fireMessageReceived(HttpTunnelingClientSocketChannel.this, content);
}
// Reached to the end of response - close the request.
closeReal(succeededFuture(virtualChannel));
}
} else {
HttpChunk chunk = (HttpChunk) e.getMessage();
@ -365,46 +366,40 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
fireMessageReceived(HttpTunnelingClientSocketChannel.this, chunk.getContent());
} else {
readingChunks = false;
// Reached to the end of response - close the request.
closeReal(succeededFuture(virtualChannel));
}
}
}
@Override
public void channelInterestChanged(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
fireChannelInterestChanged(virtualChannel);
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
fireChannelDisconnected(virtualChannel);
}
@Override
public void channelUnbound(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
fireChannelUnbound(virtualChannel);
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
if (sessionId != null) {
// TODO Reconnect.
} else {
// sessionId is null if:
// 1) A user closed the channel explicitly, or
// 2) The server does not support JSESSIONID.
channel.close();
}
fireChannelClosed(virtualChannel);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
fireExceptionCaught(
HttpTunnelingClientSocketChannel.this,
e.getCause());
channel.close();
fireExceptionCaught(virtualChannel, e.getCause());
realChannel.close();
}
}
static String getSessionId(HttpResponse res, String headerName) {
CookieDecoder decoder = null;
for (String v: res.getHeaders(headerName)) {
if (decoder == null) {
decoder = new CookieDecoder();
}
for (Cookie c: decoder.decode(v)) {
if (c.getName().equals(JSESSIONID)) {
return c.getValue();
}
}
}
return null;
}
}

View File

@ -22,15 +22,12 @@
*/
package org.jboss.netty.channel.socket.http;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
@ -44,8 +41,6 @@ import org.jboss.netty.channel.MessageEvent;
*/
final class HttpTunnelingClientSocketPipelineSink extends AbstractChannelSink {
static final String LINE_TERMINATOR = "\r\n";
HttpTunnelingClientSocketPipelineSink() {
super();
}
@ -61,78 +56,29 @@ final class HttpTunnelingClientSocketPipelineSink extends AbstractChannelSink {
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
close(channel, future);
channel.closeReal(future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
channel.bindReal((SocketAddress) value, future);
} else {
close(channel, future);
channel.unbindReal(future);
}
break;
case CONNECTED:
if (value != null) {
channel.connectAndSendHeaders(false, ((HttpTunnelAddress) value), future);
channel.connectReal((SocketAddress) value, future);
} else {
close(channel, future);
channel.closeReal(future);
}
break;
case INTEREST_OPS:
final ChannelFuture actualFuture = future;
setInterestOps(channel.channel, ((Integer) value).intValue()).addListener(
new ChannelFutureListener() {
public void operationComplete(ChannelFuture future)
throws Exception {
if (future.isSuccess()) {
actualFuture.setSuccess();
} else {
actualFuture.setFailure(future.getCause());
}
}
});
channel.setInterestOpsReal(((Integer) value).intValue(), future);
break;
}
} else if (e instanceof MessageEvent) {
channel.sendChunk(((ChannelBuffer) ((MessageEvent) e).getMessage()), future);
}
}
private void bind(
HttpTunnelingClientSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
try {
channel.bindSocket(localAddress);
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private void close(
HttpTunnelingClientSocketChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
try {
channel.closeSocket();
if (channel.setClosed()) {
future.setSuccess();
if (connected) {
fireChannelDisconnected(channel);
}
if (bound) {
fireChannelUnbound(channel);
}
fireChannelClosed(channel);
} else {
future.setSuccess();
}
}
catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
channel.writeReal(((ChannelBuffer) ((MessageEvent) e).getMessage()), future);
}
}
}

View File

@ -1,71 +0,0 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.socket.http;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory;
import org.jboss.netty.channel.local.LocalClientChannelFactory;
/**
* A {@link ServletContextListener} that creates a {@link ClientBootstrap}
* using a {@link LocalClientChannelFactory}. The factory should be registered
* before this context is loaded.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Andy Taylor (andy.taylor@jboss.org)
* @version $Rev$, $Date$
*/
public class HttpTunnelingContextListener implements ServletContextListener {
private static final long DEFAULT_RECONNECT_TIMEOUT = 5000;
private static final boolean DEFAULT_IS_STREAMING = true;
static final String SERVER_CHANNEL_PROP = "serverChannelName";
static final String RECONNECT_PROP = "reconnectTimeout";
static final String STREAMING_PROP = "streaming";
static final String BOOTSTRAP_PROP = "bootstrap";
private final ChannelFactory factory = new DefaultLocalClientChannelFactory();
public void contextInitialized(ServletContextEvent context) {
context.getServletContext().setAttribute(BOOTSTRAP_PROP, new ClientBootstrap(factory));
String timeoutParam = context.getServletContext().getInitParameter(RECONNECT_PROP);
context.getServletContext().setAttribute(RECONNECT_PROP, timeoutParam == null?DEFAULT_RECONNECT_TIMEOUT:Long.decode(timeoutParam.trim()));
String streaming = context.getServletContext().getInitParameter(STREAMING_PROP);
context.getServletContext().setAttribute(STREAMING_PROP, streaming == null?DEFAULT_IS_STREAMING: Boolean.valueOf(streaming.trim()));
String serverChannel = context.getServletContext().getInitParameter(SERVER_CHANNEL_PROP);
context.getServletContext().setAttribute(SERVER_CHANNEL_PROP, serverChannel);
}
public void contextDestroyed(ServletContextEvent context) {
// Unused
}
}

View File

@ -23,20 +23,39 @@
package org.jboss.netty.channel.socket.http;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.List;
import java.net.SocketAddress;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory;
import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
import org.jboss.netty.channel.local.LocalAddress;
import org.jboss.netty.example.echo.EchoHandler;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.logging.LoggingHandler;
import org.jboss.netty.logging.InternalLogLevel;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
* An {@link HttpServlet} that proxies an incoming data to the actual server
@ -49,62 +68,126 @@ import org.jboss.netty.channel.MessageEvent;
*/
public class HttpTunnelingServlet extends HttpServlet {
private static final long serialVersionUID = -872309493835745385L;
private static final long serialVersionUID = -2396314792027814020L;
final static String CHANNEL_PROP = "channel";
final static String HANDLER_PROP = "handler";
private static final String ENDPOINT = "endpoint";
protected void doRequest(
HttpServletRequest request,
HttpServletResponse response) throws IOException {
HttpSession session = request.getSession();
Channel channel = (Channel) session.getAttribute(CHANNEL_PROP);
HttpTunnelingChannelHandler handler =
(HttpTunnelingChannelHandler) session.getAttribute(HANDLER_PROP);
if (handler.isStreaming()) {
streamResponse(request, response, session, handler, channel);
} else {
pollResponse(channel, request, response, session, handler);
static final InternalLogger logger = InternalLoggerFactory.getInstance(HttpTunnelingServlet.class);
private volatile SocketAddress remoteAddress;
private volatile ChannelFactory channelFactory;
@Override
public void init() throws ServletException {
ServletConfig config = getServletConfig();
String endpoint = config.getInitParameter(ENDPOINT);
if (endpoint == null) {
throw new ServletException("init-param '" + ENDPOINT + "' must be specified.");
}
}
private void streamResponse(
final HttpServletRequest request,
final HttpServletResponse response, HttpSession session,
HttpTunnelingChannelHandler handler, Channel channel) throws IOException {
try {
response.setHeader("JSESSIONID", session.getId());
response.setHeader("Content-Type", "application/octet-stream");
response.setStatus(HttpServletResponse.SC_OK);
remoteAddress = parseEndpoint(endpoint.trim());
} catch (ServletException e) {
throw e;
} catch (Exception e) {
throw new ServletException("Failed to parse an endpoint.", e);
}
try {
channelFactory = createChannelFactory(remoteAddress);
} catch (ServletException e) {
throw e;
} catch (Exception e) {
throw new ServletException("Failed to create a channel factory.", e);
}
ServerBootstrap b = new ServerBootstrap(new DefaultLocalServerChannelFactory());
// TODO Add more constructor params for LoggingHandler
b.getPipeline().addLast("logger", new LoggingHandler(getClass(), InternalLogLevel.INFO, true));
b.getPipeline().addLast("handler", new EchoHandler());
b.bind(remoteAddress);
}
protected SocketAddress parseEndpoint(String endpoint) throws Exception {
if (endpoint.startsWith("local:")) {
return new LocalAddress(endpoint.substring(6).trim());
} else {
throw new ServletException(
"Invalid or unknown endpoint: " + endpoint);
}
}
protected ChannelFactory createChannelFactory(SocketAddress remoteAddress) throws Exception {
if (remoteAddress instanceof LocalAddress) {
return new DefaultLocalClientChannelFactory();
} else {
throw new ServletException(
"Unsupported remote address type: " +
remoteAddress.getClass().getName());
}
}
@Override
public void destroy() {
try {
destroyChannelFactory(channelFactory);
} catch (Exception e) {
logger.warn("Failed to destroy a channel factory.", e);
}
}
protected void destroyChannelFactory(ChannelFactory factory) throws Exception {
factory.releaseExternalResources();
}
@Override
protected void service(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException {
if (!"POST".equalsIgnoreCase(req.getMethod())) {
res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED, "Method not allowed");
return;
}
final ChannelPipeline pipeline = Channels.pipeline();
final ServletOutputStream out = res.getOutputStream();
final OutboundConnectionHandler handler = new OutboundConnectionHandler(out);
pipeline.addLast("handler", handler);
Channel channel = channelFactory.newChannel(pipeline);
ChannelFuture future = channel.connect(remoteAddress).awaitUninterruptibly();
if (!future.isSuccess()) {
res.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Endpoint unavailable: " + future.getCause().getMessage());
return;
}
ChannelFuture lastWriteFuture = null;
try {
res.setStatus(HttpServletResponse.SC_OK);
res.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
res.setHeader(HttpHeaders.Names.CONTENT_TRANSFER_ENCODING, HttpHeaders.Values.BINARY);
// Initiate chunked encoding by flushing the headers.
response.getOutputStream().flush();
handler.setOutputStream(response.getOutputStream());
out.flush();
PushbackInputStream in =
new PushbackInputStream(request.getInputStream());
new PushbackInputStream(req.getInputStream());
for (;;) {
try {
ChannelBuffer buffer = read(in);
if (buffer == null) {
break;
}
channel.write(buffer);
} catch (IOException e) {
// this is ok, the client can reconnect.
ChannelBuffer buffer = read(in);
if (buffer == null) {
break;
}
lastWriteFuture = channel.write(buffer);
}
} finally {
// Mark the channel as closed if the client didn't reconnect in time.
if (!handler.awaitReconnect()) {
if (lastWriteFuture == null) {
channel.close();
} else {
lastWriteFuture.addListener(ChannelFutureListener.CLOSE);
}
}
}
private ChannelBuffer read(PushbackInputStream in) throws IOException {
private static ChannelBuffer read(PushbackInputStream in) throws IOException {
byte[] buf;
int readBytes;
@ -140,61 +223,26 @@ public class HttpTunnelingServlet extends HttpServlet {
return buffer;
}
private void pollResponse(
Channel channel,
HttpServletRequest request,
HttpServletResponse response, HttpSession session,
HttpTunnelingChannelHandler handler) throws IOException {
@ChannelPipelineCoverage("one")
private final class OutboundConnectionHandler extends SimpleChannelUpstreamHandler {
InputStream in = request.getInputStream();
if (in != null) {
ChannelBuffer requestContent = ChannelBuffers.dynamicBuffer();
for (;;) {
int writtenBytes = requestContent.writeBytes(in, 4096);
if (writtenBytes < 0) {
break;
}
}
if (requestContent.readable()) {
channel.write(requestContent);
}
private final ServletOutputStream out;
public OutboundConnectionHandler(ServletOutputStream out) {
this.out = out;
}
handler.setOutputStream(response.getOutputStream());
List<MessageEvent> buffers = handler.getAwaitingEvents();
int length = 0;
if (buffers.size() > 0) {
for (MessageEvent buffer: buffers) {
length += ((ChannelBuffer) buffer.getMessage()).readableBytes();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
buffer.readBytes(out, buffer.readableBytes());
out.flush();
}
response.setHeader("JSESSIONID", session.getId());
response.setContentLength(length);
response.setStatus(HttpServletResponse.SC_OK);
for (MessageEvent event: buffers) {
ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
byte[] b = new byte[buffer.readableBytes()];
buffer.readBytes(b);
try {
response.getOutputStream().write(b);
event.getFuture().setSuccess();
} catch (IOException e) {
event.getFuture().setFailure(e);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
logger.warn("Unexpected exception while HTTP tunneling", e.getCause());
e.getChannel().close();
}
}
@Override
protected void doGet(
HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse) throws ServletException, IOException {
doRequest(httpServletRequest, httpServletResponse);
}
@Override
protected void doPost(
HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse) throws ServletException, IOException {
doRequest(httpServletRequest, httpServletResponse);
}
}

View File

@ -1,91 +0,0 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.socket.http;
import static org.jboss.netty.channel.Channels.*;
import static org.jboss.netty.channel.socket.http.HttpTunnelingContextListener.*;
import static org.jboss.netty.channel.socket.http.HttpTunnelingServlet.*;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpSession;
import javax.servlet.http.HttpSessionEvent;
import javax.servlet.http.HttpSessionListener;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.local.LocalAddress;
/**
* An {@link HttpSessionListener} that creates an outbound connection to the
* actual server behind {@link HttpServlet}. The outbound connection is open
* when a new session is created, and closed when the session is destroyed.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Andy Taylor (andy.taylor@jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public class HttpTunnelingSessionListener implements HttpSessionListener, ChannelHandler {
public void sessionCreated(HttpSessionEvent event) {
HttpSession session = event.getSession();
ClientBootstrap bootstrap = (ClientBootstrap) session.getServletContext().getAttribute(BOOTSTRAP_PROP);
Boolean streaming = (Boolean) session.getServletContext().getAttribute(STREAMING_PROP);
if (streaming) {
session.setMaxInactiveInterval(-1);
}
final HttpTunnelingChannelHandler handler = new HttpTunnelingChannelHandler(streaming, session, (Long) session.getServletContext().getAttribute(RECONNECT_PROP));
session.setAttribute(HANDLER_PROP, handler);
bootstrap.setPipelineFactory(new HttpTunnelingChannelPipelineFactory(handler));
ChannelFuture future = bootstrap.connect(new LocalAddress((String) session.getServletContext().getAttribute(SERVER_CHANNEL_PROP)));
future.awaitUninterruptibly();
final Channel ch = future.getChannel();
session.setAttribute(CHANNEL_PROP, ch);
}
public void sessionDestroyed(HttpSessionEvent event) {
Channel channel = (Channel) event.getSession().getAttribute(CHANNEL_PROP);
if (channel != null) {
channel.close();
}
}
private static final class HttpTunnelingChannelPipelineFactory implements ChannelPipelineFactory {
private final HttpTunnelingChannelHandler handler;
HttpTunnelingChannelPipelineFactory(HttpTunnelingChannelHandler handler) {
this.handler = handler;
}
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
pipeline.addLast(HttpTunnelingSessionListener.class.getName(), handler);
return pipeline;
}
}
}

View File

@ -70,6 +70,8 @@ import org.jboss.netty.util.internal.ConversionUtil;
public final class HttpTunnelingSocketChannelConfig implements SocketChannelConfig {
private final HttpTunnelingClientSocketChannel channel;
private volatile String serverName;
private volatile String serverPath = "/netty-tunnel";
private volatile SSLContext sslContext;
private volatile String[] enabledSslCipherSuites;
private volatile String[] enabledSslProtocols;
@ -82,6 +84,22 @@ public final class HttpTunnelingSocketChannelConfig implements SocketChannelConf
this.channel = channel;
}
public String getServerName() {
return serverName;
}
public void setServerName(String serverName) {
this.serverName = serverName;
}
public String getServerPath() {
return serverPath;
}
public void setServerPath(String serverPath) {
this.serverPath = serverPath;
}
/**
* Returns the {@link SSLContext} which is used to establish an HTTPS
* connection. If {@code null}, a plain-text HTTP connection is established.
@ -181,11 +199,15 @@ public final class HttpTunnelingSocketChannelConfig implements SocketChannelConf
}
public boolean setOption(String key, Object value) {
if (channel.channel.getConfig().setOption(key, value)) {
if (channel.realChannel.getConfig().setOption(key, value)) {
return true;
}
if (key.equals("sslContext")) {
if (key.equals("serverName")){
setServerName(String.valueOf(value));
} else if (key.equals("serverPath")){
setServerPath(String.valueOf(value));
} else if (key.equals("sslContext")) {
setSslContext((SSLContext) value);
} else if (key.equals("enabledSslCipherSuites")){
setEnabledSslCipherSuites(ConversionUtil.toStringArray(value));
@ -201,98 +223,98 @@ public final class HttpTunnelingSocketChannelConfig implements SocketChannelConf
}
public int getReceiveBufferSize() {
return channel.channel.getConfig().getReceiveBufferSize();
return channel.realChannel.getConfig().getReceiveBufferSize();
}
public int getSendBufferSize() {
return channel.channel.getConfig().getSendBufferSize();
return channel.realChannel.getConfig().getSendBufferSize();
}
public int getSoLinger() {
return channel.channel.getConfig().getSoLinger();
return channel.realChannel.getConfig().getSoLinger();
}
public int getTrafficClass() {
return channel.channel.getConfig().getTrafficClass();
return channel.realChannel.getConfig().getTrafficClass();
}
public boolean isKeepAlive() {
return channel.channel.getConfig().isKeepAlive();
return channel.realChannel.getConfig().isKeepAlive();
}
public boolean isReuseAddress() {
return channel.channel.getConfig().isReuseAddress();
return channel.realChannel.getConfig().isReuseAddress();
}
public boolean isTcpNoDelay() {
return channel.channel.getConfig().isTcpNoDelay();
return channel.realChannel.getConfig().isTcpNoDelay();
}
public void setKeepAlive(boolean keepAlive) {
channel.channel.getConfig().setKeepAlive(keepAlive);
channel.realChannel.getConfig().setKeepAlive(keepAlive);
}
public void setPerformancePreferences(
int connectionTime, int latency, int bandwidth) {
channel.channel.getConfig().setPerformancePreferences(connectionTime, latency, bandwidth);
channel.realChannel.getConfig().setPerformancePreferences(connectionTime, latency, bandwidth);
}
public void setReceiveBufferSize(int receiveBufferSize) {
channel.channel.getConfig().setReceiveBufferSize(receiveBufferSize);
channel.realChannel.getConfig().setReceiveBufferSize(receiveBufferSize);
}
public void setReuseAddress(boolean reuseAddress) {
channel.channel.getConfig().setReuseAddress(reuseAddress);
channel.realChannel.getConfig().setReuseAddress(reuseAddress);
}
public void setSendBufferSize(int sendBufferSize) {
channel.channel.getConfig().setSendBufferSize(sendBufferSize);
channel.realChannel.getConfig().setSendBufferSize(sendBufferSize);
}
public void setSoLinger(int soLinger) {
channel.channel.getConfig().setSoLinger(soLinger);
channel.realChannel.getConfig().setSoLinger(soLinger);
}
public void setTcpNoDelay(boolean tcpNoDelay) {
channel.channel.getConfig().setTcpNoDelay(tcpNoDelay);
channel.realChannel.getConfig().setTcpNoDelay(tcpNoDelay);
}
public void setTrafficClass(int trafficClass) {
channel.channel.getConfig().setTrafficClass(trafficClass);
channel.realChannel.getConfig().setTrafficClass(trafficClass);
}
public ChannelBufferFactory getBufferFactory() {
return channel.channel.getConfig().getBufferFactory();
return channel.realChannel.getConfig().getBufferFactory();
}
public int getConnectTimeoutMillis() {
return channel.channel.getConfig().getConnectTimeoutMillis();
return channel.realChannel.getConfig().getConnectTimeoutMillis();
}
public ChannelPipelineFactory getPipelineFactory() {
return channel.channel.getConfig().getPipelineFactory();
return channel.realChannel.getConfig().getPipelineFactory();
}
@Deprecated
public int getWriteTimeoutMillis() {
return channel.channel.getConfig().getWriteTimeoutMillis();
return channel.realChannel.getConfig().getWriteTimeoutMillis();
}
public void setBufferFactory(ChannelBufferFactory bufferFactory) {
channel.channel.getConfig().setBufferFactory(bufferFactory);
channel.realChannel.getConfig().setBufferFactory(bufferFactory);
}
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
channel.channel.getConfig().setConnectTimeoutMillis(connectTimeoutMillis);
channel.realChannel.getConfig().setConnectTimeoutMillis(connectTimeoutMillis);
}
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) {
channel.channel.getConfig().setPipelineFactory(pipelineFactory);
channel.realChannel.getConfig().setPipelineFactory(pipelineFactory);
}
@Deprecated
public void setWriteTimeoutMillis(int writeTimeoutMillis) {
channel.channel.getConfig().setWriteTimeoutMillis(writeTimeoutMillis);
channel.realChannel.getConfig().setWriteTimeoutMillis(writeTimeoutMillis);
}
}

View File

@ -25,8 +25,97 @@
* An HTTP-based client-side {@link org.jboss.netty.channel.socket.SocketChannel}
* and its corresponding server-side Servlet implementation that make your
* existing server application work in a firewalled network.
* <p>
* Please refer to the example in the <tt>org.jboss.netty.example.http.tunnel</tt>
* package to learn how to configure the HTTP tunneling transport.
*
* <h3>Deploying the HTTP tunnel as a Servlet</h3>
*
* First, {@link org.jboss.netty.channel.socket.http.HttpTunnelingServlet} must be
* configured in a <tt>web.xml</tt>.
*
* <pre>
* &lt;?xml version="1.0" encoding="UTF-8"?&gt;
* &lt;web-app xmlns="http://java.sun.com/xml/ns/j2ee"
* xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
* xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd"
* version="2.4"&gt;
*
* &lt;servlet&gt;
* &lt;servlet-name&gt;NettyTunnelingServlet&lt;/servlet-name&gt;
* &lt;servlet-class&gt;<b>org.jboss.netty.channel.socket.http.HttpTunnelingServlet</b>&lt;/servlet-class&gt;
* &lt;!--
* The name of the channel, this should be a registered local channel.
* See LocalTransportRegister.
* --&gt;
* &lt;init-param&gt;
* &lt;param-name&gt;<b>endpoint</b>&lt;/param-name&gt;
* &lt;param-value&gt;<b>local:myLocalServer</b>&lt;/param-value&gt;
* &lt;/init-param&gt;
* &lt;load-on-startup&gt;<b>1</b>&lt;/load-on-startup&gt;
* &lt;/servlet&gt;
*
* &lt;servlet-mapping&gt;
* &lt;servlet-name&gt;NettyTunnelingServlet&lt;/servlet-name&gt;
* &lt;url-pattern&gt;<b>/netty-tunnel</b>&lt;/url-pattern&gt;
* &lt;/servlet-mapping&gt;
* &lt;/web-app&gt;
* </pre>
*
* Second, you have to bind your Netty-based server application in the same
* Servlet context or shared class loader space using the local transport
* (see {@link org.jboss.netty.channel.local.LocalServerChannelFactory}.)
* You can use your favorite IoC framework such as JBoss Microcontainer, Guice,
* and Spring to do this. The following example shows how to bind an echo
* erver to the endpoint specifed above (<tt>web.xml</tt>) in JBossAS 5:
*
* <pre>
* &lt;bean name="my-local-echo-server"
* class="org.jboss.netty.example.http.tunnel.LocalEchoServerRegistration" /&gt;
*
* ...
*
* package org.jboss.netty.example.http.tunnel;
* ...
*
* public class LocalEchoServerRegistration {
*
* private final ChannelFactory factory = new DefaultLocalServerChannelFactory();
* private volatile Channel serverChannel;
*
* public void start() {
* ServerBootstrap serverBootstrap = new ServerBootstrap(factory);
* EchoHandler handler = new EchoHandler();
* serverBootstrap.getPipeline().addLast("handler", handler);
*
* // Note that "myLocalServer" is the endpoint which was specified in web.xml.
* serverChannel = serverBootstrap.bind(new LocalAddress("<b>myLocalServer</b>"));
* }
*
* public void stop() {
* serverChannel.close();
* }
* }
* </pre>
*
* <h3>Connecting to the HTTP tunnel</h3>
*
* Once the tunnel has been configured, your client-side application needs only
* a couple lines of changes.
*
* <pre>
* ClientBootstrap b = new ClientBootstrap(
* <b>new HttpTunnelingClientSocketChannelFactory(new NioClientSocketChannelFactory(...))</b>);
*
* // Configure the pipeline (or pipeline factory) here.
* ...
*
* // The host name of the HTTP server.
* b.setOption(<b>"serverName"</b>, "example.com");
* // The path to the HTTP tunneling Servlet (set to <b>/netty-tunnel</b> in web.xml)
* b.setOption(<b>"serverPath"</b>, "contextPath<b>/netty-tunnel</b>");
* b.connect(new InetSocketAddress("example.com", 80);
* </pre>
*
* For more configuration parameters such as HTTPS options,
* refer to {@link org.jboss.netty.channel.socket.http.HttpTunnelingSocketChannelConfig}.
*/
package org.jboss.netty.channel.socket.http;

View File

@ -24,6 +24,7 @@ package org.jboss.netty.example.http.tunnel;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.Executors;
@ -32,61 +33,18 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.socket.http.HttpTunnelAddress;
import org.jboss.netty.channel.socket.http.HttpTunnelingClientSocketChannelFactory;
import org.jboss.netty.channel.socket.http.HttpTunnelingServlet;
import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
/**
* Make sure that the {@link LocalTransportRegister} bean is deployed along
* with the {@link HttpTunnelingServlet} with the following <tt>web.xml</tt>.
* An HTTP tunneled version of the telnet client example. Please refer to the
* API documentation of the <tt>org.jboss.netty.channel.socket.http</tt> package
* for the detailed instruction on how to deploy the server-side HTTP tunnel in
* your Servlet container.
*
* <pre>
* &lt;?xml version="1.0" encoding="UTF-8"?&gt;
* &lt;web-app xmlns="http://java.sun.com/xml/ns/j2ee"
* xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
* xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd"
* version="2.4"&gt;
* &lt;!--the name of the channel, this should be a registered local channel. see LocalTransportRegister--&gt;
* &lt;context-param&gt;
* &lt;param-name&gt;serverChannelName&lt;/param-name&gt;
* &lt;param-value&gt;myLocalServer&lt;/param-value&gt;
* &lt;/context-param&gt;
*
* &lt;!--Whether or not we are streaming or just polling using normal HTTP requests--&gt;
* &lt;context-param&gt;
* &lt;param-name&gt;streaming&lt;/param-name&gt;
* &lt;param-value&gt;true&lt;/param-value&gt;
* &lt;/context-param&gt;
*
* &lt;!--How long to wait for a client reconnecting in milliseconds--&gt;
* &lt;context-param&gt;
* &lt;param-name&gt;reconnectTimeout&lt;/param-name&gt;
* &lt;param-value&gt;3000&lt;/param-value&gt;
* &lt;/context-param&gt;
*
* &lt;listener&gt;
* &lt;listener-class&gt;org.jboss.netty.channel.socket.http.HttpTunnelingSessionListener&lt;/listener-class&gt;
* &lt;/listener&gt;
*
* &lt;listener&gt;
* &lt;listener-class&gt;org.jboss.netty.channel.socket.http.HttpTunnelingContextListener&lt;/listener-class&gt;
* &lt;/listener&gt;
*
* &lt;servlet&gt;
* &lt;servlet-name&gt;NettyTunnelingServlet&lt;/servlet-name&gt;
* &lt;servlet-class&gt;org.jboss.netty.channel.socket.http.HttpTunnelingServlet&lt;/servlet-class&gt;
* &lt;/servlet&gt;
*
* &lt;servlet-mapping&gt;
* &lt;servlet-name&gt;NettyTunnelingServlet&lt;/servlet-name&gt;
* &lt;url-pattern&gt;/netty-tunnel&lt;/url-pattern&gt;
* &lt;/servlet-mapping&gt;
* &lt;/web-app&gt;
* </pre>
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Andy Taylor (andy.taylor@jboss.org)
* @version $Rev$, $Date$
@ -108,19 +66,25 @@ public class HttpTunnelingClientExample {
String scheme = uri.getScheme() == null? "http" : uri.getScheme();
if (!scheme.equals("http")) {
// We can actually support HTTPS fairly easily by inserting
// an SslHandler to the pipeline - left as an exercise.
// We can actually support HTTPS fairly easily by setting
// "sslContext" option in the bootstrap, as explained in
// {@link HttpTunnelingSocketChannelConfig}.
System.err.println("Only HTTP is supported.");
return;
}
HttpTunnelingClientSocketChannelFactory factory = new HttpTunnelingClientSocketChannelFactory(new OioClientSocketChannelFactory(Executors.newCachedThreadPool()));
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.getPipeline().addLast("decoder", new StringDecoder());
bootstrap.getPipeline().addLast("encoder", new StringEncoder());
bootstrap.getPipeline().addLast("handler", new PrintHandler());
ChannelFuture channelFuture = bootstrap.connect(new HttpTunnelAddress(uri));
bootstrap.setOption("serverName", uri.getHost());
bootstrap.setOption("serverPath", uri.getRawPath());
ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(uri.getHost(), uri.getPort()));
channelFuture.awaitUninterruptibly();
System.out.println("Enter text (quit to end)");
System.out.println("Enter text ('quit' to exit)");
// Read commands from the stdin.
ChannelFuture lastWriteFuture = null;
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));

View File

@ -30,18 +30,18 @@ import org.jboss.netty.channel.local.LocalAddress;
import org.jboss.netty.example.echo.EchoHandler;
/**
* Deploy this in JBossAS 5 by adding the following bean.
* Deploy this in JBossAS 5 or other IoC container by adding the following bean.
*
* <pre>
* &lt;bean name="org.jboss.netty.example.http.tunnel.LocalTransportRegister"
* class="org.jboss.netty.example.http.tunnel.LocalTransportRegister" /&gt;
* &lt;bean name="org.jboss.netty.example.http.tunnel.LocalEchoServerRegistration"
* class="org.jboss.netty.example.http.tunnel.LocalEchoServerRegistration" /&gt;
* </pre>
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Andy Taylor (andy.taylor@jboss.org)
* @version $Rev$, $Date$
*/
public class LocalTransportRegister {
public class LocalEchoServerRegistration {
private final ChannelFactory factory = new DefaultLocalServerChannelFactory();
private volatile Channel serverChannel;