added ClientSocketChannelFactory to ServletClientSocketChannelFactory constructor

This commit is contained in:
Andy Taylor 2009-02-16 10:21:44 +00:00
parent 4f391f5bbc
commit 1a96b48026
15 changed files with 265 additions and 168 deletions

View File

@ -22,29 +22,36 @@
*/
package org.jboss.netty.channel.socket.servlet;
import static org.jboss.netty.channel.Channels.*;
import static org.jboss.netty.channel.socket.servlet.ServletClientSocketPipelineSink.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URL;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import static org.jboss.netty.channel.Channels.fireChannelOpen;
import static org.jboss.netty.channel.Channels.pipeline;
import org.jboss.netty.channel.DefaultChannelPipeline;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.channel.socket.SocketChannelConfig;
import static org.jboss.netty.channel.socket.servlet.ServletClientSocketPipelineSink.LINE_TERMINATOR;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.util.LinkedTransferQueue;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URL;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Lock;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
@ -52,58 +59,67 @@ import org.jboss.netty.channel.socket.SocketChannelConfig;
* @version $Rev$, $Date$
*/
class ServletClientSocketChannel extends AbstractChannel
implements org.jboss.netty.channel.socket.SocketChannel {
implements org.jboss.netty.channel.socket.SocketChannel {
private final Lock lock = new ReentrantLock();
private final Lock reconnectLock = new ReentrantLock();
private volatile boolean awaitingInitialResponse = true;
private final Object writeLock = new Object();
private Socket socket;
private ServletSocketChannelConfig config;
volatile Thread workerThread;
private volatile PushbackInputStream in;
private volatile OutputStream out;
private String sessionId;
private boolean closed = false;
LinkedTransferQueue<byte[]> messages = new LinkedTransferQueue<byte[]>();
private final URL url;
private ClientSocketChannelFactory clientSocketChannelFactory;
private SocketChannel channel;
private DelimiterBasedFrameDecoder handler = new DelimiterBasedFrameDecoder(8092, ChannelBuffers.wrappedBuffer(new byte[] { '\r', '\n' }));
private ServletClientSocketChannel.ServletChannelHandler servletHandler = new ServletChannelHandler();
ServletClientSocketChannel(
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink, URL url) {
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink, URL url, ClientSocketChannelFactory clientSocketChannelFactory) {
super(null, factory, pipeline, sink);
this.url = url;
socket = new Socket();
config = new ServletSocketChannelConfig(socket);
this.clientSocketChannelFactory = clientSocketChannelFactory;
DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline();
channelPipeline.addLast("DelimiterBasedFrameDecoder", handler);
channelPipeline.addLast("servletHandler", servletHandler);
channel = clientSocketChannelFactory.newChannel(channelPipeline);
fireChannelOpen(this);
}
public SocketChannelConfig getConfig() {
return config;
return channel.getConfig();
}
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) socket.getLocalSocketAddress();
return channel.getLocalAddress();
}
public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) socket.getRemoteSocketAddress();
return channel.getRemoteAddress();
}
public boolean isBound() {
return isOpen() && socket.isBound();
return channel.isOpen();
}
public boolean isConnected() {
return isOpen() && socket.isConnected();
return channel.isConnected();
}
@Override
@ -126,172 +142,129 @@ class ServletClientSocketChannel extends AbstractChannel
}
}
PushbackInputStream getInputStream() {
return in;
}
OutputStream getOutputStream() {
return out;
}
void connectAndSendHeaders(boolean reconnect, SocketAddress remoteAddress) throws IOException {
if (reconnect) {
System.out.println("reconnecting");
socket.close();
socket = new Socket();
config = config.copyConfig(socket);
DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline();
channelPipeline.addLast("DelimiterBasedFrameDecoder", handler);
channelPipeline.addLast("servletHandler", servletHandler);
channel = clientSocketChannelFactory.newChannel(channelPipeline);
}
socket.connect(
remoteAddress, getConfig().getConnectTimeoutMillis());
// Obtain I/O stream.
in = new PushbackInputStream(socket.getInputStream(), 1);
out = socket.getOutputStream();
//write and read headers
channel.connect(remoteAddress);
StringBuilder builder = new StringBuilder();
builder.append("POST ").append(url.toExternalForm()).append(" HTTP/1.1").append(LINE_TERMINATOR).
append("HOST: ").append(url.getHost()).append(":").append(url.getPort()).append(LINE_TERMINATOR).
append("Content-Type: application/octet-stream").append(LINE_TERMINATOR).append("Transfer-Encoding: chunked").
append(LINE_TERMINATOR).append("Content-Transfer-Encoding: Binary").append(LINE_TERMINATOR).append("Connection: Keep-Alive").
append(LINE_TERMINATOR);
append("HOST: ").append(url.getHost()).append(":").append(url.getPort()).append(LINE_TERMINATOR).
append("Content-Type: application/octet-stream").append(LINE_TERMINATOR).append("Transfer-Encoding: chunked").
append(LINE_TERMINATOR).append("Content-Transfer-Encoding: Binary").append(LINE_TERMINATOR).append("Connection: Keep-Alive").
append(LINE_TERMINATOR);
if (reconnect) {
builder.append("Cookie: JSESSIONID=").append(sessionId).append(LINE_TERMINATOR);
}
builder.append(LINE_TERMINATOR);
String msg = builder.toString();
socket.getOutputStream().write(msg.getBytes("ASCII7"));
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String line;
while ((line = br.readLine()) != null) {
if (!reconnect) {
if (line.contains("Set-Cookie")) {
int start = line.indexOf("JSESSIONID=") + 11;
int end = line.indexOf(";", start);
sessionId = line.substring(start, end);
}
}
if (line.equals(LINE_TERMINATOR) || line.equals("")) {
break;
}
}
channel.write(ChannelBuffers.wrappedBuffer(msg.getBytes("ASCII7")));
}
public void sendChunk(ChannelBuffer a) throws IOException {
int size = a.readableBytes();
String hex = Integer.toHexString(size) + LINE_TERMINATOR;
try {
synchronized (writeLock) {
out.write(hex.getBytes());
a.getBytes(a.readerIndex(), out, a.readableBytes());
out.write(LINE_TERMINATOR.getBytes());
}
}
catch (SocketException e) {
if (closed) {
throw e;
}
if (lock.tryLock()) {
try {
connectAndSendHeaders(true, getRemoteAddress());
}
finally {
lock.unlock();
}
}
else {
try {
lock.lock();
}
finally {
lock.unlock();
}
}
// try {
synchronized (writeLock) {
a.writeBytes(LINE_TERMINATOR.getBytes());
channel.write(ChannelBuffers.wrappedBuffer(hex.getBytes()));
channel.write(a).awaitUninterruptibly();
//channel.write(ChannelBuffers.wrappedBuffer(LINE_TERMINATOR.getBytes()));
}
}
public byte[] receiveChunk() throws IOException {
byte[] buf;
byte[] buf = null;
try {
buf = read();
buf = messages.take();
}
catch (SocketException e) {
if (closed) {
throw e;
catch (InterruptedException e) {
e.printStackTrace();
}
return buf;
}
private void reConnect() throws Exception{
if (closed) {
throw new IllegalStateException("channel closed");
}
if (lock.tryLock()) {
if (reconnectLock.tryLock()) {
try {
connectAndSendHeaders(true, socket.getRemoteSocketAddress());
awaitingInitialResponse = true;
connectAndSendHeaders(true, channel.getRemoteAddress());
}
finally {
lock.unlock();
reconnectLock.unlock();
}
}
else {
try {
lock.lock();
reconnectLock.lock();
}
finally {
lock.unlock();
reconnectLock.unlock();
}
}
buf = read();
}
return buf;
}
private byte[] read() throws IOException {
//
byte[] buf;
StringBuffer hex = new StringBuffer();
int b;
while ((b = in.read()) != -1) {
if (b == 13) {
int end = in.read();
if (end != 10) {
in.unread(end);
}
break;
}
hex.append((char) b);
}
int bytesToRead = Integer.parseInt(hex.toString(), 16);
buf = new byte[bytesToRead];
if (in.available() >= bytesToRead) {
in.read(buf, 0, bytesToRead);
}
else {
int readBytes = 0;
do {
readBytes += in.read(buf, readBytes, bytesToRead - readBytes);
}
while (bytesToRead != readBytes);
}
int end = in.read();
if (end != 13) {
in.unread(end);
}
else {
end = in.read();
if (end != 10) {
in.unread(end);
}
}
return buf;
}
public void closeSocket() throws IOException {
setClosed();
closed = true;
socket.close();
setClosed();
closed = true;
channel.close();
}
public void bindSocket(SocketAddress localAddress) throws IOException {
socket.bind(localAddress);
channel.bind(localAddress);
}
@ChannelPipelineCoverage("one")
class ServletChannelHandler extends SimpleChannelHandler {
int nextChunkSize = -1;
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
byte[] bytes = new byte[buf.readableBytes()];
buf.getBytes(0, bytes);
if (awaitingInitialResponse) {
String line = new String(bytes);
if (line.contains("Set-Cookie")) {
int start = line.indexOf("JSESSIONID=") + 11;
int end = line.indexOf(";", start);
sessionId = line.substring(start, end);
}
else if (line.equals("")) {
awaitingInitialResponse = false;
}
}
else {
if(nextChunkSize == -1) {
String hex = new String(bytes);
nextChunkSize = Integer.parseInt(hex, 16);
if(nextChunkSize == 0) {
if(!closed) {
nextChunkSize = -1;
awaitingInitialResponse = true;
reConnect();
}
}
}
else {
messages.put(bytes);
nextChunkSize = -1;
}
}
}
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
channel.close();
}
}
}

View File

@ -41,13 +41,15 @@ public class ServletClientSocketChannelFactory implements ClientSocketChannelFac
private final Executor workerExecutor;
private final ChannelSink sink;
private final URL url;
ClientSocketChannelFactory clientSocketChannelFactory;
/**
*
* @param workerExecutor
*/
public ServletClientSocketChannelFactory(Executor workerExecutor, URL url) {
public ServletClientSocketChannelFactory(ClientSocketChannelFactory clientSocketChannelFactory, Executor workerExecutor, URL url) {
this(url, workerExecutor, Runtime.getRuntime().availableProcessors());
this.clientSocketChannelFactory = clientSocketChannelFactory;
}
/**
@ -80,7 +82,7 @@ public class ServletClientSocketChannelFactory implements ClientSocketChannelFac
}
public SocketChannel newChannel(ChannelPipeline pipeline) {
return new ServletClientSocketChannel(this, pipeline, sink, url);
return new ServletClientSocketChannel(this, pipeline, sink, url, clientSocketChannelFactory);
}
public void releaseExternalResources() {

View File

@ -33,6 +33,7 @@ import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.socket.servlet.ServletClientSocketChannelFactory;
import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
@ -86,7 +87,7 @@ import org.jboss.netty.handler.codec.string.StringEncoder;
public class ServletClientExample {
public static void main(String[] args) throws Exception {
URL url = new URL("http", "localhost", 8080, "/netty/nettyServlet");
ServletClientSocketChannelFactory factory = new ServletClientSocketChannelFactory(Executors.newCachedThreadPool(), url);
ServletClientSocketChannelFactory factory = new ServletClientSocketChannelFactory(new OioClientSocketChannelFactory(Executors.newCachedThreadPool()), Executors.newCachedThreadPool(), url);
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.getPipeline().addLast("decoder", new StringDecoder());
bootstrap.getPipeline().addLast("encoder", new StringEncoder());

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.Collection;
import org.jboss.netty.buffer.ChannelBuffer;
@ -44,6 +45,7 @@ public class DefaultHttpMessage implements HttpMessage {
private final HttpVersion version;
private final Map<String, List<String>> headers = new TreeMap<String, List<String>>(caseIgnoringComparator);
private final Map<String, HttpCookie> cookies = new TreeMap<String, HttpCookie>(caseIgnoringComparator);
private ChannelBuffer content;
protected DefaultHttpMessage(final HttpVersion version) {
@ -140,6 +142,21 @@ public class DefaultHttpMessage implements HttpMessage {
return content;
}
public void addCookie(HttpCookie cookie) {
cookies.put(cookie.getName(), cookie);
}
public HttpCookie getCookie(String name) {
return cookies.get(name);
}
public Collection<HttpCookie> getCookies() {
return cookies.values();
}
public Collection<String> getCookieNames() {
return cookies.keySet();
}
private static final class CaseIgnoringComparator
implements Comparator<String>, Serializable {

View File

@ -58,6 +58,11 @@ class HttpCodecUtil {
*/
static final byte COLON = 58;
/**
* Semicolon ';'
*/
static final byte SEMICOLON = 59;
private HttpCodecUtil() {
super();
}

View File

@ -0,0 +1,45 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.handler.codec.http;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class HttpCookie {
private final String name;
private final String value;
public HttpCookie(String name, String value) {
this.name = name;
this.value = value;
}
public String getName() {
return name;
}
public String getValue() {
return value;
}
}

View File

@ -23,6 +23,7 @@ package org.jboss.netty.handler.codec.http;
import java.util.List;
import java.util.Set;
import java.util.Collection;
import org.jboss.netty.buffer.ChannelBuffer;
@ -64,4 +65,13 @@ public interface HttpMessage {
boolean isChunked();
void clearHeaders();
void addCookie(HttpCookie cookie);
HttpCookie getCookie(String name);
Collection<HttpCookie> getCookies();
Collection<String> getCookieNames();
}

View File

@ -258,6 +258,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<HttpMessageDec
}
protected abstract boolean isDecodingRequest();
protected abstract String getCookieHeaderName();
protected abstract void readInitial(ChannelBuffer buffer) throws Exception;
private int getChunkSize(String hex) {

View File

@ -23,9 +23,11 @@ package org.jboss.netty.handler.codec.http;
import static org.jboss.netty.buffer.ChannelBuffers.*;
import static org.jboss.netty.handler.codec.http.HttpCodecUtil.*;
import static org.jboss.netty.handler.codec.http.HttpCodecUtil.SEMICOLON;
import java.util.List;
import java.util.Set;
import java.util.Collection;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@ -55,6 +57,7 @@ public abstract class HttpMessageEncoder extends OneToOneEncoder {
channel.getConfig().getBufferFactory());
encodeInitialLine(header, request);
encodeHeaders(header, request);
encodeCookies(header, request);
header.writeBytes(CRLF);
ChannelBuffer content = request.getContent();
@ -108,5 +111,24 @@ public abstract class HttpMessageEncoder extends OneToOneEncoder {
}
}
public void encodeCookies(ChannelBuffer buf, HttpMessage message) {
Collection<String> cookieNames = message.getCookieNames();
if(cookieNames.isEmpty()) {
return;
}
buf.writeBytes(getCookieHeaderName());
buf.writeByte(COLON);
buf.writeByte(SP);
for (String cookieName : cookieNames) {
buf.writeBytes(cookieName.getBytes());
buf.writeByte(EQUALS);
buf.writeBytes(message.getCookie(cookieName).getValue().getBytes());
buf.writeByte(SEMICOLON);
}
buf.writeBytes(CRLF);
}
public abstract byte[] getCookieHeaderName();
protected abstract void encodeInitialLine(ChannelBuffer buf, HttpMessage message) throws Exception;
}

View File

@ -54,4 +54,9 @@ public class HttpRequestDecoder extends HttpMessageDecoder {
protected boolean isDecodingRequest() {
return true;
}
@Override
protected String getCookieHeaderName() {
return "Cookie";
}
}

View File

@ -34,7 +34,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
* @version $Rev$, $Date$
*/
public class HttpRequestEncoder extends HttpMessageEncoder {
private static final byte[] COOKIE_HEADER = "Cookie".getBytes();
/**
* writes the initial line i.e. 'GET /path/to/file/index.html HTTP/1.0'
*/
@ -49,4 +49,7 @@ public class HttpRequestEncoder extends HttpMessageEncoder {
buf.writeBytes(CRLF);
}
public byte[] getCookieHeaderName() {
return COOKIE_HEADER;
}
}

View File

@ -53,4 +53,9 @@ public class HttpResponseDecoder extends HttpMessageDecoder {
protected boolean isDecodingRequest() {
return false;
}
@Override
protected String getCookieHeaderName() {
return "Set-Cookie";
}
}

View File

@ -34,6 +34,8 @@ import org.jboss.netty.buffer.ChannelBuffer;
* @version $Rev$, $Date$
*/
public class HttpResponseEncoder extends HttpMessageEncoder {
private static final byte[] COOKIE_HEADER = "Set Cookie: ".getBytes();
@Override
protected void encodeInitialLine(ChannelBuffer buf, HttpMessage message) {
HttpResponse response = (HttpResponse) message;
@ -44,4 +46,8 @@ public class HttpResponseEncoder extends HttpMessageEncoder {
buf.writeBytes(String.valueOf(response.getStatus().getReasonPhrase()).getBytes());
buf.writeBytes(CRLF);
}
public byte[] getCookieHeaderName() {
return COOKIE_HEADER;
}
}

View File

@ -58,6 +58,8 @@ public class NettyServletContextListener implements ServletContextListener {
context.getServletContext().setAttribute(RECONNECT_PROP, timeoutParam == null?DEFAULT_RECONNECT_TIMEOUT:Long.decode(timeoutParam.trim()));
String streaming = context.getServletContext().getInitParameter(STREAMING_PROP);
context.getServletContext().setAttribute(STREAMING_PROP, streaming == null?DEFAULT_IS_STREAMING: Boolean.valueOf(streaming.trim()));
String serverChannel = context.getServletContext().getInitParameter(SERVER_CHANNEL_PROP);
context.getServletContext().setAttribute(SERVER_CHANNEL_PROP, serverChannel);
}
public void contextDestroyed(ServletContextEvent context) {

View File

@ -62,7 +62,7 @@ public class NettySessionListener implements HttpSessionListener, ChannelHandler
return pipeline;
}
});
ChannelFuture future = bootstrap.connect(new LocalAddress("netty"));
ChannelFuture future = bootstrap.connect(new LocalAddress((String) session.getServletContext().getAttribute(SERVER_CHANNEL_PROP)));
future.awaitUninterruptibly();
final Channel ch = future.getChannel();
session.setAttribute(CHANNEL_PROP, ch);