Fixed bugs caused by today's refactoring

This commit is contained in:
Trustin Lee 2009-06-30 11:16:01 +00:00
parent d35a8d29b1
commit 78dbbe7661
2 changed files with 114 additions and 93 deletions

View File

@ -36,6 +36,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.AbstractChannel; import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineCoverage; import org.jboss.netty.channel.ChannelPipelineCoverage;
@ -164,56 +165,102 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
} }
} }
void connectAndSendHeaders(boolean reconnect, HttpTunnelAddress remoteAddress) throws SSLException { void connectAndSendHeaders(boolean reconnect, final HttpTunnelAddress remoteAddress, final ChannelFuture future) {
this.remoteAddress = remoteAddress; this.remoteAddress = remoteAddress;
URI url = remoteAddress.getUri(); final URI url = remoteAddress.getUri();
if (reconnect) { if (reconnect) {
closeSocket(); closeSocket();
createSocketChannel(); createSocketChannel();
} }
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
SocketAddress connectAddress = new InetSocketAddress(url.getHost(), url.getPort()); SocketAddress connectAddress = new InetSocketAddress(url.getHost(), url.getPort());
channel.connect(connectAddress).awaitUninterruptibly(); channel.connect(connectAddress).addListener(
new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) {
if (f.isSuccess()) {
// Configure SSL
HttpTunnelingSocketChannelConfig config = getConfig();
SSLContext sslContext = config.getSslContext();
ChannelFuture sslHandshakeFuture = null;
if (sslContext != null) {
URI uri = remoteAddress.getUri();
SSLEngine engine = sslContext.createSSLEngine(
uri.getHost(), uri.getPort());
// Configure SSL // Configure the SSLEngine.
HttpTunnelingSocketChannelConfig config = getConfig(); engine.setUseClientMode(true);
SSLContext sslContext = config.getSslContext(); engine.setEnableSessionCreation(config.isEnableSslSessionCreation());
if (sslContext != null) { String[] enabledCipherSuites = config.getEnabledSslCipherSuites();
URI uri = remoteAddress.getUri(); if (enabledCipherSuites != null) {
SSLEngine engine = sslContext.createSSLEngine( engine.setEnabledCipherSuites(enabledCipherSuites);
uri.getHost(), uri.getPort()); }
String[] enabledProtocols = config.getEnabledSslProtocols();
if (enabledProtocols != null) {
engine.setEnabledProtocols(enabledProtocols);
}
// Configure the SSLEngine. SslHandler sslHandler = new SslHandler(engine);
engine.setUseClientMode(true); channel.getPipeline().addFirst("ssl", sslHandler);
engine.setEnableSessionCreation(config.isEnableSslSessionCreation()); try {
String[] enabledCipherSuites = config.getEnabledSslCipherSuites(); sslHandshakeFuture = sslHandler.handshake(channel);
if (enabledCipherSuites != null) { } catch (SSLException e) {
engine.setEnabledCipherSuites(enabledCipherSuites); future.setFailure(e);
} fireExceptionCaught(channel, e);
String[] enabledProtocols = config.getEnabledSslProtocols(); return;
if (enabledProtocols != null) { }
engine.setEnabledProtocols(enabledProtocols); }
}
SslHandler sslHandler = new SslHandler(engine); // Send the HTTP request.
channel.getPipeline().addFirst("ssl", sslHandler); final HttpRequest req = new DefaultHttpRequest(
sslHandler.handshake(channel).awaitUninterruptibly(); HttpVersion.HTTP_1_1, HttpMethod.POST, url.getRawPath());
} req.setHeader(HttpHeaders.Names.HOST, url.getHost());
req.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
req.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
req.setHeader(HttpHeaders.Names.CONTENT_TRANSFER_ENCODING, HttpHeaders.Values.BINARY);
if (sessionId != null) {
CookieEncoder ce = new CookieEncoder(false);
ce.addCookie(JSESSIONID, sessionId);
String cookie = ce.encode();
//System.out.println("COOKIE: " + cookie);
req.setHeader(HttpHeaders.Names.COOKIE, cookie);
}
// Send the HTTP request. if (sslHandshakeFuture == null) {
HttpRequest req = new DefaultHttpRequest( channel.write(req);
HttpVersion.HTTP_1_1, HttpMethod.POST, url.getRawPath()); future.setSuccess();
req.setHeader(HttpHeaders.Names.HOST, url.getHost()); if (!channel.isBound()) {
req.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream"); fireChannelBound(HttpTunnelingClientSocketChannel.this, channel.getLocalAddress());
req.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED); }
req.setHeader(HttpHeaders.Names.CONTENT_TRANSFER_ENCODING, HttpHeaders.Values.BINARY); fireChannelConnected(HttpTunnelingClientSocketChannel.this, channel.getRemoteAddress());
} else {
if (sessionId != null) { sslHandshakeFuture.addListener(new ChannelFutureListener() {
CookieEncoder ce = new CookieEncoder(false); public void operationComplete(
ce.addCookie(JSESSIONID, sessionId); ChannelFuture f)
req.setHeader(HttpHeaders.Names.COOKIE, ce.encode()); throws Exception {
} if (f.isSuccess()) {
channel.write(req);
channel.write(req); future.setSuccess();
if (!isBound()) {
// FIXME: channelBound is not fired.
fireChannelBound(HttpTunnelingClientSocketChannel.this, channel.getLocalAddress());
}
fireChannelConnected(HttpTunnelingClientSocketChannel.this, channel.getRemoteAddress());
} else {
future.setFailure(f.getCause());
fireExceptionCaught(HttpTunnelingClientSocketChannel.this, f.getCause());
}
}
});
}
} else {
future.setFailure(f.getCause());
fireExceptionCaught(channel, f.getCause());
}
}
});
} }
private void createSocketChannel() { private void createSocketChannel() {
@ -225,15 +272,20 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
channel = clientSocketChannelFactory.newChannel(channelPipeline); channel = clientSocketChannelFactory.newChannel(channelPipeline);
} }
int sendChunk(ChannelBuffer a) { void sendChunk(ChannelBuffer a, final ChannelFuture future) {
int size = a.readableBytes(); final int size = a.readableBytes();
String hex = Integer.toHexString(size) + HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR; channel.write(new DefaultHttpChunk(a)).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f)
synchronized (writeLock) { throws Exception {
channel.write(new DefaultHttpChunk(a)).awaitUninterruptibly(); if (f.isSuccess()) {
} future.setSuccess();
fireWriteComplete(HttpTunnelingClientSocketChannel.this, size);
return size + hex.length() + HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR.length(); } else {
future.setFailure(f.getCause());
fireExceptionCaught(HttpTunnelingClientSocketChannel.this, f.getCause());
}
}
});
} }
void closeSocket() { void closeSocket() {
@ -256,7 +308,6 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
class ServletChannelHandler extends SimpleChannelUpstreamHandler { class ServletChannelHandler extends SimpleChannelUpstreamHandler {
private volatile boolean readingChunks; private volatile boolean readingChunks;
int nextChunkSize = -1;
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
@ -268,14 +319,20 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
newSessionId = getSessionId(res, HttpHeaders.Names.SET_COOKIE2); newSessionId = getSessionId(res, HttpHeaders.Names.SET_COOKIE2);
} }
//System.out.println("NEW_SESSION_ID: " + newSessionId);
// XXX: Utilize keep-alive if possible to reduce reconnection overhead. // XXX: Utilize keep-alive if possible to reduce reconnection overhead.
// XXX: Consider non-200 status code. // XXX: Consider non-200 status code.
// If the status code is not 200, no more reconnection attempt // If the status code is not 200, no more reconnection attempt
// should be made. // should be made.
// XXX: If the session ID in the response is different from
// the session ID specified in the request, then it means
// the session has timed out. If so, channel must be closed.
// If the session ID has been changed, it means the session has
// been timed out and a new session has been created. If so,
// channel must be closed.
if (sessionId != null && !sessionId.equals(newSessionId)) {
closeSocket();
return;
}
sessionId = newSessionId; sessionId = newSessionId;
if (res.isChunked()) { if (res.isChunked()) {
@ -283,13 +340,13 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
} else { } else {
ChannelBuffer content = res.getContent(); ChannelBuffer content = res.getContent();
if (content.readable()) { if (content.readable()) {
fireMessageReceived(channel, content); fireMessageReceived(HttpTunnelingClientSocketChannel.this, content);
} }
} }
} else { } else {
HttpChunk chunk = (HttpChunk) e.getMessage(); HttpChunk chunk = (HttpChunk) e.getMessage();
if (!chunk.isLast()) { if (!chunk.isLast()) {
fireMessageReceived(channel, chunk.getContent()); fireMessageReceived(HttpTunnelingClientSocketChannel.this, chunk.getContent());
} else { } else {
readingChunks = false; readingChunks = false;
} }

View File

@ -73,7 +73,7 @@ final class HttpTunnelingClientSocketPipelineSink extends AbstractChannelSink {
break; break;
case CONNECTED: case CONNECTED:
if (value != null) { if (value != null) {
connect(channel, future, (HttpTunnelAddress) value); channel.connectAndSendHeaders(false, ((HttpTunnelAddress) value), future);
} else { } else {
close(channel, future); close(channel, future);
} }
@ -94,19 +94,7 @@ final class HttpTunnelingClientSocketPipelineSink extends AbstractChannelSink {
break; break;
} }
} else if (e instanceof MessageEvent) { } else if (e instanceof MessageEvent) {
write(channel, (ChannelBuffer) ((MessageEvent) e).getMessage(), future); channel.sendChunk(((ChannelBuffer) ((MessageEvent) e).getMessage()), future);
}
}
private void write(HttpTunnelingClientSocketChannel channel, ChannelBuffer msg, ChannelFuture future) {
try {
int writtenBytes = channel.sendChunk(msg);
future.setSuccess();
fireWriteComplete(channel, writtenBytes);
}
catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} }
} }
@ -123,30 +111,6 @@ final class HttpTunnelingClientSocketPipelineSink extends AbstractChannelSink {
} }
} }
private void connect(
HttpTunnelingClientSocketChannel channel, ChannelFuture future,
HttpTunnelAddress remoteAddress) {
boolean bound = channel.isBound();
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
try {
channel.connectAndSendHeaders(false, remoteAddress);
// Fire events.
future.setSuccess();
if (!bound) {
fireChannelBound(channel, channel.getLocalAddress());
}
fireChannelConnected(channel, channel.getRemoteAddress());
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
// FIXME: Rewrite exception handling.
}
}
private void close( private void close(
HttpTunnelingClientSocketChannel channel, ChannelFuture future) { HttpTunnelingClientSocketChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected(); boolean connected = channel.isConnected();