diff --git a/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketChannel.java index 3469b8c8f2..efb8ed1f8e 100644 --- a/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketChannel.java @@ -36,6 +36,7 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.AbstractChannel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineCoverage; @@ -164,56 +165,102 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel } } - void connectAndSendHeaders(boolean reconnect, HttpTunnelAddress remoteAddress) throws SSLException { + void connectAndSendHeaders(boolean reconnect, final HttpTunnelAddress remoteAddress, final ChannelFuture future) { this.remoteAddress = remoteAddress; - URI url = remoteAddress.getUri(); + final URI url = remoteAddress.getUri(); if (reconnect) { closeSocket(); createSocketChannel(); } + + future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + SocketAddress connectAddress = new InetSocketAddress(url.getHost(), url.getPort()); - channel.connect(connectAddress).awaitUninterruptibly(); + channel.connect(connectAddress).addListener( + new ChannelFutureListener() { + public void operationComplete(ChannelFuture f) { + if (f.isSuccess()) { + // Configure SSL + HttpTunnelingSocketChannelConfig config = getConfig(); + SSLContext sslContext = config.getSslContext(); + ChannelFuture sslHandshakeFuture = null; + if (sslContext != null) { + URI uri = remoteAddress.getUri(); + SSLEngine engine = sslContext.createSSLEngine( + uri.getHost(), uri.getPort()); - // Configure SSL - HttpTunnelingSocketChannelConfig config = getConfig(); - SSLContext sslContext = config.getSslContext(); - if (sslContext != null) { - URI uri = remoteAddress.getUri(); - SSLEngine engine = sslContext.createSSLEngine( - uri.getHost(), uri.getPort()); + // Configure the SSLEngine. + engine.setUseClientMode(true); + engine.setEnableSessionCreation(config.isEnableSslSessionCreation()); + String[] enabledCipherSuites = config.getEnabledSslCipherSuites(); + if (enabledCipherSuites != null) { + engine.setEnabledCipherSuites(enabledCipherSuites); + } + String[] enabledProtocols = config.getEnabledSslProtocols(); + if (enabledProtocols != null) { + engine.setEnabledProtocols(enabledProtocols); + } - // Configure the SSLEngine. - engine.setUseClientMode(true); - engine.setEnableSessionCreation(config.isEnableSslSessionCreation()); - String[] enabledCipherSuites = config.getEnabledSslCipherSuites(); - if (enabledCipherSuites != null) { - engine.setEnabledCipherSuites(enabledCipherSuites); - } - String[] enabledProtocols = config.getEnabledSslProtocols(); - if (enabledProtocols != null) { - engine.setEnabledProtocols(enabledProtocols); - } + SslHandler sslHandler = new SslHandler(engine); + channel.getPipeline().addFirst("ssl", sslHandler); + try { + sslHandshakeFuture = sslHandler.handshake(channel); + } catch (SSLException e) { + future.setFailure(e); + fireExceptionCaught(channel, e); + return; + } + } - SslHandler sslHandler = new SslHandler(engine); - channel.getPipeline().addFirst("ssl", sslHandler); - sslHandler.handshake(channel).awaitUninterruptibly(); - } + // Send the HTTP request. + final HttpRequest req = new DefaultHttpRequest( + HttpVersion.HTTP_1_1, HttpMethod.POST, url.getRawPath()); + req.setHeader(HttpHeaders.Names.HOST, url.getHost()); + req.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream"); + req.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED); + req.setHeader(HttpHeaders.Names.CONTENT_TRANSFER_ENCODING, HttpHeaders.Values.BINARY); + + if (sessionId != null) { + CookieEncoder ce = new CookieEncoder(false); + ce.addCookie(JSESSIONID, sessionId); + String cookie = ce.encode(); + //System.out.println("COOKIE: " + cookie); + req.setHeader(HttpHeaders.Names.COOKIE, cookie); + } - // Send the HTTP request. - HttpRequest req = new DefaultHttpRequest( - HttpVersion.HTTP_1_1, HttpMethod.POST, url.getRawPath()); - req.setHeader(HttpHeaders.Names.HOST, url.getHost()); - req.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream"); - req.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED); - req.setHeader(HttpHeaders.Names.CONTENT_TRANSFER_ENCODING, HttpHeaders.Values.BINARY); - - if (sessionId != null) { - CookieEncoder ce = new CookieEncoder(false); - ce.addCookie(JSESSIONID, sessionId); - req.setHeader(HttpHeaders.Names.COOKIE, ce.encode()); - } - - channel.write(req); + if (sslHandshakeFuture == null) { + channel.write(req); + future.setSuccess(); + if (!channel.isBound()) { + fireChannelBound(HttpTunnelingClientSocketChannel.this, channel.getLocalAddress()); + } + fireChannelConnected(HttpTunnelingClientSocketChannel.this, channel.getRemoteAddress()); + } else { + sslHandshakeFuture.addListener(new ChannelFutureListener() { + public void operationComplete( + ChannelFuture f) + throws Exception { + if (f.isSuccess()) { + channel.write(req); + future.setSuccess(); + if (!isBound()) { + // FIXME: channelBound is not fired. + fireChannelBound(HttpTunnelingClientSocketChannel.this, channel.getLocalAddress()); + } + fireChannelConnected(HttpTunnelingClientSocketChannel.this, channel.getRemoteAddress()); + } else { + future.setFailure(f.getCause()); + fireExceptionCaught(HttpTunnelingClientSocketChannel.this, f.getCause()); + } + } + }); + } + } else { + future.setFailure(f.getCause()); + fireExceptionCaught(channel, f.getCause()); + } + } + }); } private void createSocketChannel() { @@ -225,15 +272,20 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel channel = clientSocketChannelFactory.newChannel(channelPipeline); } - int sendChunk(ChannelBuffer a) { - int size = a.readableBytes(); - String hex = Integer.toHexString(size) + HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR; - - synchronized (writeLock) { - channel.write(new DefaultHttpChunk(a)).awaitUninterruptibly(); - } - - return size + hex.length() + HttpTunnelingClientSocketPipelineSink.LINE_TERMINATOR.length(); + void sendChunk(ChannelBuffer a, final ChannelFuture future) { + final int size = a.readableBytes(); + channel.write(new DefaultHttpChunk(a)).addListener(new ChannelFutureListener() { + public void operationComplete(ChannelFuture f) + throws Exception { + if (f.isSuccess()) { + future.setSuccess(); + fireWriteComplete(HttpTunnelingClientSocketChannel.this, size); + } else { + future.setFailure(f.getCause()); + fireExceptionCaught(HttpTunnelingClientSocketChannel.this, f.getCause()); + } + } + }); } void closeSocket() { @@ -256,7 +308,6 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel class ServletChannelHandler extends SimpleChannelUpstreamHandler { private volatile boolean readingChunks; - int nextChunkSize = -1; @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { @@ -268,14 +319,20 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel newSessionId = getSessionId(res, HttpHeaders.Names.SET_COOKIE2); } + //System.out.println("NEW_SESSION_ID: " + newSessionId); + // XXX: Utilize keep-alive if possible to reduce reconnection overhead. // XXX: Consider non-200 status code. // If the status code is not 200, no more reconnection attempt // should be made. - // XXX: If the session ID in the response is different from - // the session ID specified in the request, then it means - // the session has timed out. If so, channel must be closed. + // If the session ID has been changed, it means the session has + // been timed out and a new session has been created. If so, + // channel must be closed. + if (sessionId != null && !sessionId.equals(newSessionId)) { + closeSocket(); + return; + } sessionId = newSessionId; if (res.isChunked()) { @@ -283,13 +340,13 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel } else { ChannelBuffer content = res.getContent(); if (content.readable()) { - fireMessageReceived(channel, content); + fireMessageReceived(HttpTunnelingClientSocketChannel.this, content); } } } else { HttpChunk chunk = (HttpChunk) e.getMessage(); if (!chunk.isLast()) { - fireMessageReceived(channel, chunk.getContent()); + fireMessageReceived(HttpTunnelingClientSocketChannel.this, chunk.getContent()); } else { readingChunks = false; } diff --git a/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketPipelineSink.java index 9aa2ed2425..f61bef0dcb 100644 --- a/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketPipelineSink.java @@ -73,7 +73,7 @@ final class HttpTunnelingClientSocketPipelineSink extends AbstractChannelSink { break; case CONNECTED: if (value != null) { - connect(channel, future, (HttpTunnelAddress) value); + channel.connectAndSendHeaders(false, ((HttpTunnelAddress) value), future); } else { close(channel, future); } @@ -94,19 +94,7 @@ final class HttpTunnelingClientSocketPipelineSink extends AbstractChannelSink { break; } } else if (e instanceof MessageEvent) { - write(channel, (ChannelBuffer) ((MessageEvent) e).getMessage(), future); - } - } - - private void write(HttpTunnelingClientSocketChannel channel, ChannelBuffer msg, ChannelFuture future) { - try { - int writtenBytes = channel.sendChunk(msg); - future.setSuccess(); - fireWriteComplete(channel, writtenBytes); - } - catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); + channel.sendChunk(((ChannelBuffer) ((MessageEvent) e).getMessage()), future); } } @@ -123,30 +111,6 @@ final class HttpTunnelingClientSocketPipelineSink extends AbstractChannelSink { } } - private void connect( - HttpTunnelingClientSocketChannel channel, ChannelFuture future, - HttpTunnelAddress remoteAddress) { - - boolean bound = channel.isBound(); - - future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - - try { - channel.connectAndSendHeaders(false, remoteAddress); - // Fire events. - future.setSuccess(); - if (!bound) { - fireChannelBound(channel, channel.getLocalAddress()); - } - fireChannelConnected(channel, channel.getRemoteAddress()); - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } finally { - // FIXME: Rewrite exception handling. - } - } - private void close( HttpTunnelingClientSocketChannel channel, ChannelFuture future) { boolean connected = channel.isConnected();