* Merged Andy's Local transport

* Merged Andy's HTTP tunnel
* Both needs some tidying up, but seems to work OK
This commit is contained in:
Trustin Lee 2009-02-09 01:31:50 +00:00
parent 9445fd1b6f
commit e7aeffe14d
22 changed files with 2579 additions and 1 deletions

11
pom.xml
View File

@ -67,7 +67,16 @@
<scope>compile</scope>
<optional>true</optional>
</dependency>
<!-- Servlet API - completely optional -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<!-- IoC/DI containers - completely optional -->
<dependency>
<groupId>org.jboss.microcontainer</groupId>

View File

@ -0,0 +1,68 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.local;
import java.net.SocketAddress;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* @author Trustin Lee (tlee@redhat.com)
*/
public class LocalAddress extends SocketAddress implements Comparable<LocalAddress> {
private static final long serialVersionUID = -3601961747680808645L;
private final String id;
public LocalAddress(String id) {
if (id == null) {
throw new NullPointerException("id");
}
this.id = id;
}
public String getId() {
return id;
}
@Override
public int hashCode() {
return id.hashCode();
}
@Override
public boolean equals(Object o) {
if (!(o instanceof LocalAddress)) {
return false;
}
return getId().equals(((LocalAddress) o).getId());
}
public int compareTo(LocalAddress o) {
return getId().compareTo(o.getId());
}
@Override
public String toString() {
return getId();
}
}

View File

@ -0,0 +1,98 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.local;
import static org.jboss.netty.channel.Channels.*;
import java.util.Queue;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.LinkedTransferQueue;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* @author Trustin Lee (tlee@redhat.com)
*/
public class LocalChannel extends AbstractChannel {
//final BlockingQueue<MessageEvent> writeBuffer = new LinkedBlockingQueue<MessageEvent>();
private final ThreadLocal<Boolean> delivering = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
volatile LocalChannel pairedChannel = null;
private final LocalChannelConfig config;
final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>();
protected LocalChannel(ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink) {
super(null, factory, pipeline, sink);
config = new LocalChannelConfig();
fireChannelOpen(this);
}
public LocalChannelConfig getConfig() {
return config;
}
public boolean isBound() {
return true;
}
public boolean isConnected() {
return true;
}
public LocalAddress getLocalAddress() {
// FIXME: should return LocalAddress
return null;
}
public LocalAddress getRemoteAddress() {
// FIXME: should return LocalAddress
return null;
}
void writeNow(LocalChannel pairedChannel) {
if (!delivering.get()) {
delivering.set(true);
try {
for (;;) {
MessageEvent e = writeBuffer.poll();
if(e == null) {
break;
}
e.getFuture().setSuccess();
fireMessageReceived(pairedChannel, e.getMessage());
}
} finally {
delivering.set(false);
}
}
}
}

View File

@ -0,0 +1,95 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.local;
import java.util.Map;
import java.util.Map.Entry;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.HeapChannelBufferFactory;
import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelPipelineFactory;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* @author Trustin Lee (tlee@redhat.com)
*/
public class LocalChannelConfig implements ChannelConfig {
private volatile ChannelBufferFactory bufferFactory = HeapChannelBufferFactory.getInstance();
private volatile ChannelPipelineFactory pipelineFactory;
public void setOptions(Map<String, Object> options) {
for (Entry<String, Object> e: options.entrySet()) {
setOption(e.getKey(), e.getValue());
}
}
/**
* Sets an individual option. You can override this method to support
* additional configuration parameters.
*/
protected boolean setOption(String key, Object value) {
if (key.equals("pipelineFactory")) {
setPipelineFactory((ChannelPipelineFactory) value);
} else if (key.equals("bufferFactory")) {
setBufferFactory((ChannelBufferFactory) value);
} else {
return false;
}
return true;
}
public ChannelBufferFactory getBufferFactory() {
return bufferFactory;
}
public void setBufferFactory(ChannelBufferFactory bufferFactory) {
this.bufferFactory = bufferFactory;
}
public ChannelPipelineFactory getPipelineFactory() {
return pipelineFactory;
}
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) {
this.pipelineFactory = pipelineFactory;
}
public int getConnectTimeoutMillis() {
return 0;
}
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
// Unused
}
@Deprecated
public int getWriteTimeoutMillis() {
return 0;
}
@Deprecated
public void setWriteTimeoutMillis(int writeTimeoutMillis) {
// Unused
}
}

View File

@ -0,0 +1,48 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.local;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* @author Trustin Lee (tlee@redhat.com)
*/
public class LocalClientChannelFactory implements ChannelFactory {
private final ChannelSink sink;
public LocalClientChannelFactory(LocalServerChannelFactory serverFactory) {
sink = new LocalClientChannelSink(serverFactory.channel, serverFactory.sink);
}
public Channel newChannel(ChannelPipeline pipeline) {
return new LocalChannel(this, pipeline, sink);
}
public void releaseExternalResources() {
// No external resources.
}
}

View File

@ -0,0 +1,96 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.local;
import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
final class LocalClientChannelSink extends AbstractChannelSink {
private final Channel serverChannel;
private final ChannelSink serverSink;
LocalClientChannelSink(Channel channel, ChannelSink sink) {
serverChannel = channel;
serverSink = sink;
}
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
LocalChannel channel =
(LocalChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
future.setSuccess();
fireChannelDisconnected(channel);
fireChannelClosed(channel);
fireChannelDisconnected(channel.pairedChannel);
fireChannelClosed(channel.pairedChannel);
}
break;
case BOUND:
break;
case CONNECTED:
connect(channel, future, (LocalAddress) value);
break;
case INTEREST_OPS:
break;
}
}
else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
LocalChannel channel = (LocalChannel) event.getChannel();
channel.pairedChannel.writeBuffer.offer(event);
channel.pairedChannel.writeNow(channel.pairedChannel);
}
}
private void connect(LocalChannel channel, ChannelFuture future, LocalAddress localAddress) throws Exception {
future.setSuccess();
ChannelPipeline pipeline = serverChannel.getConfig().getPipelineFactory().getPipeline();
LocalChannel acceptedChannel = new LocalChannel(serverChannel.getFactory(), pipeline, serverSink);
channel.pairedChannel = acceptedChannel;
acceptedChannel.pairedChannel = channel;
Channels.fireChannelConnected(channel, localAddress);
Channels.fireChannelConnected(acceptedChannel, localAddress);
}
}

View File

@ -0,0 +1,64 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.local;
import org.jboss.netty.channel.AbstractServerChannel;
import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import static org.jboss.netty.channel.Channels.fireChannelOpen;
import java.net.SocketAddress;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class LocalServerChannel extends AbstractServerChannel {
final ChannelConfig channelConfig;
protected LocalServerChannel(ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink) {
super(factory, pipeline, sink);
channelConfig = new LocalChannelConfig();
fireChannelOpen(this);
}
public ChannelConfig getConfig() {
return channelConfig;
}
public boolean isBound() {
return true;
}
public boolean isConnected() {
return true;
}
public SocketAddress getLocalAddress() {
return null;
}
public SocketAddress getRemoteAddress() {
return null;
}
}

View File

@ -0,0 +1,53 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.local;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class LocalServerChannelFactory implements ChannelFactory {
private final String channelName;
ChannelSink sink;
Channel channel;
public LocalServerChannelFactory(String channelName) {
this.channelName = channelName;
sink = new LocalServerChannelSink();
}
public Channel newChannel(ChannelPipeline pipeline) {
if (channel == null) {
channel = new LocalServerChannel(this, pipeline, sink);
}
return channel;
}
public void releaseExternalResources() {
LocalServerChannels.unregisterServerChannel(channelName);
}
}

View File

@ -0,0 +1,103 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.local;
import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
final class LocalServerChannelSink extends AbstractChannelSink {
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
if (e.getChannel() instanceof LocalServerChannel) {
handleServerChannel(event);
}
else if (e.getChannel() instanceof LocalChannel) {
handleLocalChannel(event);
}
}
else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
LocalChannel channel = (LocalChannel) event.getChannel();
channel.pairedChannel.writeBuffer.offer(event);
channel.pairedChannel.writeNow(channel.pairedChannel);
}
}
private void handleLocalChannel(ChannelStateEvent event) {
LocalChannel localChannel =
(LocalChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
// FIXME: Proper event emission.
case OPEN:
if (Boolean.FALSE.equals(value)) {
future.setSuccess();
fireChannelDisconnected(localChannel);
fireChannelUnbound(localChannel);
fireChannelClosed(localChannel);
fireChannelDisconnected(localChannel.pairedChannel);
fireChannelUnbound(localChannel.pairedChannel);
fireChannelClosed(localChannel.pairedChannel);
}
break;
case BOUND:
break;
}
}
private void handleServerChannel(ChannelStateEvent event) {
LocalServerChannel serverChannel =
(LocalServerChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
break;
case BOUND:
if (value != null) {
bind(future, serverChannel);
}
break;
}
}
private void bind(ChannelFuture future, LocalServerChannel serverChannel) {
future.setSuccess();
fireChannelBound(serverChannel, serverChannel.getLocalAddress());
}
}

View File

@ -0,0 +1,55 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.local;
import java.util.HashMap;
import java.util.Map;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* @author Trustin Lee (tlee@redhat.com)
*/
public final class LocalServerChannels {
private static final Map<String, LocalServerChannelFactory> factoryMap = new HashMap<String, LocalServerChannelFactory>();
public static LocalServerChannelFactory registerServerChannel(String channelName) {
if (factoryMap.keySet().contains(channelName)) {
throw new IllegalArgumentException("server channel already registered: " + channelName);
}
LocalServerChannelFactory factory = new LocalServerChannelFactory(channelName);
factoryMap.put(channelName, factory);
return factory;
}
public static LocalClientChannelFactory getClientChannelFactory(String channelName) {
LocalServerChannelFactory localServerChannelFactory = factoryMap.get(channelName);
return localServerChannelFactory == null?null:new LocalClientChannelFactory(localServerChannelFactory);
}
public static void unregisterServerChannel(String channelName) {
factoryMap.remove(channelName);
}
private LocalServerChannels() {
// Unused.
}
}

View File

@ -0,0 +1,304 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.socket.servlet;
import org.jboss.logging.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import static org.jboss.netty.channel.Channels.fireChannelOpen;
import org.jboss.netty.channel.socket.SocketChannelConfig;
import static org.jboss.netty.channel.socket.servlet.ServletClientSocketPipelineSink.LINE_TERMINATOR;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URL;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
class ServletClientSocketChannel extends AbstractChannel
implements org.jboss.netty.channel.socket.SocketChannel {
private static Logger log = Logger.getLogger(ServletClientSocketChannel.class);
private Lock lock = new ReentrantLock();
private final Object writeLock = new Object();
private Socket socket;
private ServletSocketChannelConfig config;
volatile Thread workerThread;
private volatile PushbackInputStream in;
private volatile OutputStream out;
private String sessionId;
private boolean closed = false;
private final URL url;
ServletClientSocketChannel(
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink, URL url) {
super(null, factory, pipeline, sink);
this.url = url;
socket = new Socket();
config = new ServletSocketChannelConfig(socket);
fireChannelOpen(this);
}
public SocketChannelConfig getConfig() {
return config;
}
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) socket.getLocalSocketAddress();
}
public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) socket.getRemoteSocketAddress();
}
public boolean isBound() {
return isOpen() && socket.isBound();
}
public boolean isConnected() {
return isOpen() && socket.isConnected();
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
@Override
protected void setInterestOpsNow(int interestOps) {
super.setInterestOpsNow(interestOps);
}
@Override
protected ChannelFuture getSucceededFuture() {
return super.getSucceededFuture();
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
return super.write(message, null);
}
else {
return getUnsupportedOperationFuture();
}
}
PushbackInputStream getInputStream() {
return in;
}
OutputStream getOutputStream() {
return out;
}
void connectAndSendHeaders(boolean reconnect, SocketAddress remoteAddress) throws IOException {
if (reconnect) {
System.out.println("reconnecting");
socket.close();
socket = new Socket();
config = config.copyConfig(socket);
}
socket.connect(
remoteAddress, getConfig().getConnectTimeoutMillis());
// Obtain I/O stream.
in = new PushbackInputStream(socket.getInputStream(), 1);
out = socket.getOutputStream();
//write and read headers
StringBuilder builder = new StringBuilder();
builder.append("POST ").append(url.toExternalForm()).append(" HTTP/1.1").append(LINE_TERMINATOR).
append("HOST: ").append(url.getHost()).append(":").append(url.getPort()).append(LINE_TERMINATOR).
append("Content-Type: application/octet-stream").append(LINE_TERMINATOR).append("Transfer-Encoding: chunked").
append(LINE_TERMINATOR).append("Content-Transfer-Encoding: Binary").append(LINE_TERMINATOR).append("Connection: Keep-Alive").
append(LINE_TERMINATOR);
if (reconnect) {
builder.append("Cookie: JSESSIONID=").append(sessionId).append(LINE_TERMINATOR);
}
builder.append(LINE_TERMINATOR);
String msg = builder.toString();
socket.getOutputStream().write(msg.getBytes("ASCII7"));
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String line;
while ((line = br.readLine()) != null) {
if (!reconnect) {
if (line.contains("Set-Cookie")) {
int start = line.indexOf("JSESSIONID=") + 11;
int end = line.indexOf(";", start);
sessionId = line.substring(start, end);
}
}
if (line.equals(LINE_TERMINATOR) || line.equals("")) {
break;
}
}
}
public void sendChunk(ChannelBuffer a) throws IOException {
int size = a.readableBytes();
String hex = Integer.toHexString(size) + LINE_TERMINATOR;
try {
synchronized (writeLock) {
out.write(hex.getBytes());
a.getBytes(a.readerIndex(), out, a.readableBytes());
out.write(LINE_TERMINATOR.getBytes());
}
}
catch (SocketException e) {
if (closed) {
throw e;
}
lock.lock();
if (lock.tryLock()) {
try {
connectAndSendHeaders(true, getRemoteAddress());
}
finally {
lock.unlock();
}
}
else {
try {
lock.lock();
}
finally {
lock.unlock();
}
}
}
}
public byte[] receiveChunk() throws IOException {
byte[] buf;
try {
buf = read();
}
catch (SocketException e) {
if (closed) {
throw e;
}
if (lock.tryLock()) {
try {
connectAndSendHeaders(true, socket.getRemoteSocketAddress());
}
finally {
lock.unlock();
}
}
else {
try {
lock.lock();
}
finally {
lock.unlock();
}
}
buf = read();
}
return buf;
}
private byte[] read() throws IOException {
//
byte[] buf;
StringBuffer hex = new StringBuffer();
int b;
while ((b = in.read()) != -1) {
if (b == 13) {
int end = in.read();
if (end != 10) {
in.unread(end);
}
break;
}
hex.append((char) b);
}
int bytesToRead = Integer.parseInt(hex.toString(), 16);
buf = new byte[bytesToRead];
if (in.available() >= bytesToRead) {
in.read(buf, 0, bytesToRead);
}
else {
int readBytes = 0;
do {
readBytes += in.read(buf, readBytes, bytesToRead - readBytes);
}
while (bytesToRead != readBytes);
}
int end = in.read();
if (end != 13) {
in.unread(end);
}
else {
end = in.read();
if (end != 10) {
in.unread(end);
}
}
return buf;
}
public void closeSocket() throws IOException {
setClosed();
closed = true;
socket.close();
}
public void bindSocket(SocketAddress localAddress) throws IOException {
socket.bind(localAddress);
}
}

View File

@ -0,0 +1,87 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.socket.servlet;
import java.net.URL;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.util.ExecutorUtil;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class ServletClientSocketChannelFactory implements ClientSocketChannelFactory {
private final Executor workerExecutor;
private final ChannelSink sink;
private final URL url;
/**
*
* @param workerExecutor
*/
public ServletClientSocketChannelFactory(Executor workerExecutor, URL url) {
this(url, workerExecutor, Runtime.getRuntime().availableProcessors());
}
/**
* Creates a new instance.
*
* the {@link java.util.concurrent.Executor} which will execute the boss thread
* @param url
* @param workerExecutor
* the {@link java.util.concurrent.Executor} which will execute the I/O worker threads
* @param workerCount
*/
public ServletClientSocketChannelFactory(
URL url, Executor workerExecutor,
int workerCount) {
if (url == null) {
throw new NullPointerException("Url is null");
}
this.url = url;
if (workerExecutor == null) {
throw new NullPointerException("workerExecutor");
}
if (workerCount <= 0) {
throw new IllegalArgumentException(
"workerCount (" + workerCount + ") " +
"must be a positive integer.");
}
this.workerExecutor = workerExecutor;
sink = new ServletClientSocketPipelineSink(workerExecutor);
}
public SocketChannel newChannel(ChannelPipeline pipeline) {
return new ServletClientSocketChannel(this, pipeline, sink, url);
}
public void releaseExternalResources() {
ExecutorUtil.terminate(workerExecutor);
}
}

View File

@ -0,0 +1,155 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.socket.servlet;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import static org.jboss.netty.channel.Channels.fireChannelBound;
import static org.jboss.netty.channel.Channels.fireExceptionCaught;
import static org.jboss.netty.channel.Channels.fireChannelConnected;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.logging.Logger;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PushbackInputStream;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URL;
import java.util.concurrent.Executor;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
class ServletClientSocketPipelineSink extends AbstractChannelSink {
static String LINE_TERMINATOR = "\r\n";
private static Logger log = Logger.getLogger(ServletClientSocketPipelineSink.class);
private final Executor workerExecutor;
ServletClientSocketPipelineSink(Executor workerExecutor) {
this.workerExecutor = workerExecutor;
}
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
ServletClientSocketChannel channel = (ServletClientSocketChannel) e.getChannel();
ChannelFuture future = e.getFuture();
if (e instanceof ChannelStateEvent) {
ChannelStateEvent stateEvent = (ChannelStateEvent) e;
ChannelState state = stateEvent.getState();
Object value = stateEvent.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
ServletWorker.close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
ServletWorker.close(channel, future);
}
break;
case CONNECTED:
if (value != null) {
connect(channel, future, (SocketAddress) value);
} else {
ServletWorker.close(channel, future);
}
break;
case INTEREST_OPS:
ServletWorker.setInterestOps(channel, future, ((Integer) value).intValue());
break;
}
} else if (e instanceof MessageEvent) {
ServletWorker.write(
channel, future,
((MessageEvent) e).getMessage());
}
}
private void bind(
ServletClientSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
try {
channel.bindSocket(localAddress);
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private void connect(
ServletClientSocketChannel channel, ChannelFuture future,
SocketAddress remoteAddress) {
boolean bound = channel.isBound();
boolean connected = false;
boolean workerStarted = false;
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if (future.isCancelled()) {
future.getChannel().close();
}
}
});
try {
channel.connectAndSendHeaders(false, remoteAddress);
// Fire events.
future.setSuccess();
if (!bound) {
fireChannelBound(channel, channel.getLocalAddress());
}
fireChannelConnected(channel, channel.getRemoteAddress());
// Start the business.
workerExecutor.execute(new ThreadRenamingRunnable(
new ServletWorker(channel),
"Old I/O client worker (channelId: " + channel.getId() + ", " +
channel.getLocalAddress() + " => " +
channel.getRemoteAddress() + ')'));
workerStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (connected && !workerStarted) {
ServletWorker.close(channel, future);
}
}
}
}

View File

@ -0,0 +1,351 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.socket.servlet;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.HeapChannelBufferFactory;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.socket.SocketChannelConfig;
import org.jboss.netty.util.ConversionUtil;
import java.net.Socket;
import java.net.SocketException;
import java.util.Map;
import java.util.Map.Entry;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class ServletSocketChannelConfig implements SocketChannelConfig {
final Socket socket;
private volatile ChannelBufferFactory bufferFactory = HeapChannelBufferFactory.getInstance();
private volatile int connectTimeoutMillis = 10000; // 10 seconds
private Integer trafficClass;
private Boolean tcpNoDelay;
private Integer soLinger;
private Integer sendBufferSize;
private Boolean reuseAddress;
private Integer receiveBufferSize;
private Integer connectionTime;
private Integer latency;
private Integer bandwidth;
private Boolean keepAlive;
/**
* Creates a new instance.
*/
public ServletSocketChannelConfig(Socket socket) {
this.socket = socket;
}
public void setOptions(Map<String, Object> options) {
for (Entry<String, Object> e : options.entrySet()) {
setOption(e.getKey(), e.getValue());
}
}
/**
* Sets an individual option. You can override this method to support
* additional configuration parameters.
*/
protected boolean setOption(String key, Object value) {
if (key.equals("receiveBufferSize")) {
setReceiveBufferSize(ConversionUtil.toInt(value));
}
else if (key.equals("sendBufferSize")) {
setSendBufferSize(ConversionUtil.toInt(value));
}
else if (key.equals("tcpNoDelay")) {
setTcpNoDelay(ConversionUtil.toBoolean(value));
}
else if (key.equals("keepAlive")) {
setKeepAlive(ConversionUtil.toBoolean(value));
}
else if (key.equals("reuseAddress")) {
setReuseAddress(ConversionUtil.toBoolean(value));
}
else if (key.equals("soLinger")) {
setSoLinger(ConversionUtil.toInt(value));
}
else if (key.equals("trafficClass")) {
setTrafficClass(ConversionUtil.toInt(value));
}
else if (key.equals("writeTimeoutMillis")) {
setWriteTimeoutMillis(ConversionUtil.toInt(value));
}
else if (key.equals("connectTimeoutMillis")) {
setConnectTimeoutMillis(ConversionUtil.toInt(value));
}
else if (key.equals("pipelineFactory")) {
setPipelineFactory((ChannelPipelineFactory) value);
}
else if (key.equals("bufferFactory")) {
setBufferFactory((ChannelBufferFactory) value);
}
else {
return false;
}
return true;
}
public int getReceiveBufferSize() {
try {
return socket.getReceiveBufferSize();
}
catch (SocketException e) {
throw new ChannelException(e);
}
}
public int getSendBufferSize() {
try {
return socket.getSendBufferSize();
}
catch (SocketException e) {
throw new ChannelException(e);
}
}
public int getSoLinger() {
try {
return socket.getSoLinger();
}
catch (SocketException e) {
throw new ChannelException(e);
}
}
public int getTrafficClass() {
try {
return socket.getTrafficClass();
}
catch (SocketException e) {
throw new ChannelException(e);
}
}
public boolean isKeepAlive() {
try {
return socket.getKeepAlive();
}
catch (SocketException e) {
throw new ChannelException(e);
}
}
public boolean isReuseAddress() {
try {
return socket.getReuseAddress();
}
catch (SocketException e) {
throw new ChannelException(e);
}
}
public boolean isTcpNoDelay() {
try {
return socket.getTcpNoDelay();
}
catch (SocketException e) {
throw new ChannelException(e);
}
}
public void setKeepAlive(boolean keepAlive) {
this.keepAlive = keepAlive;
try {
socket.setKeepAlive(keepAlive);
}
catch (SocketException e) {
throw new ChannelException(e);
}
}
public void setPerformancePreferences(
int connectionTime, int latency, int bandwidth) {
this.connectionTime = connectionTime;
this.latency = latency;
this.bandwidth = bandwidth;
socket.setPerformancePreferences(connectionTime, latency, bandwidth);
}
public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
try {
socket.setReceiveBufferSize(receiveBufferSize);
}
catch (SocketException e) {
throw new ChannelException(e);
}
}
public void setReuseAddress(boolean reuseAddress) {
this.reuseAddress = reuseAddress;
try {
socket.setReuseAddress(reuseAddress);
}
catch (SocketException e) {
throw new ChannelException(e);
}
}
public void setSendBufferSize(int sendBufferSize) {
this.sendBufferSize = sendBufferSize;
if (socket != null) {
try {
socket.setSendBufferSize(sendBufferSize);
}
catch (SocketException e) {
throw new ChannelException(e);
}
}
}
public void setSoLinger(int soLinger) {
this.soLinger = soLinger;
try {
if (soLinger < 0) {
socket.setSoLinger(false, 0);
}
else {
socket.setSoLinger(true, soLinger);
}
}
catch (SocketException e) {
throw new ChannelException(e);
}
}
public void setTcpNoDelay(boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
try {
socket.setTcpNoDelay(tcpNoDelay);
}
catch (SocketException e) {
throw new ChannelException(e);
}
}
public void setTrafficClass(int trafficClass) {
this.trafficClass = trafficClass;
try {
socket.setTrafficClass(trafficClass);
}
catch (SocketException e) {
throw new ChannelException(e);
}
}
public int getConnectTimeoutMillis() {
return connectTimeoutMillis;
}
public ChannelBufferFactory getBufferFactory() {
return bufferFactory;
}
public void setBufferFactory(ChannelBufferFactory bufferFactory) {
if (bufferFactory == null) {
throw new NullPointerException("bufferFactory");
}
this.bufferFactory = bufferFactory;
}
public ChannelPipelineFactory getPipelineFactory() {
return null;
}
public int getWriteTimeoutMillis() {
return 0;
}
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
if (connectTimeoutMillis < 0) {
throw new IllegalArgumentException("connectTimeoutMillis: " + connectTimeoutMillis);
}
this.connectTimeoutMillis = connectTimeoutMillis;
}
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) {
// Unused
}
public void setWriteTimeoutMillis(int writeTimeoutMillis) {
// Unused
}
ServletSocketChannelConfig copyConfig(Socket socket) {
ServletSocketChannelConfig config = new ServletSocketChannelConfig(socket);
config.setConnectTimeoutMillis(connectTimeoutMillis);
if (trafficClass != null) {
config.setTrafficClass(trafficClass);
}
if (tcpNoDelay != null) {
config.setTcpNoDelay(tcpNoDelay);
}
if (soLinger != null) {
config.setSoLinger(soLinger);
}
if (sendBufferSize != null) {
config.setSendBufferSize(sendBufferSize);
}
if (reuseAddress != null) {
config.setReuseAddress(reuseAddress);
}
if (receiveBufferSize != null) {
config.setReceiveBufferSize(receiveBufferSize);
}
if (keepAlive != null) {
config.setKeepAlive(keepAlive);
}
if (connectionTime != null) {
config.setPerformancePreferences(connectionTime, latency, bandwidth);
}
return config;
}
}

View File

@ -0,0 +1,172 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.socket.servlet;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import static org.jboss.netty.channel.Channels.fireChannelClosed;
import static org.jboss.netty.channel.Channels.fireChannelDisconnected;
import static org.jboss.netty.channel.Channels.fireChannelInterestChanged;
import static org.jboss.netty.channel.Channels.fireChannelUnbound;
import static org.jboss.netty.channel.Channels.fireExceptionCaught;
import static org.jboss.netty.channel.Channels.fireMessageReceived;
import java.io.PushbackInputStream;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
class ServletWorker implements Runnable {
private final ServletClientSocketChannel channel;
ServletWorker(ServletClientSocketChannel channel) {
this.channel = channel;
}
public void run() {
channel.workerThread = Thread.currentThread();
final PushbackInputStream in = channel.getInputStream();
while (channel.isOpen()) {
synchronized (this) {
while (!channel.isReadable()) {
try {
// notify() is not called at all.
// close() and setInterestOps() calls Thread.interrupt()
this.wait();
}
catch (InterruptedException e) {
if (!channel.isOpen()) {
break;
}
}
}
}
byte[] buf;
try {
buf = channel.receiveChunk();
}
catch (Throwable t) {
if (!channel.isOpen()) {
fireExceptionCaught(channel, t);
}
break;
}
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(buf);
fireMessageReceived(channel, buffer);
}
// Setting the workerThread to null will prevent any channel
// operations from interrupting this thread from now on.
channel.workerThread = null;
// Clean up.
close(channel, channel.getSucceededFuture());
}
static void write(
ServletClientSocketChannel channel, ChannelFuture future,
Object message) {
try {
channel.sendChunk((ChannelBuffer) message);
future.setSuccess();
}
catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
static void setInterestOps(
ServletClientSocketChannel channel, ChannelFuture future, int interestOps) {
// Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
boolean changed = false;
try {
if (channel.getInterestOps() != interestOps) {
if ((interestOps & Channel.OP_READ) != 0) {
channel.setInterestOpsNow(Channel.OP_READ);
}
else {
channel.setInterestOpsNow(Channel.OP_NONE);
}
changed = true;
}
future.setSuccess();
if (changed) {
// Notify the worker so it stops or continues reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
channel.setInterestOpsNow(interestOps);
fireChannelInterestChanged(channel);
}
}
catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
static void close(ServletClientSocketChannel channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
try {
channel.closeSocket();
future.setSuccess();
if (channel.setClosed()) {
if (connected) {
// Notify the worker so it stops reading.
Thread currentThread = Thread.currentThread();
Thread workerThread = channel.workerThread;
if (workerThread != null && currentThread != workerThread) {
workerThread.interrupt();
}
fireChannelDisconnected(channel);
}
if (bound) {
fireChannelUnbound(channel);
}
fireChannelClosed(channel);
}
}
catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
}

View File

@ -0,0 +1,100 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.example.local;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.local.LocalServerChannelFactory;
import org.jboss.netty.channel.local.LocalClientChannelFactory;
import org.jboss.netty.channel.local.LocalAddress;
import org.jboss.netty.channel.local.LocalServerChannels;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.example.echo.EchoHandler;
import org.jboss.netty.example.echo.ThroughputMonitor;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
import java.util.concurrent.Executors;
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class LocalExample {
public static void main(String[] args) throws Exception {
LocalServerChannelFactory factory = LocalServerChannels.registerServerChannel("localChannel");
ServerBootstrap bootstrap = new ServerBootstrap(factory);
EchoHandler handler = new EchoHandler();
LocalAddress socketAddress = new LocalAddress("1");
bootstrap.getPipeline().addLast("handler", handler);
bootstrap.bind(socketAddress);
ChannelFactory channelFactory = LocalServerChannels.getClientChannelFactory("localChannel");
ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory);
clientBootstrap.getPipeline().addLast("decoder", new StringDecoder());
clientBootstrap.getPipeline().addLast("encoder", new StringEncoder());
clientBootstrap.getPipeline().addLast("handler", new PrintHandler());
ChannelFuture channelFuture = clientBootstrap.connect(socketAddress);
channelFuture.awaitUninterruptibly();
System.out.println("Enter text (quit to end)");
// Read commands from the stdin.
ChannelFuture lastWriteFuture = null;
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (; ;) {
String line = in.readLine();
if (line == null || "quit".equalsIgnoreCase(line)) {
break;
}
// Sends the received line to the server.
lastWriteFuture = channelFuture.getChannel().write(line);
}
// Wait until all messages are flushed before closing the channel.
if (lastWriteFuture != null) {
lastWriteFuture.awaitUninterruptibly();
}
channelFuture.getChannel().close();
// Wait until the connection is closed or the connection attempt fails.
channelFuture.getChannel().getCloseFuture().awaitUninterruptibly();
// Shut down thread pools to exit.
channelFactory.releaseExternalResources();
factory.releaseExternalResources();
}
@ChannelPipelineCoverage("all")
static class PrintHandler extends OneToOneDecoder {
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
String message = (String) msg;
System.out.println("received message back '" + message + "'");
return null;
}
}
}

View File

@ -0,0 +1,52 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.example.servlet;
import org.jboss.netty.channel.local.LocalServerChannelFactory;
import org.jboss.netty.channel.local.LocalAddress;
import org.jboss.netty.channel.local.LocalServerChannels;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.example.echo.EchoHandler;
/**
* you can deploy this in jboss 5 by adding the following bean.
*
* <bean name="org.jboss.netty.example.servlet.LocalTransportRegister"
class="org.jboss.netty.example.servlet.LocalTransportRegister" />
*
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class LocalTransportRegister {
public void start() {
LocalServerChannelFactory serverChannelFactory = LocalServerChannels.registerServerChannel("org.jboss.netty.exampleChannel");
ServerBootstrap serverBootstrap = new ServerBootstrap(serverChannelFactory);
EchoHandler handler = new EchoHandler();
serverBootstrap.getPipeline().addLast("handler", handler);
Channel channel = serverBootstrap.bind(new LocalAddress("localAddress"));
}
public void stop() {
LocalServerChannels.unregisterServerChannel("org.jboss.netty.exampleChannel");
}
}

View File

@ -0,0 +1,130 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.example.servlet;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.local.LocalAddress;
import org.jboss.netty.channel.socket.servlet.ServletClientSocketChannelFactory;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
/**
* make sure that the LocalTransportRegister bean is deployed along with the NettyServlet with the following web.xml
*
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd"
version="2.4">
<!--the name of the channel, this should be a registered local channel. see LocalTransportRegister-->
<context-param>
<param-name>serverChannelName</param-name>
<param-value>org.jboss.netty.exampleChannel</param-value>
</context-param>
<!--Whether or not we are streaming or just polling using normal http requests-->
<context-param>
<param-name>streaming</param-name>
<param-value>true</param-value>
</context-param>
<!--how long to wait for a client reconnecting in milliseconds-->
<context-param>
<param-name>reconnectTimeout</param-name>
<param-value>3000</param-value>
</context-param>
<listener>
<listener-class>org.jboss.netty.servlet.NettySessionListener</listener-class>
</listener>
<listener>
<listener-class>org.jboss.netty.servlet.NettyServletContextListener</listener-class>
</listener>
<servlet>
<servlet-name>NettyServlet</servlet-name>
<servlet-class>org.jboss.netty.servlet.NettyServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>NettyServlet</servlet-name>
<url-pattern>/nettyServlet</url-pattern>
</servlet-mapping>
</web-app>
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class ServletClientExample {
public static void main(String[] args) throws Exception {
URL url = new URL("http", "localhost", 8080, "/netty/nettyServlet");
ServletClientSocketChannelFactory factory = new ServletClientSocketChannelFactory(Executors.newCachedThreadPool(), url);
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.getPipeline().addLast("decoder", new StringDecoder());
bootstrap.getPipeline().addLast("encoder", new StringEncoder());
bootstrap.getPipeline().addLast("handler", new PrintHandler());
ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("localhost", 8080));
channelFuture.awaitUninterruptibly();
System.out.println("Enter text (quit to end)");
// Read commands from the stdin.
ChannelFuture lastWriteFuture = null;
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (; ;) {
String line = in.readLine();
if (line == null || "quit".equalsIgnoreCase(line)) {
break;
}
// Sends the received line to the server.
lastWriteFuture = channelFuture.getChannel().write(line);
}
// Wait until all messages are flushed before closing the channel.
if (lastWriteFuture != null) {
lastWriteFuture.awaitUninterruptibly();
}
channelFuture.getChannel().close();
// Wait until the connection is closed or the connection attempt fails.
channelFuture.getChannel().getCloseFuture().awaitUninterruptibly();
factory.releaseExternalResources();
}
@ChannelPipelineCoverage("all")
static class PrintHandler extends OneToOneDecoder {
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
String message = (String) msg;
System.out.println("received message back '" + message + "'");
return null;
}
}
}

View File

@ -0,0 +1,199 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.servlet;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.MessageEvent;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.util.List;
/**
* A servlet that acts as a proxy for a netty channel
*
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class NettyServlet extends HttpServlet
{
final static String CHANNEL_PROP = "channel";
final static String HANDLER_PROP = "handler";
protected void doRequest(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
HttpSession session = request.getSession();
Channel channel = (Channel) session.getAttribute(CHANNEL_PROP);
ServletChannelHandler handler = (ServletChannelHandler) session.getAttribute(HANDLER_PROP);
if (handler.isStreaming())
{
streamResponse(request, response, session, handler, channel);
}
else
{
pollResponse(channel, request, response, session, handler);
}
}
private void streamResponse(final HttpServletRequest request, final HttpServletResponse response, HttpSession session, ServletChannelHandler handler, Channel channel) throws IOException
{
response.setHeader("jsessionid", session.getId());
response.setHeader("Content-Type", "application/octet-stream");
response.setContentLength(-1);
response.setStatus(HttpServletResponse.SC_OK);
response.getOutputStream().flush();
handler.setOutputStream(response.getOutputStream());
PushbackInputStream in = new PushbackInputStream(request.getInputStream());
do
{
try
{
ChannelBuffer buffer = read(in);
if (buffer == null)
{
break;
}
channel.write(buffer);
}
catch (IOException e)
{
// this is ok, the client can reconnect.
break;
}
}
while (true);
if (!handler.awaitReconnect())
{
channel.close();
}
}
private ChannelBuffer read(PushbackInputStream in) throws IOException
{
byte[] buf;
int readBytes;
do
{
int bytesToRead = in.available();
if (bytesToRead > 0)
{
buf = new byte[bytesToRead];
readBytes = in.read(buf);
break;
}
else if (bytesToRead == 0)
{
int b = in.read();
if (b < 0 || in.available() < 0)
{
return null;
}
if (b == 13)
{
in.read();
}
else
{
in.unread(b);
}
}
else
{
return null;
}
}
while (true);
ChannelBuffer buffer;
if (readBytes == buf.length)
{
buffer = ChannelBuffers.wrappedBuffer(buf);
}
else
{
// A rare case, but it sometimes happen.
buffer = ChannelBuffers.wrappedBuffer(buf, 0, readBytes);
}
return buffer;
}
private void pollResponse(Channel channel, HttpServletRequest request, HttpServletResponse response, HttpSession session, ServletChannelHandler handler) throws IOException
{
int length = request.getContentLength();
if (length > 0)
{
byte[] bytes = new byte[length];
request.getInputStream().read(bytes);
ChannelBuffer cb = ChannelBuffers.copiedBuffer(bytes);
channel.write(cb);
}
handler.setOutputStream(response.getOutputStream());
List<MessageEvent> buffers = handler.getAwaitingEvents();
length = 0;
if (buffers.size() > 0)
{
for (MessageEvent buffer : buffers)
{
length += ((ChannelBuffer)buffer.getMessage()).readableBytes();
}
}
response.setHeader("jsessionid", session.getId());
response.setContentLength(length);
response.setStatus(HttpServletResponse.SC_OK);
for (MessageEvent event : buffers)
{
ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
byte[] b = new byte[buffer.readableBytes()];
buffer.readBytes(b);
try {
response.getOutputStream().write(b);
event.getFuture().setSuccess();
}
catch (IOException e) {
event.getFuture().setFailure(e);
}
}
}
protected void doGet(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws ServletException, IOException
{
doRequest(httpServletRequest, httpServletResponse);
}
protected void doPost(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws ServletException, IOException
{
doRequest(httpServletRequest, httpServletResponse);
}
}

View File

@ -0,0 +1,77 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.servlet;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.local.LocalServerChannels;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
/**
* A context listener that creates a client bootstrap that uses a local channel factory. The local channel factory should
* already be registered before the contect is loaded.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class NettyServletContextListener implements ServletContextListener {
private static final long DEFAULT_RECONNECT_TIMEOUT = 5000;
private static final boolean DEFAULT_IS_STREAMING = true;
static final String SERVER_CHANNEL_PROP = "serverChannelName";
static final String RECONNECT_PROP = "reconnectTimeout";
static final String STREAMING_PROP = "streaming";
static final String BOOTSTRAP_PROP = "bootstrap";
public void contextInitialized(ServletContextEvent context) {
String channelName = context.getServletContext().getInitParameter(SERVER_CHANNEL_PROP);
if (channelName != null) {
String name = channelName.trim();
ChannelFactory channelFactory = LocalServerChannels.getClientChannelFactory(name);
if (channelFactory != null) {
context.getServletContext().setAttribute(BOOTSTRAP_PROP, new ClientBootstrap(channelFactory));
}
else {
throw new IllegalArgumentException("channel factory " + name + " not registered");
}
String timeoutParam = context.getServletContext().getInitParameter(RECONNECT_PROP);
context.getServletContext().setAttribute(RECONNECT_PROP, timeoutParam == null?DEFAULT_RECONNECT_TIMEOUT:Long.decode(timeoutParam.trim()));
String streaming = context.getServletContext().getInitParameter(STREAMING_PROP);
context.getServletContext().setAttribute(STREAMING_PROP, streaming == null?DEFAULT_IS_STREAMING: Boolean.valueOf(streaming.trim()));
}
}
public void contextDestroyed(ServletContextEvent context) {
}
}

View File

@ -0,0 +1,77 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.servlet;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import static org.jboss.netty.channel.Channels.pipeline;
import org.jboss.netty.channel.local.LocalAddress;
import static org.jboss.netty.servlet.NettyServletContextListener.BOOTSTRAP_PROP;
import static org.jboss.netty.servlet.NettyServletContextListener.STREAMING_PROP;
import static org.jboss.netty.servlet.NettyServletContextListener.RECONNECT_PROP;
import static org.jboss.netty.servlet.NettyServlet.CHANNEL_PROP;
import static org.jboss.netty.servlet.NettyServlet.HANDLER_PROP;
import javax.servlet.http.HttpSession;
import javax.servlet.http.HttpSessionEvent;
import javax.servlet.http.HttpSessionListener;
/**
* A session listenor that uses the client bootstrap to create a channel.
*
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class NettySessionListener implements HttpSessionListener, ChannelHandler {
public void sessionCreated(HttpSessionEvent event) {
HttpSession session = event.getSession();
ClientBootstrap bootstrap = (ClientBootstrap) session.getServletContext().getAttribute(BOOTSTRAP_PROP);
Boolean streaming = (Boolean) session.getServletContext().getAttribute(STREAMING_PROP);
if(streaming) {
session.setMaxInactiveInterval(-1);
}
final ServletChannelHandler handler = new ServletChannelHandler(streaming, session, (Long) session.getServletContext().getAttribute(RECONNECT_PROP));
session.setAttribute(HANDLER_PROP, handler);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
pipeline.addLast(NettySessionListener.class.getName(), handler);
return pipeline;
}
});
ChannelFuture future = bootstrap.connect(new LocalAddress("netty"));
future.awaitUninterruptibly();
final Channel ch = future.getChannel();
session.setAttribute(CHANNEL_PROP, ch);
}
public void sessionDestroyed(HttpSessionEvent event) {
Channel channel = (Channel) event.getSession().getAttribute(CHANNEL_PROP);
if (channel != null) {
channel.close();
}
}
}

View File

@ -0,0 +1,185 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.servlet;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpSession;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* A channel handler taht proxies messages to the servlet output stream
*
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
@ChannelPipelineCoverage("one")
class ServletChannelHandler extends SimpleChannelHandler {
List<MessageEvent> awaitingEvents = new ArrayList<MessageEvent>();
private Lock reconnectLock = new ReentrantLock();
private Condition reconnectCondition = reconnectLock.newCondition();
private long reconnectTimeout;
boolean connected = false;
AtomicBoolean invalidated = new AtomicBoolean(false);
private ServletOutputStream outputStream;
final boolean stream;
private final HttpSession session;
public ServletChannelHandler(boolean stream, HttpSession session, long reconnectTimeout) {
this.stream = stream;
this.session = session;
this.reconnectTimeout = reconnectTimeout;
}
public synchronized void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
if (stream) {
reconnectLock.lock();
if (outputStream == null) {
awaitingEvents.add(e);
return;
}
byte[] b = new byte[buffer.readableBytes()];
buffer.readBytes(b);
try {
outputStream.write(b);
outputStream.flush();
e.getFuture().setSuccess();
}
catch (IOException e1) {
connected = false;
reconnectCondition.await(reconnectTimeout, TimeUnit.MILLISECONDS);
if (connected) {
outputStream.write(b);
outputStream.flush();
e.getFuture().setSuccess();
}
else {
e.getFuture().setFailure(e1);
if (invalidated.compareAndSet(false, true)) {
session.invalidate();
}
e.getChannel().close();
}
}
finally {
reconnectLock.unlock();
}
}
else {
awaitingEvents.add(e);
}
}
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
if (invalidated.compareAndSet(false, true)) {
session.invalidate();
}
e.getChannel().close();
}
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
if (invalidated.compareAndSet(false, true)) {
session.invalidate();
}
}
public synchronized List<MessageEvent> getAwaitingEvents() {
List<MessageEvent> list = new ArrayList<MessageEvent>();
list.addAll(awaitingEvents);
awaitingEvents.clear();
return list;
}
public void setOutputStream(ServletOutputStream outputStream) {
reconnectLock.lock();
try {
this.outputStream = outputStream;
connected = true;
for (MessageEvent awaitingEvent : awaitingEvents) {
ChannelBuffer buffer = (ChannelBuffer) awaitingEvent.getMessage();
byte[] b = new byte[buffer.readableBytes()];
buffer.readBytes(b);
try {
outputStream.write(b);
outputStream.flush();
awaitingEvent.getFuture().setSuccess();
}
catch (IOException e) {
awaitingEvent.getFuture().setFailure(e);
}
}
reconnectCondition.signalAll();
}
finally {
reconnectLock.unlock();
}
}
public boolean isStreaming() {
return stream;
}
public ServletOutputStream getOutputStream() {
return outputStream;
}
public boolean awaitReconnect() {
reconnectLock.lock();
connected = false;
try {
reconnectCondition.await(reconnectTimeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
return connected;
}
finally {
reconnectLock.unlock();
}
return connected;
}
}