Code cleanup

This commit is contained in:
Trustin Lee 2009-02-09 01:39:42 +00:00
parent e7aeffe14d
commit 8622b885b6
12 changed files with 211 additions and 246 deletions

View File

@ -21,14 +21,15 @@
*/
package org.jboss.netty.channel.local;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import org.jboss.netty.channel.AbstractServerChannel;
import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import static org.jboss.netty.channel.Channels.fireChannelOpen;
import java.net.SocketAddress;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>

View File

@ -22,16 +22,8 @@
*/
package org.jboss.netty.channel.socket.servlet;
import org.jboss.logging.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import static org.jboss.netty.channel.Channels.fireChannelOpen;
import org.jboss.netty.channel.socket.SocketChannelConfig;
import static org.jboss.netty.channel.socket.servlet.ServletClientSocketPipelineSink.LINE_TERMINATOR;
import static org.jboss.netty.channel.Channels.*;
import static org.jboss.netty.channel.socket.servlet.ServletClientSocketPipelineSink.*;
import java.io.BufferedReader;
import java.io.IOException;
@ -43,19 +35,24 @@ import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URL;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.socket.SocketChannelConfig;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
class ServletClientSocketChannel extends AbstractChannel
implements org.jboss.netty.channel.socket.SocketChannel {
private static Logger log = Logger.getLogger(ServletClientSocketChannel.class);
private Lock lock = new ReentrantLock();
private final Lock lock = new ReentrantLock();
private final Object writeLock = new Object();

View File

@ -22,27 +22,20 @@
*/
package org.jboss.netty.channel.socket.servlet;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import static org.jboss.netty.channel.Channels.fireChannelBound;
import static org.jboss.netty.channel.Channels.fireExceptionCaught;
import static org.jboss.netty.channel.Channels.fireChannelConnected;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.logging.Logger;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PushbackInputStream;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URL;
import java.util.concurrent.Executor;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
@ -50,7 +43,6 @@ import java.util.concurrent.Executor;
class ServletClientSocketPipelineSink extends AbstractChannelSink {
static String LINE_TERMINATOR = "\r\n";
private static Logger log = Logger.getLogger(ServletClientSocketPipelineSink.class);
private final Executor workerExecutor;
ServletClientSocketPipelineSink(Executor workerExecutor) {

View File

@ -22,6 +22,11 @@
*/
package org.jboss.netty.channel.socket.servlet;
import java.net.Socket;
import java.net.SocketException;
import java.util.Map;
import java.util.Map.Entry;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.HeapChannelBufferFactory;
import org.jboss.netty.channel.ChannelException;
@ -29,11 +34,6 @@ import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.socket.SocketChannelConfig;
import org.jboss.netty.util.ConversionUtil;
import java.net.Socket;
import java.net.SocketException;
import java.util.Map;
import java.util.Map.Entry;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/

View File

@ -22,18 +22,12 @@
*/
package org.jboss.netty.channel.socket.servlet;
import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import static org.jboss.netty.channel.Channels.fireChannelClosed;
import static org.jboss.netty.channel.Channels.fireChannelDisconnected;
import static org.jboss.netty.channel.Channels.fireChannelInterestChanged;
import static org.jboss.netty.channel.Channels.fireChannelUnbound;
import static org.jboss.netty.channel.Channels.fireExceptionCaught;
import static org.jboss.netty.channel.Channels.fireMessageReceived;
import java.io.PushbackInputStream;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
@ -47,7 +41,6 @@ class ServletWorker implements Runnable {
public void run() {
channel.workerThread = Thread.currentThread();
final PushbackInputStream in = channel.getInputStream();
while (channel.isOpen()) {
synchronized (this) {

View File

@ -21,26 +21,23 @@
*/
package org.jboss.netty.example.local;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.local.LocalServerChannelFactory;
import org.jboss.netty.channel.local.LocalClientChannelFactory;
import org.jboss.netty.channel.local.LocalAddress;
import org.jboss.netty.channel.local.LocalServerChannelFactory;
import org.jboss.netty.channel.local.LocalServerChannels;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.example.echo.EchoHandler;
import org.jboss.netty.example.echo.ThroughputMonitor;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
import java.util.concurrent.Executors;
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
@ -91,6 +88,7 @@ public class LocalExample {
@ChannelPipelineCoverage("all")
static class PrintHandler extends OneToOneDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
String message = (String) msg;
System.out.println("received message back '" + message + "'");

View File

@ -21,11 +21,11 @@
*/
package org.jboss.netty.example.servlet;
import org.jboss.netty.channel.local.LocalServerChannelFactory;
import org.jboss.netty.channel.local.LocalAddress;
import org.jboss.netty.channel.local.LocalServerChannels;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.local.LocalAddress;
import org.jboss.netty.channel.local.LocalServerChannelFactory;
import org.jboss.netty.channel.local.LocalServerChannels;
import org.jboss.netty.example.echo.EchoHandler;
/**

View File

@ -21,23 +21,22 @@
*/
package org.jboss.netty.example.servlet;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.local.LocalAddress;
import org.jboss.netty.channel.socket.servlet.ServletClientSocketChannelFactory;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
/**
* make sure that the LocalTransportRegister bean is deployed along with the NettyServlet with the following web.xml
*
@ -121,6 +120,7 @@ public class ServletClientExample {
@ChannelPipelineCoverage("all")
static class PrintHandler extends OneToOneDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
String message = (String) msg;
System.out.println("received message back '" + message + "'");

View File

@ -21,49 +21,51 @@
*/
package org.jboss.netty.servlet;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.MessageEvent;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.util.List;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.MessageEvent;
/**
* A servlet that acts as a proxy for a netty channel
*
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class NettyServlet extends HttpServlet
{
public class NettyServlet extends HttpServlet {
private static final long serialVersionUID = -872309493835745385L;
final static String CHANNEL_PROP = "channel";
final static String HANDLER_PROP = "handler";
protected void doRequest(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
protected void doRequest(
HttpServletRequest request,
HttpServletResponse response) throws IOException {
HttpSession session = request.getSession();
Channel channel = (Channel) session.getAttribute(CHANNEL_PROP);
ServletChannelHandler handler = (ServletChannelHandler) session.getAttribute(HANDLER_PROP);
if (handler.isStreaming())
{
ServletChannelHandler handler =
(ServletChannelHandler) session.getAttribute(HANDLER_PROP);
if (handler.isStreaming()) {
streamResponse(request, response, session, handler, channel);
}
else
{
} else {
pollResponse(channel, request, response, session, handler);
}
}
private void streamResponse(final HttpServletRequest request, final HttpServletResponse response, HttpSession session, ServletChannelHandler handler, Channel channel) throws IOException
{
private void streamResponse(
final HttpServletRequest request,
final HttpServletResponse response, HttpSession session,
ServletChannelHandler handler, Channel channel) throws IOException {
response.setHeader("jsessionid", session.getId());
response.setHeader("Content-Type", "application/octet-stream");
@ -72,87 +74,68 @@ public class NettyServlet extends HttpServlet
response.getOutputStream().flush();
handler.setOutputStream(response.getOutputStream());
PushbackInputStream in = new PushbackInputStream(request.getInputStream());
do
{
try
{
PushbackInputStream in =
new PushbackInputStream(request.getInputStream());
do {
try {
ChannelBuffer buffer = read(in);
if (buffer == null)
{
if (buffer == null) {
break;
}
channel.write(buffer);
}
catch (IOException e)
{
} catch (IOException e) {
// this is ok, the client can reconnect.
break;
}
}
while (true);
} while (true);
if (!handler.awaitReconnect())
{
if (!handler.awaitReconnect()) {
channel.close();
}
}
private ChannelBuffer read(PushbackInputStream in) throws IOException
{
private ChannelBuffer read(PushbackInputStream in) throws IOException {
byte[] buf;
int readBytes;
do
{
do {
int bytesToRead = in.available();
if (bytesToRead > 0)
{
if (bytesToRead > 0) {
buf = new byte[bytesToRead];
readBytes = in.read(buf);
break;
}
else if (bytesToRead == 0)
{
} else if (bytesToRead == 0) {
int b = in.read();
if (b < 0 || in.available() < 0)
{
if (b < 0 || in.available() < 0) {
return null;
}
if (b == 13)
{
if (b == 13) {
in.read();
}
else
{
} else {
in.unread(b);
}
}
else
{
} else {
return null;
}
}
while (true);
} while (true);
ChannelBuffer buffer;
if (readBytes == buf.length)
{
if (readBytes == buf.length) {
buffer = ChannelBuffers.wrappedBuffer(buf);
}
else
{
} else {
// A rare case, but it sometimes happen.
buffer = ChannelBuffers.wrappedBuffer(buf, 0, readBytes);
}
return buffer;
}
private void pollResponse(Channel channel, HttpServletRequest request, HttpServletResponse response, HttpSession session, ServletChannelHandler handler) throws IOException
{
private void pollResponse(
Channel channel,
HttpServletRequest request,
HttpServletResponse response, HttpSession session,
ServletChannelHandler handler) throws IOException {
int length = request.getContentLength();
if (length > 0)
{
if (length > 0) {
byte[] bytes = new byte[length];
request.getInputStream().read(bytes);
ChannelBuffer cb = ChannelBuffers.copiedBuffer(bytes);
@ -161,39 +144,38 @@ public class NettyServlet extends HttpServlet
handler.setOutputStream(response.getOutputStream());
List<MessageEvent> buffers = handler.getAwaitingEvents();
length = 0;
if (buffers.size() > 0)
{
for (MessageEvent buffer : buffers)
{
length += ((ChannelBuffer)buffer.getMessage()).readableBytes();
if (buffers.size() > 0) {
for (MessageEvent buffer: buffers) {
length += ((ChannelBuffer) buffer.getMessage()).readableBytes();
}
}
response.setHeader("jsessionid", session.getId());
response.setContentLength(length);
response.setStatus(HttpServletResponse.SC_OK);
for (MessageEvent event : buffers)
{
for (MessageEvent event: buffers) {
ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
byte[] b = new byte[buffer.readableBytes()];
buffer.readBytes(b);
try {
response.getOutputStream().write(b);
event.getFuture().setSuccess();
}
catch (IOException e) {
} catch (IOException e) {
event.getFuture().setFailure(e);
}
}
}
protected void doGet(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws ServletException, IOException
{
@Override
protected void doGet(
HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse) throws ServletException, IOException {
doRequest(httpServletRequest, httpServletResponse);
}
protected void doPost(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws ServletException, IOException
{
@Override
protected void doPost(
HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse) throws ServletException, IOException {
doRequest(httpServletRequest, httpServletResponse);
}
}

View File

@ -21,13 +21,13 @@
*/
package org.jboss.netty.servlet;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.local.LocalServerChannels;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
/**
* A context listener that creates a client bootstrap that uses a local channel factory. The local channel factory should
* already be registered before the contect is loaded.

View File

@ -21,23 +21,21 @@
*/
package org.jboss.netty.servlet;
import static org.jboss.netty.channel.Channels.*;
import static org.jboss.netty.servlet.NettyServlet.*;
import static org.jboss.netty.servlet.NettyServletContextListener.*;
import javax.servlet.http.HttpSession;
import javax.servlet.http.HttpSessionEvent;
import javax.servlet.http.HttpSessionListener;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import static org.jboss.netty.channel.Channels.pipeline;
import org.jboss.netty.channel.local.LocalAddress;
import static org.jboss.netty.servlet.NettyServletContextListener.BOOTSTRAP_PROP;
import static org.jboss.netty.servlet.NettyServletContextListener.STREAMING_PROP;
import static org.jboss.netty.servlet.NettyServletContextListener.RECONNECT_PROP;
import static org.jboss.netty.servlet.NettyServlet.CHANNEL_PROP;
import static org.jboss.netty.servlet.NettyServlet.HANDLER_PROP;
import javax.servlet.http.HttpSession;
import javax.servlet.http.HttpSessionEvent;
import javax.servlet.http.HttpSessionListener;
/**
* A session listenor that uses the client bootstrap to create a channel.

View File

@ -21,16 +21,6 @@
*/
package org.jboss.netty.servlet;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpSession;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -40,6 +30,17 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpSession;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
/**
* A channel handler taht proxies messages to the servlet output stream
*
@ -49,11 +50,11 @@ import java.util.concurrent.locks.ReentrantLock;
class ServletChannelHandler extends SimpleChannelHandler {
List<MessageEvent> awaitingEvents = new ArrayList<MessageEvent>();
private Lock reconnectLock = new ReentrantLock();
private final Lock reconnectLock = new ReentrantLock();
private Condition reconnectCondition = reconnectLock.newCondition();
private final Condition reconnectCondition = reconnectLock.newCondition();
private long reconnectTimeout;
private final long reconnectTimeout;
boolean connected = false;
@ -71,6 +72,7 @@ class ServletChannelHandler extends SimpleChannelHandler {
this.reconnectTimeout = reconnectTimeout;
}
@Override
public synchronized void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
@ -114,6 +116,7 @@ class ServletChannelHandler extends SimpleChannelHandler {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
if (invalidated.compareAndSet(false, true)) {
session.invalidate();
@ -121,6 +124,7 @@ class ServletChannelHandler extends SimpleChannelHandler {
e.getChannel().close();
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
if (invalidated.compareAndSet(false, true)) {
session.invalidate();