Merge remote branch 'upstream/master'

This commit is contained in:
Jestan Nirojan 2011-10-10 03:34:46 +05:30
commit 8f25312be8
85 changed files with 1418 additions and 537 deletions

View File

@ -337,7 +337,7 @@ public class TimeServerHandler extends &SimpleChannelHandler; {
&Channel; ch = e.getChannel(); &Channel; ch = e.getChannel();
&ChannelBuffer; time = &ChannelBuffers;.buffer(4);<co id="example.time.co2"/> &ChannelBuffer; time = &ChannelBuffers;.buffer(4);<co id="example.time.co2"/>
time.writeInt(System.currentTimeMillis() / 1000); time.writeInt((int) (System.currentTimeMillis() / 1000));
&ChannelFuture; f = ch.write(time);<co id="example.time.co3"/> &ChannelFuture; f = ch.write(time);<co id="example.time.co3"/>

View File

@ -979,8 +979,8 @@ public class ChannelBuffers {
} }
for (int i = byteCount; i > 0; i --) { for (int i = byteCount; i > 0; i --) {
byte va = bufferA.getByte(aIndex); short va = bufferA.getUnsignedByte(aIndex);
byte vb = bufferB.getByte(bIndex); short vb = bufferB.getUnsignedByte(bIndex);
if (va > vb) { if (va > vb) {
return 1; return 1;
} else if (va < vb) { } else if (va < vb) {

View File

@ -32,7 +32,6 @@ import org.jboss.netty.util.internal.ConcurrentHashMap;
public abstract class AbstractChannel implements Channel { public abstract class AbstractChannel implements Channel {
static final ConcurrentMap<Integer, Channel> allChannels = new ConcurrentHashMap<Integer, Channel>(); static final ConcurrentMap<Integer, Channel> allChannels = new ConcurrentHashMap<Integer, Channel>();
private static final IdDeallocator ID_DEALLOCATOR = new IdDeallocator();
private static Integer allocateId(Channel channel) { private static Integer allocateId(Channel channel) {
Integer id = Integer.valueOf(System.identityHashCode(channel)); Integer id = Integer.valueOf(System.identityHashCode(channel));
@ -49,17 +48,6 @@ public abstract class AbstractChannel implements Channel {
} }
} }
private static final class IdDeallocator implements ChannelFutureListener {
IdDeallocator() {
super();
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
allChannels.remove(future.getChannel().getId());
}
}
private final Integer id; private final Integer id;
private final Channel parent; private final Channel parent;
private final ChannelFactory factory; private final ChannelFactory factory;
@ -94,7 +82,6 @@ public abstract class AbstractChannel implements Channel {
this.pipeline = pipeline; this.pipeline = pipeline;
id = allocateId(this); id = allocateId(this);
closeFuture.addListener(ID_DEALLOCATOR);
pipeline.attach(this, sink); pipeline.attach(this, sink);
} }
@ -200,6 +187,10 @@ public abstract class AbstractChannel implements Channel {
* closed yet * closed yet
*/ */
protected boolean setClosed() { protected boolean setClosed() {
// Deallocate the current channel's ID from allChannels so that other
// new channels can use it.
allChannels.remove(id);
return closeFuture.setClosed(); return closeFuture.setClosed();
} }

View File

@ -51,7 +51,7 @@ package org.jboss.netty.channel;
* public void login(String username, password) { * public void login(String username, password) {
* {@link Channels}.write( * {@link Channels}.write(
* <b>this.ctx</b>, * <b>this.ctx</b>,
* {@link Channels}.succeededFuture(<b>this.ctx</t>.getChannel()), * {@link Channels}.succeededFuture(<b>this.ctx</t>.getChannel()</b>),
* new LoginMessage(username, password)); * new LoginMessage(username, password));
* } * }
* ... * ...

View File

@ -17,6 +17,7 @@ package org.jboss.netty.channel;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
@ -488,6 +489,11 @@ public interface ChannelPipeline {
*/ */
boolean isAttached(); boolean isAttached();
/**
* Returns the {@link List} of the handler names.
*/
List<String> getNames();
/** /**
* Converts this pipeline into an ordered {@link Map} whose keys are * Converts this pipeline into an ordered {@link Map} whose keys are
* handler names and whose values are handlers. * handler names and whose values are handlers.

View File

@ -15,8 +15,10 @@
*/ */
package org.jboss.netty.channel; package org.jboss.netty.channel;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
@ -510,6 +512,24 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return null; return null;
} }
@Override
public List<String> getNames() {
List<String> list = new ArrayList<String>();
if (name2ctx.isEmpty()) {
return list;
}
DefaultChannelHandlerContext ctx = head;
for (;;) {
list.add(ctx.getName());
ctx = ctx.next;
if (ctx == null) {
break;
}
}
return list;
}
@Override @Override
public Map<String, ChannelHandler> toMap() { public Map<String, ChannelHandler> toMap() {
Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>(); Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
@ -590,9 +610,19 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} }
void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) { void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
if (e instanceof UpstreamMessageEvent) {
throw new IllegalArgumentException("cannot send an upstream event to downstream");
}
try { try {
((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e); ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
} catch (Throwable t) { } catch (Throwable t) {
// Unlike an upstream event, a downstream event usually has an
// incomplete future which is supposed to be updated by ChannelSink.
// However, if an exception is raised before the event reaches at
// ChannelSink, the future is not going to be updated, so we update
// here.
e.getFuture().setFailure(t);
notifyHandlerException(e, t); notifyHandlerException(e, t);
} }
} }

View File

@ -48,12 +48,21 @@ import org.jboss.netty.util.ExternalResourceReleasable;
* transfer, sending a file with {@link FileRegion} might fail or yield worse * transfer, sending a file with {@link FileRegion} might fail or yield worse
* performance. For example, sending a large file doesn't work well in Windows. * performance. For example, sending a large file doesn't work well in Windows.
* *
* <h3>Not all transports support it</h3>
*
* Currently, the NIO transport is the only transport that supports {@link FileRegion}.
* Attempting to write a {@link FileRegion} to non-NIO {@link Channel} will trigger
* a {@link ClassCastException} or a similar exception.
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a> * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a> * @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $ * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
*/ */
public interface FileRegion extends ExternalResourceReleasable { public interface FileRegion extends ExternalResourceReleasable {
// FIXME Make sure all transports support writing a FileRegion
// Even if zero copy cannot be achieved, all transports should emulate it.
/** /**
* Returns the offset in the file where the transfer began. * Returns the offset in the file where the transfer began.
*/ */

View File

@ -15,8 +15,10 @@
*/ */
package org.jboss.netty.channel; package org.jboss.netty.channel;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
@ -340,6 +342,15 @@ public class StaticChannelPipeline implements ChannelPipeline {
return null; return null;
} }
@Override
public List<String> getNames() {
List<String> list = new ArrayList<String>();
for (StaticChannelHandlerContext ctx: contexts) {
list.add(ctx.getName());
}
return list;
}
@Override @Override
public Map<String, ChannelHandler> toMap() { public Map<String, ChannelHandler> toMap() {
Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>(); Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
@ -407,9 +418,19 @@ public class StaticChannelPipeline implements ChannelPipeline {
} }
void sendDownstream(StaticChannelHandlerContext ctx, ChannelEvent e) { void sendDownstream(StaticChannelHandlerContext ctx, ChannelEvent e) {
if (e instanceof UpstreamMessageEvent) {
throw new IllegalArgumentException("cannot send an upstream event to downstream");
}
try { try {
((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e); ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
} catch (Throwable t) { } catch (Throwable t) {
// Unlike an upstream event, a downstream event usually has an
// incomplete future which is supposed to be updated by ChannelSink.
// However, if an exception is raised before the event reaches at
// ChannelSink, the future is not going to be updated, so we update
// here.
e.getFuture().setFailure(t);
notifyHandlerException(e, t); notifyHandlerException(e, t);
} }
} }

View File

@ -152,17 +152,18 @@ public class DefaultChannelGroupFuture implements ChannelGroupFuture {
@Override @Override
public synchronized boolean isPartialSuccess() { public synchronized boolean isPartialSuccess() {
return !futures.isEmpty() && successCount != 0; return successCount != 0 && successCount != futures.size();
} }
@Override @Override
public synchronized boolean isPartialFailure() { public synchronized boolean isPartialFailure() {
return !futures.isEmpty() && failureCount != 0; return failureCount != 0 && failureCount != futures.size();
} }
@Override @Override
public synchronized boolean isCompleteFailure() { public synchronized boolean isCompleteFailure() {
return failureCount == futures.size(); int futureCnt = futures.size();
return futureCnt != 0 && failureCount == futureCnt;
} }
@Override @Override

View File

@ -41,6 +41,7 @@ class AcceptedServerChannelPipelineFactory implements ChannelPipelineFactory {
this.messageSwitch = messageSwitch; this.messageSwitch = messageSwitch;
} }
@Override
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline(); ChannelPipeline pipeline = Channels.pipeline();

View File

@ -109,7 +109,7 @@ class AcceptedServerChannelRequestDispatch extends SimpleChannelUpstreamHandler
LOG.debug("send data request received for tunnel " + tunnelId); LOG.debug("send data request received for tunnel " + tunnelId);
} }
if (HttpHeaders.getContentLength(request) == 0 || if (HttpHeaders.getContentLength(request, 0) == 0 ||
request.getContent() == null || request.getContent() == null ||
request.getContent().readableBytes() == 0) { request.getContent().readableBytes() == 0) {
respondWithRejection(ctx, request, respondWithRejection(ctx, request,

View File

@ -47,6 +47,7 @@ class ChannelFutureAggregator implements ChannelFutureListener {
future.addListener(this); future.addListener(this);
} }
@Override
public synchronized void operationComplete(ChannelFuture future) public synchronized void operationComplete(ChannelFuture future)
throws Exception { throws Exception {
if (future.isCancelled()) { if (future.isCancelled()) {

View File

@ -39,6 +39,7 @@ public class DefaultTunnelIdGenerator implements TunnelIdGenerator {
this.generator = generator; this.generator = generator;
} }
@Override
public synchronized String generateId() { public synchronized String generateId() {
// synchronized to ensure that this code is thread safe. The Sun // synchronized to ensure that this code is thread safe. The Sun
// standard implementations seem to be synchronized or lock free // standard implementations seem to be synchronized or lock free

View File

@ -62,36 +62,44 @@ class HttpTunnelAcceptedChannel extends AbstractChannel implements
fireChannelConnected(this, getRemoteAddress()); fireChannelConnected(this, getRemoteAddress());
} }
@Override
public SocketChannelConfig getConfig() { public SocketChannelConfig getConfig() {
return config; return config;
} }
@Override
public InetSocketAddress getLocalAddress() { public InetSocketAddress getLocalAddress() {
return ((HttpTunnelServerChannel) getParent()).getLocalAddress(); return ((HttpTunnelServerChannel) getParent()).getLocalAddress();
} }
@Override
public InetSocketAddress getRemoteAddress() { public InetSocketAddress getRemoteAddress() {
return remoteAddress; return remoteAddress;
} }
@Override
public boolean isBound() { public boolean isBound() {
return sink.isActive(); return sink.isActive();
} }
@Override
public boolean isConnected() { public boolean isConnected() {
return sink.isActive(); return sink.isActive();
} }
@Override
public void clientClosed() { public void clientClosed() {
this.setClosed(); this.setClosed();
Channels.fireChannelClosed(this); Channels.fireChannelClosed(this);
} }
@Override
public void dataReceived(ChannelBuffer data) { public void dataReceived(ChannelBuffer data) {
Channels.fireMessageReceived(this, data); Channels.fireMessageReceived(this, data);
} }
@Override
public void updateInterestOps(SaturationStateChange transition) { public void updateInterestOps(SaturationStateChange transition) {
switch (transition) { switch (transition) {
case SATURATED: case SATURATED:

View File

@ -39,15 +39,15 @@ import org.jboss.netty.channel.MessageEvent;
*/ */
class HttpTunnelAcceptedChannelSink extends AbstractChannelSink { class HttpTunnelAcceptedChannelSink extends AbstractChannelSink {
private final SaturationManager saturationManager; final SaturationManager saturationManager;
private final ServerMessageSwitchDownstreamInterface messageSwitch; private final ServerMessageSwitchDownstreamInterface messageSwitch;
private final String tunnelId; private final String tunnelId;
private AtomicBoolean active = new AtomicBoolean(false); private final AtomicBoolean active = new AtomicBoolean(false);
private HttpTunnelAcceptedChannelConfig config; private final HttpTunnelAcceptedChannelConfig config;
public HttpTunnelAcceptedChannelSink( public HttpTunnelAcceptedChannelSink(
ServerMessageSwitchDownstreamInterface messageSwitch, ServerMessageSwitchDownstreamInterface messageSwitch,
@ -55,11 +55,12 @@ class HttpTunnelAcceptedChannelSink extends AbstractChannelSink {
this.messageSwitch = messageSwitch; this.messageSwitch = messageSwitch;
this.tunnelId = tunnelId; this.tunnelId = tunnelId;
this.config = config; this.config = config;
this.saturationManager = saturationManager =
new SaturationManager(config.getWriteBufferLowWaterMark(), new SaturationManager(config.getWriteBufferLowWaterMark(),
config.getWriteBufferHighWaterMark()); config.getWriteBufferHighWaterMark());
} }
@Override
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) public void eventSunk(ChannelPipeline pipeline, ChannelEvent e)
throws Exception { throws Exception {
if (e instanceof MessageEvent) { if (e instanceof MessageEvent) {

View File

@ -17,6 +17,7 @@ package org.jboss.netty.channel.socket.http;
import org.jboss.netty.channel.DefaultChannelConfig; import org.jboss.netty.channel.DefaultChannelConfig;
import org.jboss.netty.channel.socket.SocketChannelConfig; import org.jboss.netty.channel.socket.SocketChannelConfig;
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
/** /**
* Configuration for HTTP tunnels. Where possible, properties set on this configuration will * Configuration for HTTP tunnels. Where possible, properties set on this configuration will
@ -91,7 +92,7 @@ public abstract class HttpTunnelChannelConfig extends DefaultChannelConfig
* @param level the number of queued bytes required to flip {@link org.jboss.netty.channel.Channel#isWritable()} to * @param level the number of queued bytes required to flip {@link org.jboss.netty.channel.Channel#isWritable()} to
* false. * false.
* *
* @see {@link org.jboss.netty.channel.socket.nio.NioSocketChannelConfig#setWriteBufferHighWaterMark(int) NioSocketChannelConfig.setWriteBufferHighWaterMark()} * @see NioSocketChannelConfig#setWriteBufferHighWaterMark(int)
*/ */
public void setWriteBufferHighWaterMark(int level) { public void setWriteBufferHighWaterMark(int level) {
if (level <= writeBufferLowWaterMark) { if (level <= writeBufferLowWaterMark) {
@ -121,7 +122,7 @@ public abstract class HttpTunnelChannelConfig extends DefaultChannelConfig
* will return false until the buffer drops below this level. By creating a sufficient gap between the high and low * will return false until the buffer drops below this level. By creating a sufficient gap between the high and low
* water marks, rapid oscillation between "write enabled" and "write disabled" can be avoided. * water marks, rapid oscillation between "write enabled" and "write disabled" can be avoided.
* *
* @see {@link org.jboss.netty.channel.socket.nio.NioSocketChannelConfig#setWriteBufferLowWaterMark(int) NioSocketChannelConfig.setWriteBufferLowWaterMark()} * @see org.jboss.netty.channel.socket.nio.NioSocketChannelConfig#setWriteBufferLowWaterMark(int)
*/ */
public void setWriteBufferLowWaterMark(int level) { public void setWriteBufferLowWaterMark(int level) {
if (level >= writeBufferHighWaterMark) { if (level >= writeBufferHighWaterMark) {

View File

@ -48,33 +48,33 @@ import org.jboss.netty.logging.InternalLoggerFactory;
public class HttpTunnelClientChannel extends AbstractChannel implements public class HttpTunnelClientChannel extends AbstractChannel implements
SocketChannel { SocketChannel {
private static final InternalLogger LOG = InternalLoggerFactory static final InternalLogger LOG = InternalLoggerFactory
.getInstance(HttpTunnelClientChannel.class); .getInstance(HttpTunnelClientChannel.class);
private final HttpTunnelClientChannelConfig config; private final HttpTunnelClientChannelConfig config;
private final SocketChannel sendChannel; final SocketChannel sendChannel;
private final SocketChannel pollChannel; final SocketChannel pollChannel;
private volatile String tunnelId; volatile String tunnelId;
private volatile ChannelFuture connectFuture; volatile ChannelFuture connectFuture;
private volatile boolean connected; volatile boolean connected;
private volatile boolean bound; volatile boolean bound;
volatile InetSocketAddress serverAddress; volatile InetSocketAddress serverAddress;
private volatile String serverHostName; volatile String serverHostName;
private final WorkerCallbacks callbackProxy; private final WorkerCallbacks callbackProxy;
private final SaturationManager saturationManager; private final SaturationManager saturationManager;
/** /**
* @see {@link HttpTunnelClientChannelFactory#newChannel(ChannelPipeline)} * @see HttpTunnelClientChannelFactory#newChannel(ChannelPipeline)
*/ */
protected HttpTunnelClientChannel(ChannelFactory factory, protected HttpTunnelClientChannel(ChannelFactory factory,
ChannelPipeline pipeline, HttpTunnelClientChannelSink sink, ChannelPipeline pipeline, HttpTunnelClientChannelSink sink,
@ -82,7 +82,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
ChannelGroup realConnections) { ChannelGroup realConnections) {
super(null, factory, pipeline, sink); super(null, factory, pipeline, sink);
this.callbackProxy = new WorkerCallbacks(); callbackProxy = new WorkerCallbacks();
sendChannel = outboundFactory.newChannel(createSendPipeline()); sendChannel = outboundFactory.newChannel(createSendPipeline());
pollChannel = outboundFactory.newChannel(createPollPipeline()); pollChannel = outboundFactory.newChannel(createPollPipeline());
@ -100,26 +100,36 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
Channels.fireChannelOpen(this); Channels.fireChannelOpen(this);
} }
@Override
public HttpTunnelClientChannelConfig getConfig() { public HttpTunnelClientChannelConfig getConfig() {
return config; return config;
} }
@Override
public boolean isBound() { public boolean isBound() {
return bound; return bound;
} }
@Override
public boolean isConnected() { public boolean isConnected() {
return connected; return connected;
} }
@Override
public InetSocketAddress getLocalAddress() { public InetSocketAddress getLocalAddress() {
return sendChannel.getLocalAddress(); return sendChannel.getLocalAddress();
} }
@Override
public InetSocketAddress getRemoteAddress() { public InetSocketAddress getRemoteAddress() {
return serverAddress; return serverAddress;
} }
@Override
protected boolean setClosed() {
return super.setClosed();
}
void onConnectRequest(ChannelFuture connectFuture, void onConnectRequest(ChannelFuture connectFuture,
InetSocketAddress remoteAddress) { InetSocketAddress remoteAddress) {
this.connectFuture = connectFuture; this.connectFuture = connectFuture;
@ -146,6 +156,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
pollChannel.disconnect().addListener(disconnectListener); pollChannel.disconnect().addListener(disconnectListener);
disconnectFuture.addListener(new ChannelFutureListener() { disconnectFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) public void operationComplete(ChannelFuture future)
throws Exception { throws Exception {
serverAddress = null; serverAddress = null;
@ -214,7 +225,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
return pipeline; return pipeline;
} }
private void setTunnelIdForPollChannel() { void setTunnelIdForPollChannel() {
HttpTunnelClientPollHandler pollHandler = HttpTunnelClientPollHandler pollHandler =
pollChannel.getPipeline() pollChannel.getPipeline()
.get(HttpTunnelClientPollHandler.class); .get(HttpTunnelClientPollHandler.class);
@ -230,6 +241,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
updateSaturationStatus(messageSize); updateSaturationStatus(messageSize);
Channels.write(sendChannel, e.getMessage()).addListener( Channels.write(sendChannel, e.getMessage()).addListener(
new ChannelFutureListener() { new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) public void operationComplete(ChannelFuture future)
throws Exception { throws Exception {
if (future.isSuccess()) { if (future.isSuccess()) {
@ -242,7 +254,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
}); });
} }
private void updateSaturationStatus(int queueSizeDelta) { void updateSaturationStatus(int queueSizeDelta) {
SaturationStateChange transition = SaturationStateChange transition =
saturationManager.queueSizeChanged(queueSizeDelta); saturationManager.queueSizeChanged(queueSizeDelta);
switch (transition) { switch (transition) {
@ -279,6 +291,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
eventsLeft = new AtomicInteger(numToConsolidate); eventsLeft = new AtomicInteger(numToConsolidate);
} }
@Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
futureFailed(future); futureFailed(future);
@ -315,7 +328,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
protected void futureFailed(ChannelFuture future) { protected void futureFailed(ChannelFuture future) {
LOG.warn("Failed to close one of the child channels of tunnel " + LOG.warn("Failed to close one of the child channels of tunnel " +
tunnelId); tunnelId);
HttpTunnelClientChannel.this.setClosed(); setClosed();
} }
@Override @Override
@ -323,7 +336,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Tunnel " + tunnelId + " closed"); LOG.debug("Tunnel " + tunnelId + " closed");
} }
HttpTunnelClientChannel.this.setClosed(); setClosed();
} }
} }
@ -332,20 +345,23 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
* Contains the implementing methods of HttpTunnelClientWorkerOwner, so that these are hidden * Contains the implementing methods of HttpTunnelClientWorkerOwner, so that these are hidden
* from the public API. * from the public API.
*/ */
private class WorkerCallbacks implements HttpTunnelClientWorkerOwner { class WorkerCallbacks implements HttpTunnelClientWorkerOwner {
@Override
public void onConnectRequest(ChannelFuture connectFuture, public void onConnectRequest(ChannelFuture connectFuture,
InetSocketAddress remoteAddress) { InetSocketAddress remoteAddress) {
HttpTunnelClientChannel.this.onConnectRequest(connectFuture, HttpTunnelClientChannel.this.onConnectRequest(connectFuture,
remoteAddress); remoteAddress);
} }
@Override
public void onTunnelOpened(String tunnelId) { public void onTunnelOpened(String tunnelId) {
HttpTunnelClientChannel.this.tunnelId = tunnelId; HttpTunnelClientChannel.this.tunnelId = tunnelId;
setTunnelIdForPollChannel(); setTunnelIdForPollChannel();
Channels.connect(pollChannel, sendChannel.getRemoteAddress()); Channels.connect(pollChannel, sendChannel.getRemoteAddress());
} }
@Override
public void fullyEstablished() { public void fullyEstablished() {
if (!bound) { if (!bound) {
bound = true; bound = true;
@ -359,10 +375,12 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
getRemoteAddress()); getRemoteAddress());
} }
@Override
public void onMessageReceived(ChannelBuffer content) { public void onMessageReceived(ChannelBuffer content) {
Channels.fireMessageReceived(HttpTunnelClientChannel.this, content); Channels.fireMessageReceived(HttpTunnelClientChannel.this, content);
} }
@Override
public String getServerHostName() { public String getServerHostName() {
if (serverHostName == null) { if (serverHostName == null) {
serverHostName = serverHostName =

View File

@ -31,8 +31,8 @@ import org.jboss.netty.channel.socket.SocketChannelConfig;
* <th>Name</th><th>Associated setter method</th> * <th>Name</th><th>Associated setter method</th>
* </tr> * </tr>
* <tr><td>{@code "proxyAddress"}</td><td>{@link #setProxyAddress(SocketAddress)}</td></tr> * <tr><td>{@code "proxyAddress"}</td><td>{@link #setProxyAddress(SocketAddress)}</td></tr>
* <tr><td>{@code "writeBufferHighWaterMark"}</td><td>{@link #setWriteBufferHighWaterMark(long)}</td></tr> * <tr><td>{@code "writeBufferHighWaterMark"}</td><td>{@link #setWriteBufferHighWaterMark(int)}</td></tr>
* <tr><td>{@code "writeBufferLowWaterMark"}</td><td>{@link #setWriteBufferLowWaterMark(long)}</td></tr> * <tr><td>{@code "writeBufferLowWaterMark"}</td><td>{@link #setWriteBufferLowWaterMark(int)}</td></tr>
* </table> * </table>
* *
* @author The Netty Project (netty-dev@lists.jboss.org) * @author The Netty Project (netty-dev@lists.jboss.org)
@ -88,39 +88,48 @@ public class HttpTunnelClientChannelConfig extends HttpTunnelChannelConfig {
/* GENERIC SOCKET CHANNEL CONFIGURATION */ /* GENERIC SOCKET CHANNEL CONFIGURATION */
@Override
public int getReceiveBufferSize() { public int getReceiveBufferSize() {
return pollChannelConfig.getReceiveBufferSize(); return pollChannelConfig.getReceiveBufferSize();
} }
@Override
public int getSendBufferSize() { public int getSendBufferSize() {
return pollChannelConfig.getSendBufferSize(); return pollChannelConfig.getSendBufferSize();
} }
@Override
public int getSoLinger() { public int getSoLinger() {
return pollChannelConfig.getSoLinger(); return pollChannelConfig.getSoLinger();
} }
@Override
public int getTrafficClass() { public int getTrafficClass() {
return pollChannelConfig.getTrafficClass(); return pollChannelConfig.getTrafficClass();
} }
@Override
public boolean isKeepAlive() { public boolean isKeepAlive() {
return pollChannelConfig.isKeepAlive(); return pollChannelConfig.isKeepAlive();
} }
@Override
public boolean isReuseAddress() { public boolean isReuseAddress() {
return pollChannelConfig.isReuseAddress(); return pollChannelConfig.isReuseAddress();
} }
@Override
public boolean isTcpNoDelay() { public boolean isTcpNoDelay() {
return pollChannelConfig.isTcpNoDelay(); return pollChannelConfig.isTcpNoDelay();
} }
@Override
public void setKeepAlive(boolean keepAlive) { public void setKeepAlive(boolean keepAlive) {
pollChannelConfig.setKeepAlive(keepAlive); pollChannelConfig.setKeepAlive(keepAlive);
sendChannelConfig.setKeepAlive(keepAlive); sendChannelConfig.setKeepAlive(keepAlive);
} }
@Override
public void setPerformancePreferences(int connectionTime, int latency, public void setPerformancePreferences(int connectionTime, int latency,
int bandwidth) { int bandwidth) {
pollChannelConfig.setPerformancePreferences(connectionTime, latency, pollChannelConfig.setPerformancePreferences(connectionTime, latency,
@ -129,31 +138,37 @@ public class HttpTunnelClientChannelConfig extends HttpTunnelChannelConfig {
bandwidth); bandwidth);
} }
@Override
public void setReceiveBufferSize(int receiveBufferSize) { public void setReceiveBufferSize(int receiveBufferSize) {
pollChannelConfig.setReceiveBufferSize(receiveBufferSize); pollChannelConfig.setReceiveBufferSize(receiveBufferSize);
sendChannelConfig.setReceiveBufferSize(receiveBufferSize); sendChannelConfig.setReceiveBufferSize(receiveBufferSize);
} }
@Override
public void setReuseAddress(boolean reuseAddress) { public void setReuseAddress(boolean reuseAddress) {
pollChannelConfig.setReuseAddress(reuseAddress); pollChannelConfig.setReuseAddress(reuseAddress);
sendChannelConfig.setReuseAddress(reuseAddress); sendChannelConfig.setReuseAddress(reuseAddress);
} }
@Override
public void setSendBufferSize(int sendBufferSize) { public void setSendBufferSize(int sendBufferSize) {
pollChannelConfig.setSendBufferSize(sendBufferSize); pollChannelConfig.setSendBufferSize(sendBufferSize);
sendChannelConfig.setSendBufferSize(sendBufferSize); sendChannelConfig.setSendBufferSize(sendBufferSize);
} }
@Override
public void setSoLinger(int soLinger) { public void setSoLinger(int soLinger) {
pollChannelConfig.setSoLinger(soLinger); pollChannelConfig.setSoLinger(soLinger);
sendChannelConfig.setSoLinger(soLinger); sendChannelConfig.setSoLinger(soLinger);
} }
@Override
public void setTcpNoDelay(boolean tcpNoDelay) { public void setTcpNoDelay(boolean tcpNoDelay) {
pollChannelConfig.setTcpNoDelay(true); pollChannelConfig.setTcpNoDelay(true);
sendChannelConfig.setTcpNoDelay(true); sendChannelConfig.setTcpNoDelay(true);
} }
@Override
public void setTrafficClass(int trafficClass) { public void setTrafficClass(int trafficClass) {
pollChannelConfig.setTrafficClass(1); pollChannelConfig.setTrafficClass(1);
sendChannelConfig.setTrafficClass(1); sendChannelConfig.setTrafficClass(1);

View File

@ -41,11 +41,13 @@ public class HttpTunnelClientChannelFactory implements
this.factory = factory; this.factory = factory;
} }
@Override
public HttpTunnelClientChannel newChannel(ChannelPipeline pipeline) { public HttpTunnelClientChannel newChannel(ChannelPipeline pipeline) {
return new HttpTunnelClientChannel(this, pipeline, return new HttpTunnelClientChannel(this, pipeline,
new HttpTunnelClientChannelSink(), factory, realConnections); new HttpTunnelClientChannelSink(), factory, realConnections);
} }
@Override
public void releaseExternalResources() { public void releaseExternalResources() {
realConnections.close().awaitUninterruptibly(); realConnections.close().awaitUninterruptibly();
factory.releaseExternalResources(); factory.releaseExternalResources();

View File

@ -33,6 +33,7 @@ import org.jboss.netty.channel.MessageEvent;
*/ */
class HttpTunnelClientChannelSink extends AbstractChannelSink { class HttpTunnelClientChannelSink extends AbstractChannelSink {
@Override
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) public void eventSunk(ChannelPipeline pipeline, ChannelEvent e)
throws Exception { throws Exception {
if (e instanceof ChannelStateEvent) { if (e instanceof ChannelStateEvent) {

View File

@ -241,7 +241,7 @@ public class HttpTunnelMessageUtils {
public static boolean hasContents(HttpResponse response, public static boolean hasContents(HttpResponse response,
byte[] expectedContents) { byte[] expectedContents) {
if (response.getContent() != null && if (response.getContent() != null &&
HttpHeaders.getContentLength(response) == expectedContents.length && HttpHeaders.getContentLength(response, 0) == expectedContents.length &&
response.getContent().readableBytes() == expectedContents.length) { response.getContent().readableBytes() == expectedContents.length) {
byte[] compareBytes = new byte[expectedContents.length]; byte[] compareBytes = new byte[expectedContents.length];
response.getContent().readBytes(compareBytes); response.getContent().readBytes(compareBytes);
@ -300,7 +300,7 @@ public class HttpTunnelMessageUtils {
public static Object extractErrorMessage(HttpResponse response) { public static Object extractErrorMessage(HttpResponse response) {
if (response.getContent() == null || if (response.getContent() == null ||
HttpHeaders.getContentLength(response) == 0) { HttpHeaders.getContentLength(response, 0) == 0) {
return ""; return "";
} }

View File

@ -36,12 +36,13 @@ public class HttpTunnelServerChannel extends AbstractServerChannel implements
private final ServerSocketChannel realChannel; private final ServerSocketChannel realChannel;
private final HttpTunnelServerChannelConfig config; final HttpTunnelServerChannelConfig config;
private final ServerMessageSwitch messageSwitch; final ServerMessageSwitch messageSwitch;
private final ChannelFutureListener CLOSE_FUTURE_PROXY = private final ChannelFutureListener CLOSE_FUTURE_PROXY =
new ChannelFutureListener() { new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) public void operationComplete(ChannelFuture future)
throws Exception { throws Exception {
HttpTunnelServerChannel.this.setClosed(); HttpTunnelServerChannel.this.setClosed();
@ -62,31 +63,45 @@ public class HttpTunnelServerChannel extends AbstractServerChannel implements
Channels.fireChannelOpen(this); Channels.fireChannelOpen(this);
} }
@Override
public ServerSocketChannelConfig getConfig() { public ServerSocketChannelConfig getConfig() {
return config; return config;
} }
@Override
public InetSocketAddress getLocalAddress() { public InetSocketAddress getLocalAddress() {
return realChannel.getLocalAddress(); return realChannel.getLocalAddress();
} }
@Override
public InetSocketAddress getRemoteAddress() { public InetSocketAddress getRemoteAddress() {
// server channels never have a remote address // server channels never have a remote address
return null; return null;
} }
@Override
public boolean isBound() { public boolean isBound() {
return realChannel.isBound(); return realChannel.isBound();
} }
@Override
protected boolean setClosed() {
return super.setClosed();
}
/** /**
* Used to hide the newChannel method from the public API. * Used to hide the newChannel method from the public API.
*/ */
private final class TunnelCreator implements private final class TunnelCreator implements
HttpTunnelAcceptedChannelFactory { HttpTunnelAcceptedChannelFactory {
public HttpTunnelAcceptedChannelReceiver newChannel(String newTunnelId, TunnelCreator() {
InetSocketAddress remoteAddress) { super();
}
@Override
public HttpTunnelAcceptedChannelReceiver newChannel(
String newTunnelId, InetSocketAddress remoteAddress) {
ChannelPipeline childPipeline = null; ChannelPipeline childPipeline = null;
try { try {
childPipeline = getConfig().getPipelineFactory().getPipeline(); childPipeline = getConfig().getPipelineFactory().getPipeline();
@ -103,6 +118,7 @@ public class HttpTunnelServerChannel extends AbstractServerChannel implements
getFactory(), childPipeline, sink, remoteAddress, config); getFactory(), childPipeline, sink, remoteAddress, config);
} }
@Override
public String generateTunnelId() { public String generateTunnelId() {
return config.getTunnelIdGenerator().generateId(); return config.getTunnelIdGenerator().generateId();
} }

View File

@ -45,56 +45,69 @@ public class HttpTunnelServerChannelConfig implements ServerSocketChannelConfig
return realChannel.getConfig(); return realChannel.getConfig();
} }
@Override
public int getBacklog() { public int getBacklog() {
return getWrappedConfig().getBacklog(); return getWrappedConfig().getBacklog();
} }
@Override
public int getReceiveBufferSize() { public int getReceiveBufferSize() {
return getWrappedConfig().getReceiveBufferSize(); return getWrappedConfig().getReceiveBufferSize();
} }
@Override
public boolean isReuseAddress() { public boolean isReuseAddress() {
return getWrappedConfig().isReuseAddress(); return getWrappedConfig().isReuseAddress();
} }
@Override
public void setBacklog(int backlog) { public void setBacklog(int backlog) {
getWrappedConfig().setBacklog(backlog); getWrappedConfig().setBacklog(backlog);
} }
@Override
public void setPerformancePreferences(int connectionTime, int latency, public void setPerformancePreferences(int connectionTime, int latency,
int bandwidth) { int bandwidth) {
getWrappedConfig().setPerformancePreferences(connectionTime, latency, getWrappedConfig().setPerformancePreferences(connectionTime, latency,
bandwidth); bandwidth);
} }
@Override
public void setReceiveBufferSize(int receiveBufferSize) { public void setReceiveBufferSize(int receiveBufferSize) {
getWrappedConfig().setReceiveBufferSize(receiveBufferSize); getWrappedConfig().setReceiveBufferSize(receiveBufferSize);
} }
@Override
public void setReuseAddress(boolean reuseAddress) { public void setReuseAddress(boolean reuseAddress) {
getWrappedConfig().setReuseAddress(reuseAddress); getWrappedConfig().setReuseAddress(reuseAddress);
} }
@Override
public ChannelBufferFactory getBufferFactory() { public ChannelBufferFactory getBufferFactory() {
return getWrappedConfig().getBufferFactory(); return getWrappedConfig().getBufferFactory();
} }
@Override
public int getConnectTimeoutMillis() { public int getConnectTimeoutMillis() {
return getWrappedConfig().getConnectTimeoutMillis(); return getWrappedConfig().getConnectTimeoutMillis();
} }
@Override
public ChannelPipelineFactory getPipelineFactory() { public ChannelPipelineFactory getPipelineFactory() {
return pipelineFactory; return pipelineFactory;
} }
@Override
public void setBufferFactory(ChannelBufferFactory bufferFactory) { public void setBufferFactory(ChannelBufferFactory bufferFactory) {
getWrappedConfig().setBufferFactory(bufferFactory); getWrappedConfig().setBufferFactory(bufferFactory);
} }
@Override
public void setConnectTimeoutMillis(int connectTimeoutMillis) { public void setConnectTimeoutMillis(int connectTimeoutMillis) {
getWrappedConfig().setConnectTimeoutMillis(connectTimeoutMillis); getWrappedConfig().setConnectTimeoutMillis(connectTimeoutMillis);
} }
@Override
public boolean setOption(String name, Object value) { public boolean setOption(String name, Object value) {
if (name.equals("pipelineFactory")) { if (name.equals("pipelineFactory")) {
setPipelineFactory((ChannelPipelineFactory) value); setPipelineFactory((ChannelPipelineFactory) value);
@ -107,12 +120,14 @@ public class HttpTunnelServerChannelConfig implements ServerSocketChannelConfig
} }
} }
@Override
public void setOptions(Map<String, Object> options) { public void setOptions(Map<String, Object> options) {
for (Entry<String, Object> e: options.entrySet()) { for (Entry<String, Object> e: options.entrySet()) {
setOption(e.getKey(), e.getValue()); setOption(e.getKey(), e.getValue());
} }
} }
@Override
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) { public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) {
this.pipelineFactory = pipelineFactory; this.pipelineFactory = pipelineFactory;
} }

View File

@ -40,6 +40,7 @@ public class HttpTunnelServerChannelFactory implements
realConnections = new DefaultChannelGroup(); realConnections = new DefaultChannelGroup();
} }
@Override
public HttpTunnelServerChannel newChannel(ChannelPipeline pipeline) { public HttpTunnelServerChannel newChannel(ChannelPipeline pipeline) {
return new HttpTunnelServerChannel(this, pipeline); return new HttpTunnelServerChannel(this, pipeline);
} }
@ -58,6 +59,7 @@ public class HttpTunnelServerChannelFactory implements
return newChannel; return newChannel;
} }
@Override
public void releaseExternalResources() { public void releaseExternalResources() {
realConnections.close().awaitUninterruptibly(); realConnections.close().awaitUninterruptibly();
realConnectionFactory.releaseExternalResources(); realConnectionFactory.releaseExternalResources();

View File

@ -36,6 +36,7 @@ class HttpTunnelServerChannelSink extends AbstractChannelSink {
private ServerSocketChannel realChannel; private ServerSocketChannel realChannel;
@Override
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) public void eventSunk(ChannelPipeline pipeline, ChannelEvent e)
throws Exception { throws Exception {
@ -67,6 +68,7 @@ class HttpTunnelServerChannelSink extends AbstractChannelSink {
this.upstreamFuture = upstreamFuture; this.upstreamFuture = upstreamFuture;
} }
@Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) { if (future.isSuccess()) {
upstreamFuture.setSuccess(); upstreamFuture.setSuccess();

View File

@ -62,6 +62,7 @@ class ServerMessageSwitch implements ServerMessageSwitchUpstreamInterface,
tunnelsById = new ConcurrentHashMap<String, TunnelInfo>(); tunnelsById = new ConcurrentHashMap<String, TunnelInfo>();
} }
@Override
public String createTunnel(InetSocketAddress remoteAddress) { public String createTunnel(InetSocketAddress remoteAddress) {
String newTunnelId = String newTunnelId =
String.format("%s_%s", tunnelIdPrefix, String.format("%s_%s", tunnelIdPrefix,
@ -74,11 +75,13 @@ class ServerMessageSwitch implements ServerMessageSwitchUpstreamInterface,
return newTunnelId; return newTunnelId;
} }
@Override
public boolean isOpenTunnel(String tunnelId) { public boolean isOpenTunnel(String tunnelId) {
TunnelInfo tunnel = tunnelsById.get(tunnelId); TunnelInfo tunnel = tunnelsById.get(tunnelId);
return tunnel != null; return tunnel != null;
} }
@Override
public void pollOutboundData(String tunnelId, Channel channel) { public void pollOutboundData(String tunnelId, Channel channel) {
TunnelInfo tunnel = tunnelsById.get(tunnelId); TunnelInfo tunnel = tunnelsById.get(tunnelId);
if (tunnel == null) { if (tunnel == null) {
@ -136,6 +139,7 @@ class ServerMessageSwitch implements ServerMessageSwitchUpstreamInterface,
new RelayedChannelFutureListener(originalFuture)); new RelayedChannelFutureListener(originalFuture));
} }
@Override
public TunnelStatus routeInboundData(String tunnelId, public TunnelStatus routeInboundData(String tunnelId,
ChannelBuffer inboundData) { ChannelBuffer inboundData) {
TunnelInfo tunnel = tunnelsById.get(tunnelId); TunnelInfo tunnel = tunnelsById.get(tunnelId);
@ -156,14 +160,34 @@ class ServerMessageSwitch implements ServerMessageSwitchUpstreamInterface,
return TunnelStatus.ALIVE; return TunnelStatus.ALIVE;
} }
@Override
public void clientCloseTunnel(String tunnelId) { public void clientCloseTunnel(String tunnelId) {
TunnelInfo tunnel = tunnelsById.get(tunnelId); TunnelInfo tunnel = tunnelsById.get(tunnelId);
if (tunnel == null) {
if (LOG.isWarnEnabled()) {
LOG.warn("attempt made to close tunnel id " +
tunnelId + " which is unknown or closed");
}
return;
}
tunnel.localChannel.clientClosed(); tunnel.localChannel.clientClosed();
tunnelsById.remove(tunnelId); tunnelsById.remove(tunnelId);
} }
@Override
public void serverCloseTunnel(String tunnelId) { public void serverCloseTunnel(String tunnelId) {
TunnelInfo tunnel = tunnelsById.get(tunnelId); TunnelInfo tunnel = tunnelsById.get(tunnelId);
if (tunnel == null) {
if (LOG.isWarnEnabled()) {
LOG.warn("attempt made to close tunnel id " +
tunnelId + " which is unknown or closed");
}
return;
}
tunnel.closing.set(true); tunnel.closing.set(true);
Channel responseChannel = tunnel.responseChannel.getAndSet(null); Channel responseChannel = tunnel.responseChannel.getAndSet(null);
@ -179,6 +203,7 @@ class ServerMessageSwitch implements ServerMessageSwitchUpstreamInterface,
tunnelsById.remove(tunnelId); tunnelsById.remove(tunnelId);
} }
@Override
public void routeOutboundData(String tunnelId, ChannelBuffer data, public void routeOutboundData(String tunnelId, ChannelBuffer data,
ChannelFuture writeFuture) { ChannelFuture writeFuture) {
TunnelInfo tunnel = tunnelsById.get(tunnelId); TunnelInfo tunnel = tunnelsById.get(tunnelId);
@ -217,10 +242,11 @@ class ServerMessageSwitch implements ServerMessageSwitchUpstreamInterface,
ChannelFutureListener { ChannelFutureListener {
private final ChannelFuture originalFuture; private final ChannelFuture originalFuture;
private RelayedChannelFutureListener(ChannelFuture originalFuture) { RelayedChannelFutureListener(ChannelFuture originalFuture) {
this.originalFuture = originalFuture; this.originalFuture = originalFuture;
} }
@Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) { if (future.isSuccess()) {
originalFuture.setSuccess(); originalFuture.setSuccess();
@ -231,6 +257,10 @@ class ServerMessageSwitch implements ServerMessageSwitchUpstreamInterface,
} }
private static final class TunnelInfo { private static final class TunnelInfo {
TunnelInfo() {
super();
}
public String tunnelId; public String tunnelId;
public HttpTunnelAcceptedChannelReceiver localChannel; public HttpTunnelAcceptedChannelReceiver localChannel;

View File

@ -568,10 +568,13 @@ class NioDatagramWorker implements Runnable {
} }
} }
channel.inWriteNowLoop = false; channel.inWriteNowLoop = false;
}
fireWriteComplete(channel, writtenBytes);
// Initially, the following block was executed after releasing
// the writeLock, but there was a race condition, and it has to be
// executed before releasing the writeLock:
//
// https://issues.jboss.org/browse/NETTY-410
//
if (addOpWrite) { if (addOpWrite) {
setOpWrite(channel); setOpWrite(channel);
} else if (removeOpWrite) { } else if (removeOpWrite) {
@ -579,6 +582,9 @@ class NioDatagramWorker implements Runnable {
} }
} }
fireWriteComplete(channel, writtenBytes);
}
private void setOpWrite(final NioDatagramChannel channel) { private void setOpWrite(final NioDatagramChannel channel) {
Selector selector = this.selector; Selector selector = this.selector;
SelectionKey key = channel.getDatagramChannel().keyFor(selector); SelectionKey key = channel.getDatagramChannel().keyFor(selector);

View File

@ -83,15 +83,6 @@ class NioSocketChannel extends AbstractChannel
this.socket = socket; this.socket = socket;
this.worker = worker; this.worker = worker;
config = new DefaultNioSocketChannelConfig(socket.socket()); config = new DefaultNioSocketChannelConfig(socket.socket());
// TODO Move the state variable to AbstractChannel so that we don't need
// to add many listeners.
getCloseFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
state = ST_CLOSED;
}
});
} }
@Override @Override
@ -157,6 +148,7 @@ class NioSocketChannel extends AbstractChannel
@Override @Override
protected boolean setClosed() { protected boolean setClosed() {
state = ST_CLOSED;
return super.setClosed(); return super.setClosed();
} }

View File

@ -506,10 +506,13 @@ class NioWorker implements Runnable {
} }
} }
channel.inWriteNowLoop = false; channel.inWriteNowLoop = false;
}
fireWriteComplete(channel, writtenBytes);
// Initially, the following block was executed after releasing
// the writeLock, but there was a race condition, and it has to be
// executed before releasing the writeLock:
//
// https://issues.jboss.org/browse/NETTY-410
//
if (open) { if (open) {
if (addOpWrite) { if (addOpWrite) {
setOpWrite(channel); setOpWrite(channel);
@ -519,6 +522,9 @@ class NioWorker implements Runnable {
} }
} }
fireWriteComplete(channel, writtenBytes);
}
private void setOpWrite(NioSocketChannel channel) { private void setOpWrite(NioSocketChannel channel) {
Selector selector = this.selector; Selector selector = this.selector;
SelectionKey key = channel.socket.keyFor(selector); SelectionKey key = channel.socket.keyFor(selector);

View File

@ -52,7 +52,7 @@ public class HttpResponseHandler extends SimpleChannelUpstreamHandler {
System.out.println(); System.out.println();
} }
if (response.getStatus().getCode() == 200 && response.isChunked()) { if (response.isChunked()) {
readingChunks = true; readingChunks = true;
System.out.println("CHUNKED CONTENT {"); System.out.println("CHUNKED CONTENT {");
} else { } else {

View File

@ -51,7 +51,7 @@ public class SecureChatClientPipelineFactory implements
SSLEngine engine = SSLEngine engine =
SecureChatSslContextFactory.getClientContext().createSSLEngine(); SecureChatSslContextFactory.getClientContext().createSSLEngine();
engine.setUseClientMode(true); //engine.setUseClientMode(true);
pipeline.addLast("ssl", new SslHandler(engine)); pipeline.addLast("ssl", new SslHandler(engine));

View File

@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.Channels;
@ -70,11 +71,16 @@ public class UptimeClient {
// Configure the pipeline factory. // Configure the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
private final ChannelHandler timeoutHandler =
new ReadTimeoutHandler(timer, READ_TIMEOUT);
private final ChannelHandler uptimeHandler =
new UptimeClientHandler(bootstrap, timer);
@Override @Override
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline( return Channels.pipeline(
new ReadTimeoutHandler(timer, READ_TIMEOUT), timeoutHandler, uptimeHandler);
new UptimeClientHandler(bootstrap, timer));
} }
}); });

View File

@ -24,6 +24,7 @@ import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.jboss.netty.util.Timeout; import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer; import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask; import org.jboss.netty.util.TimerTask;
@ -85,6 +86,13 @@ public class UptimeClientHandler extends SimpleChannelUpstreamHandler {
startTime = -1; startTime = -1;
println("Failed to connect: " + cause.getMessage()); println("Failed to connect: " + cause.getMessage());
} }
if (cause instanceof ReadTimeoutException) {
// The connection was OK but there was no traffic for last period.
println("Disconnecting due to no inbound traffic");
}
else {
cause.printStackTrace();
}
ctx.getChannel().close(); ctx.getChannel().close();
} }

View File

@ -187,6 +187,11 @@ abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
return productQueue.size(); return productQueue.size();
} }
@Override
public ChannelPipeline getPipeline() {
return pipeline;
}
private final class EmbeddedChannelSink implements ChannelSink, ChannelUpstreamHandler { private final class EmbeddedChannelSink implements ChannelSink, ChannelUpstreamHandler {
EmbeddedChannelSink() { EmbeddedChannelSink() {
super(); super();

View File

@ -17,6 +17,8 @@ package org.jboss.netty.handler.codec.embedder;
import java.util.Collection; import java.util.Collection;
import org.jboss.netty.channel.ChannelPipeline;
/** /**
* A helper that wraps an encoder or a decoder (codec) so that they can be used * A helper that wraps an encoder or a decoder (codec) so that they can be used
* without doing actual I/O in unit tests or higher level codecs. Please refer * without doing actual I/O in unit tests or higher level codecs. Please refer
@ -93,4 +95,9 @@ public interface CodecEmbedder<E> {
* Returns the number of encoded or decoded output in the product queue. * Returns the number of encoded or decoded output in the product queue.
*/ */
int size(); int size();
/**
* Returns the {@link ChannelPipeline} that handles the input.
*/
ChannelPipeline getPipeline();
} }

View File

@ -290,17 +290,7 @@ public class LengthFieldBasedFrameDecoder extends FrameDecoder {
buffer.skipBytes(localBytesToDiscard); buffer.skipBytes(localBytesToDiscard);
bytesToDiscard -= localBytesToDiscard; bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard; this.bytesToDiscard = bytesToDiscard;
if (bytesToDiscard == 0) { failIfNecessary(ctx);
// Reset to the initial state and tell the handlers that
// the frame was too large.
// TODO Let user choose when the exception should be raised - early or late?
// If early, fail() should be called when discardingTooLongFrame is set to true.
long tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
fail(ctx, tooLongFrameLength);
} else {
// Keep discarding.
}
return null; return null;
} }
@ -350,6 +340,7 @@ public class LengthFieldBasedFrameDecoder extends FrameDecoder {
tooLongFrameLength = frameLength; tooLongFrameLength = frameLength;
bytesToDiscard = frameLength - buffer.readableBytes(); bytesToDiscard = frameLength - buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes()); buffer.skipBytes(buffer.readableBytes());
failIfNecessary(ctx);
return null; return null;
} }
@ -375,6 +366,21 @@ public class LengthFieldBasedFrameDecoder extends FrameDecoder {
return frame; return frame;
} }
private void failIfNecessary(ChannelHandlerContext ctx) {
if (bytesToDiscard == 0) {
// Reset to the initial state and tell the handlers that
// the frame was too large.
// TODO Let user choose when the exception should be raised - early or late?
// If early, fail() should be called when discardingTooLongFrame is set to true.
long tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
discardingTooLongFrame = false;
fail(ctx, tooLongFrameLength);
} else {
// Keep discarding.
}
}
/** /**
* Extract the sub-region of the specified buffer. This method is called by * Extract the sub-region of the specified buffer. This method is called by
* {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer)} for each * {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer)} for each

View File

@ -1,35 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.http;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.TimeZone;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$
*/
final class CookieDateFormat extends SimpleDateFormat {
private static final long serialVersionUID = 1789486337887402640L;
CookieDateFormat() {
super("E, d-MMM-y HH:mm:ss z", Locale.ENGLISH);
setTimeZone(TimeZone.getTimeZone("GMT"));
}
}

View File

@ -134,7 +134,7 @@ public class CookieDecoder {
} else if (CookieHeaderNames.EXPIRES.equalsIgnoreCase(name)) { } else if (CookieHeaderNames.EXPIRES.equalsIgnoreCase(name)) {
try { try {
long maxAgeMillis = long maxAgeMillis =
new CookieDateFormat().parse(value).getTime() - new HttpHeaderDateFormat().parse(value).getTime() -
System.currentTimeMillis(); System.currentTimeMillis();
if (maxAgeMillis <= 0) { if (maxAgeMillis <= 0) {
maxAge = 0; maxAge = 0;

View File

@ -107,7 +107,7 @@ public class CookieEncoder {
if (cookie.getMaxAge() >= 0) { if (cookie.getMaxAge() >= 0) {
if (cookie.getVersion() == 0) { if (cookie.getVersion() == 0) {
addUnquoted(sb, CookieHeaderNames.EXPIRES, addUnquoted(sb, CookieHeaderNames.EXPIRES,
new CookieDateFormat().format( new HttpHeaderDateFormat().format(
new Date(System.currentTimeMillis() + new Date(System.currentTimeMillis() +
cookie.getMaxAge() * 1000L))); cookie.getMaxAge() * 1000L)));
} else { } else {
@ -167,7 +167,10 @@ public class CookieEncoder {
} }
} }
if (sb.length() > 0) {
sb.setLength(sb.length() - 1); sb.setLength(sb.length() - 1);
}
return sb.toString(); return sb.toString();
} }
@ -205,6 +208,7 @@ public class CookieEncoder {
} }
} }
if(sb.length() > 0)
sb.setLength(sb.length() - 1); sb.setLength(sb.length() - 1);
return sb.toString(); return sb.toString();
} }

View File

@ -15,6 +15,7 @@
*/ */
package org.jboss.netty.handler.codec.http; package org.jboss.netty.handler.codec.http;
import java.io.File;
import java.nio.charset.Charset; import java.nio.charset.Charset;
/** /**
@ -156,7 +157,8 @@ public class DiskFileUpload extends AbstractDiskHttpData implements FileUpload {
@Override @Override
protected String getDiskFilename() { protected String getDiskFilename() {
return filename; File file = new File(filename);
return file.getName();
} }
@Override @Override

View File

@ -136,11 +136,20 @@ public class HttpClientCodec implements ChannelUpstreamHandler,
// message-body, even though the presence of entity-header fields // message-body, even though the presence of entity-header fields
// might lead one to believe they do. // might lead one to believe they do.
if (HttpMethod.HEAD.equals(method)) { if (HttpMethod.HEAD.equals(method)) {
// Interesting edge case: return true;
// Zero-byte chunk will appear if Transfer-Encoding of the
// response is 'chunked'. This is probably because of the // The following code was inserted to work around the servers
// trailing headers. // that behave incorrectly. It has been commented out
return !msg.isChunked(); // because it does not work with well behaving servers.
// Please note, even if the 'Transfer-Encoding: chunked'
// header exists in the HEAD response, the response should
// have absolutely no content.
//
//// Interesting edge case:
//// Some poorly implemented servers will send a zero-byte
//// chunk if Transfer-Encoding of the response is 'chunked'.
////
//// return !msg.isChunked();
} }
break; break;
case 'C': case 'C':

View File

@ -62,7 +62,7 @@ public abstract class HttpContentDecoder extends SimpleChannelUpstreamHandler {
Object msg = e.getMessage(); Object msg = e.getMessage();
if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().getCode() == 100) { if (msg instanceof HttpResponse && ((HttpResponse) msg).getStatus().getCode() == 100) {
// 100-continue response must be passed through. // 100-continue response must be passed through.
ctx.sendDownstream(e); ctx.sendUpstream(e);
} else if (msg instanceof HttpMessage) { } else if (msg instanceof HttpMessage) {
HttpMessage m = (HttpMessage) msg; HttpMessage m = (HttpMessage) msg;

View File

@ -102,7 +102,8 @@ public abstract class HttpContentEncoder extends SimpleChannelHandler {
throw new IllegalStateException("cannot send more responses than requests"); throw new IllegalStateException("cannot send more responses than requests");
} }
if ((encoder = newContentEncoder(acceptEncoding)) != null) { boolean hasContent = m.isChunked() || m.getContent().readable();
if (hasContent && (encoder = newContentEncoder(acceptEncoding)) != null) {
// Encode the content and remove or replace the existing headers // Encode the content and remove or replace the existing headers
// so that the message looks like a decoded message. // so that the message looks like a decoded message.
m.setHeader( m.setHeader(

View File

@ -0,0 +1,92 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.http;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;
/**
* This DateFormat decodes 3 formats of {@link Date}, but only encodes the one,
* the first:
* <ul>
* <li>Sun, 06 Nov 1994 08:49:37 GMT: standard specification, the only one with
* valid generation</li>
* <li>Sun, 06 Nov 1994 08:49:37 GMT: obsolete specification</li>
* <li>Sun Nov 6 08:49:37 1994: obsolete specification</li>
* </ul>
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
* @version $Rev$, $Date$
*/
final class HttpHeaderDateFormat extends SimpleDateFormat {
private static final long serialVersionUID = -925286159755905325L;
private final SimpleDateFormat format1 = new HttpHeaderDateFormatObsolete1();
private final SimpleDateFormat format2 = new HttpHeaderDateFormatObsolete2();
/**
* Standard date format<p>
* Sun, 06 Nov 1994 08:49:37 GMT -> E, d MMM yyyy HH:mm:ss z
*/
HttpHeaderDateFormat() {
super("E, dd MMM yyyy HH:mm:ss z", Locale.ENGLISH);
setTimeZone(TimeZone.getTimeZone("GMT"));
}
@Override
public Date parse(String text, ParsePosition pos) {
Date date = super.parse(text, pos);
if (date == null) {
date = format1.parse(text, pos);
}
if (date == null) {
date = format2.parse(text, pos);
}
return date;
}
/**
* First obsolete format<p>
* Sunday, 06-Nov-94 08:49:37 GMT -> E, d-MMM-y HH:mm:ss z
*/
private final class HttpHeaderDateFormatObsolete1 extends SimpleDateFormat {
private static final long serialVersionUID = -3178072504225114298L;
HttpHeaderDateFormatObsolete1() {
super("E, dd-MMM-y HH:mm:ss z", Locale.ENGLISH);
setTimeZone(TimeZone.getTimeZone("GMT"));
}
}
/**
* Second obsolete format
* <p>
* Sun Nov 6 08:49:37 1994 -> EEE, MMM d HH:mm:ss yyyy
*/
private final class HttpHeaderDateFormatObsolete2 extends SimpleDateFormat {
private static final long serialVersionUID = 3010674519968303714L;
HttpHeaderDateFormatObsolete2() {
super("E MMM d HH:mm:ss yyyy", Locale.ENGLISH);
setTimeZone(TimeZone.getTimeZone("GMT"));
}
}
}

View File

@ -15,6 +15,9 @@
*/ */
package org.jboss.netty.handler.codec.http; package org.jboss.netty.handler.codec.http;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -529,6 +532,10 @@ public class HttpHeaders {
/** /**
* Sets a new header with the specified name and value. If there is an * Sets a new header with the specified name and value. If there is an
* existing header with the same name, the existing header is removed. * existing header with the same name, the existing header is removed.
* If the specified value is not a {@link String}, it is converted into a
* {@link String} by {@link Object#toString()}, except for {@link Date}
* and {@link Calendar} which are formatted to the date format defined in
* <a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.3.1">RFC2616</a>.
*/ */
public static void setHeader(HttpMessage message, String name, Object value) { public static void setHeader(HttpMessage message, String name, Object value) {
message.setHeader(name, value); message.setHeader(name, value);
@ -537,6 +544,16 @@ public class HttpHeaders {
/** /**
* Sets a new header with the specified name and values. If there is an * Sets a new header with the specified name and values. If there is an
* existing header with the same name, the existing header is removed. * existing header with the same name, the existing header is removed.
* This method can be represented approximately as the following code:
* <pre>
* removeHeader(message, name);
* for (Object v: values) {
* if (v == null) {
* break;
* }
* addHeader(message, name, v);
* }
* </pre>
*/ */
public static void setHeader(HttpMessage message, String name, Iterable<?> values) { public static void setHeader(HttpMessage message, String name, Iterable<?> values) {
message.setHeader(name, values); message.setHeader(name, values);
@ -544,11 +561,29 @@ public class HttpHeaders {
/** /**
* Adds a new header with the specified name and value. * Adds a new header with the specified name and value.
* If the specified value is not a {@link String}, it is converted into a
* {@link String} by {@link Object#toString()}, except for {@link Date}
* and {@link Calendar} which are formatted to the date format defined in
* <a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.3.1">RFC2616</a>.
*/ */
public static void addHeader(HttpMessage message, String name, Object value) { public static void addHeader(HttpMessage message, String name, Object value) {
message.addHeader(name, value); message.addHeader(name, value);
} }
/**
* Removes the header with the specified name.
*/
public static void removeHeader(HttpMessage message, String name) {
message.removeHeader(name);
}
/**
* Removes all headers from the specified message.
*/
public static void clearHeaders(HttpMessage message) {
message.clearHeaders();
}
/** /**
* Returns the integer header value with the specified header name. If * Returns the integer header value with the specified header name. If
* there are more than one header value for the specified header name, the * there are more than one header value for the specified header name, the
@ -561,7 +596,7 @@ public class HttpHeaders {
public static int getIntHeader(HttpMessage message, String name) { public static int getIntHeader(HttpMessage message, String name) {
String value = getHeader(message, name); String value = getHeader(message, name);
if (value == null) { if (value == null) {
throw new NumberFormatException("null"); throw new NumberFormatException("header not found: " + name);
} }
return Integer.parseInt(value); return Integer.parseInt(value);
} }
@ -610,17 +645,104 @@ public class HttpHeaders {
message.addHeader(name, value); message.addHeader(name, value);
} }
/**
* Returns the date header value with the specified header name. If
* there are more than one header value for the specified header name, the
* first value is returned.
*
* @return the header value
* @throws ParseException
* if there is no such header or the header value is not a formatted date
*/
public static Date getDateHeader(HttpMessage message, String name) throws ParseException {
String value = getHeader(message, name);
if (value == null) {
throw new ParseException("header not found: " + name, 0);
}
return new HttpHeaderDateFormat().parse(value);
}
/**
* Returns the date header value with the specified header name. If
* there are more than one header value for the specified header name, the
* first value is returned.
*
* @return the header value or the {@code defaultValue} if there is no such
* header or the header value is not a formatted date
*/
public static Date getDateHeader(HttpMessage message, String name, Date defaultValue) {
final String value = getHeader(message, name);
if (value == null) {
return defaultValue;
}
try {
return new HttpHeaderDateFormat().parse(value);
} catch (ParseException e) {
return defaultValue;
}
}
/**
* Sets a new date header with the specified name and value. If there
* is an existing header with the same name, the existing header is removed.
* The specified value is formatted as defined in
* <a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.3.1">RFC2616</a>
*/
public static void setDateHeader(HttpMessage message, String name, Date value) {
if (value != null) {
message.setHeader(name, new HttpHeaderDateFormat().format(value));
} else {
message.setHeader(name, null);
}
}
/**
* Sets a new date header with the specified name and values. If there
* is an existing header with the same name, the existing header is removed.
* The specified values are formatted as defined in
* <a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.3.1">RFC2616</a>
*/
public static void setDateHeader(HttpMessage message, String name, Iterable<Date> values) {
message.setHeader(name, values);
}
/**
* Adds a new date header with the specified name and value. The specified
* value is formatted as defined in <a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.3.1">RFC2616</a>
*/
public static void addDateHeader(HttpMessage message, String name, Date value) {
message.addHeader(name, value);
}
/** /**
* Returns the length of the content. Please note that this value is * Returns the length of the content. Please note that this value is
* not retrieved from {@link HttpMessage#getContent()} but from the * not retrieved from {@link HttpMessage#getContent()} but from the
* {@code "Content-Length"} header, and thus they are independent from each * {@code "Content-Length"} header, and thus they are independent from each
* other. * other.
* *
* @return the content length or {@code 0} if this message does not have * @return the content length
* the {@code "Content-Length"} header *
* @throws NumberFormatException
* if the message does not have the {@code "Content-Length"} header
* or its value is not a number
*/ */
public static long getContentLength(HttpMessage message) { public static long getContentLength(HttpMessage message) {
return getContentLength(message, 0L); String value = getHeader(message, Names.CONTENT_LENGTH);
if (value != null) {
return Long.parseLong(value);
}
// We know the content length if it's a Web Socket message even if
// Content-Length header is missing.
long webSocketContentLength = getWebSocketContentLength(message);
if (webSocketContentLength >= 0) {
return webSocketContentLength;
}
// Otherwise we don't.
throw new NumberFormatException("header not found: " + Names.CONTENT_LENGTH);
} }
/** /**
@ -630,14 +752,35 @@ public class HttpHeaders {
* other. * other.
* *
* @return the content length or {@code defaultValue} if this message does * @return the content length or {@code defaultValue} if this message does
* not have the {@code "Content-Length"} header * not have the {@code "Content-Length"} header or its value is not
* a number
*/ */
public static long getContentLength(HttpMessage message, long defaultValue) { public static long getContentLength(HttpMessage message, long defaultValue) {
String contentLength = message.getHeader(Names.CONTENT_LENGTH); String contentLength = message.getHeader(Names.CONTENT_LENGTH);
if (contentLength != null) { if (contentLength != null) {
try {
return Long.parseLong(contentLength); return Long.parseLong(contentLength);
} catch (NumberFormatException e) {
return defaultValue;
}
} }
// We know the content length if it's a Web Socket message even if
// Content-Length header is missing.
long webSocketContentLength = getWebSocketContentLength(message);
if (webSocketContentLength >= 0) {
return webSocketContentLength;
}
// Otherwise we don't.
return defaultValue;
}
/**
* Returns the content length of the specified web socket message. If the
* specified message is not a web socket message, {@code -1} is returned.
*/
private static int getWebSocketContentLength(HttpMessage message) {
// WebSockset messages have constant content-lengths. // WebSockset messages have constant content-lengths.
if (message instanceof HttpRequest) { if (message instanceof HttpRequest) {
HttpRequest req = (HttpRequest) message; HttpRequest req = (HttpRequest) message;
@ -655,7 +798,8 @@ public class HttpHeaders {
} }
} }
return defaultValue; // Not a web socket message
return -1;
} }
/** /**
@ -687,6 +831,37 @@ public class HttpHeaders {
message.setHeader(Names.HOST, value); message.setHeader(Names.HOST, value);
} }
/**
* Returns the value of the {@code "Date"} header.
*
* @throws ParseException
* if there is no such header or the header value is not a formatted date
*/
public static Date getDate(HttpMessage message) throws ParseException {
return getDateHeader(message, Names.DATE);
}
/**
* Returns the value of the {@code "Date"} header. If there is no such
* header or the header is not a formatted date, the {@code defaultValue}
* is returned.
*/
public static Date getDate(HttpMessage message, Date defaultValue) {
return getDateHeader(message, Names.DATE, defaultValue);
}
/**
* Sets the {@code "Date"} header.
*/
public static void setDate(HttpMessage message, Date value) {
if (value != null) {
message.setHeader(Names.DATE, new HttpHeaderDateFormat().format(value));
} else {
message.setHeader(Names.DATE, null);
}
}
/** /**
* Returns {@code true} if and only if the specified message contains the * Returns {@code true} if and only if the specified message contains the
* {@code "Expect: 100-continue"} header. * {@code "Expect: 100-continue"} header.
@ -976,6 +1151,18 @@ public class HttpHeaders {
if (value == null) { if (value == null) {
return null; return null;
} }
if (value instanceof String) {
return (String) value;
}
if (value instanceof Number) {
return value.toString();
}
if (value instanceof Date) {
return new HttpHeaderDateFormat().format((Date) value);
}
if (value instanceof Calendar) {
return new HttpHeaderDateFormat().format(((Calendar) value).getTime());
}
return value.toString(); return value.toString();
} }

View File

@ -15,6 +15,8 @@
*/ */
package org.jboss.netty.handler.codec.http; package org.jboss.netty.handler.codec.http;
import java.util.Calendar;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -100,18 +102,36 @@ public interface HttpMessage {
/** /**
* Adds a new header with the specified name and value. * Adds a new header with the specified name and value.
* If the specified value is not a {@link String}, it is converted into a
* {@link String} by {@link Object#toString()}, except for {@link Date}
* and {@link Calendar} which are formatted to the date format defined in
* <a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.3.1">RFC2616</a>.
*/ */
void addHeader(String name, Object value); void addHeader(String name, Object value);
/** /**
* Sets a new header with the specified name and value. If there is an * Sets a new header with the specified name and value. If there is an
* existing header with the same name, the existing header is removed. * existing header with the same name, the existing header is removed.
* If the specified value is not a {@link String}, it is converted into a
* {@link String} by {@link Object#toString()}, except for {@link Date}
* and {@link Calendar} which are formatted to the date format defined in
* <a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.3.1">RFC2616</a>.
*/ */
void setHeader(String name, Object value); void setHeader(String name, Object value);
/** /**
* Sets a new header with the specified name and values. If there is an * Sets a new header with the specified name and values. If there is an
* existing header with the same name, the existing header is removed. * existing header with the same name, the existing header is removed.
* This method can be represented approximately as the following code:
* <pre>
* m.removeHeader(name);
* for (Object v: values) {
* if (v == null) {
* break;
* }
* m.addHeader(name, v);
* }
* </pre>
*/ */
void setHeader(String name, Iterable<?> values); void setHeader(String name, Iterable<?> values);

View File

@ -73,8 +73,10 @@ public class MixedAttribute implements Attribute {
if (attribute.length() + buffer.readableBytes() > limitSize) { if (attribute.length() + buffer.readableBytes() > limitSize) {
DiskAttribute diskAttribute = new DiskAttribute(attribute DiskAttribute diskAttribute = new DiskAttribute(attribute
.getName()); .getName());
if (((MemoryAttribute) attribute).getChannelBuffer() != null) {
diskAttribute.addContent(((MemoryAttribute) attribute) diskAttribute.addContent(((MemoryAttribute) attribute)
.getChannelBuffer(), false); .getChannelBuffer(), last);
}
attribute = diskAttribute; attribute = diskAttribute;
} }
} }

View File

@ -62,8 +62,10 @@ public class MixedFileUpload implements FileUpload {
.getContentType(), fileUpload .getContentType(), fileUpload
.getContentTransferEncoding(), fileUpload.getCharset(), .getContentTransferEncoding(), fileUpload.getCharset(),
definedSize); definedSize);
if (((MemoryFileUpload) fileUpload).getChannelBuffer() != null){
diskFileUpload.addContent(((MemoryFileUpload) fileUpload) diskFileUpload.addContent(((MemoryFileUpload) fileUpload)
.getChannelBuffer(), false); .getChannelBuffer(), last);
}
fileUpload = diskFileUpload; fileUpload = diskFileUpload;
} }
} }

View File

@ -78,9 +78,12 @@ public class ProtobufEncoder extends OneToOneEncoder {
@Override @Override
protected Object encode( protected Object encode(
ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (!(msg instanceof MessageLite)) { if (msg instanceof MessageLite) {
return msg;
}
return wrappedBuffer(((MessageLite) msg).toByteArray()); return wrappedBuffer(((MessageLite) msg).toByteArray());
} }
if (msg instanceof MessageLite.Builder) {
return wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
}
return msg;
}
} }

View File

@ -21,6 +21,8 @@ import java.io.InputStream;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.io.ObjectStreamClass; import java.io.ObjectStreamClass;
import java.io.StreamCorruptedException; import java.io.StreamCorruptedException;
import java.util.HashMap;
import java.util.Map;
/** /**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a> * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
@ -31,6 +33,7 @@ import java.io.StreamCorruptedException;
*/ */
class CompactObjectInputStream extends ObjectInputStream { class CompactObjectInputStream extends ObjectInputStream {
private final Map<String, Class<?>> classCache = new HashMap<String, Class<?>>();
private final ClassLoader classLoader; private final ClassLoader classLoader;
CompactObjectInputStream(InputStream in) throws IOException { CompactObjectInputStream(InputStream in) throws IOException {
@ -65,7 +68,7 @@ class CompactObjectInputStream extends ObjectInputStream {
case CompactObjectOutputStream.TYPE_THIN_DESCRIPTOR: case CompactObjectOutputStream.TYPE_THIN_DESCRIPTOR:
String className = readUTF(); String className = readUTF();
Class<?> clazz = loadClass(className); Class<?> clazz = loadClass(className);
return ObjectStreamClass.lookup(clazz); return ObjectStreamClass.lookupAny(clazz);
default: default:
throw new StreamCorruptedException( throw new StreamCorruptedException(
"Unexpected class descriptor type: " + type); "Unexpected class descriptor type: " + type);
@ -74,16 +77,33 @@ class CompactObjectInputStream extends ObjectInputStream {
@Override @Override
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
// Query the cache first.
String className = desc.getName(); String className = desc.getName();
try { Class<?> clazz = classCache.get(className);
return loadClass(className); if (clazz != null) {
} catch (ClassNotFoundException ex) { return clazz;
return super.resolveClass(desc);
} }
// And then try to resolve.
try {
clazz = loadClass(className);
} catch (ClassNotFoundException ex) {
clazz = super.resolveClass(desc);
}
classCache.put(className, clazz);
return clazz;
} }
protected Class<?> loadClass(String className) throws ClassNotFoundException { protected Class<?> loadClass(String className) throws ClassNotFoundException {
// Query the cache first.
Class<?> clazz; Class<?> clazz;
clazz = classCache.get(className);
if (clazz != null) {
return clazz;
}
// And then try to load.
ClassLoader classLoader = this.classLoader; ClassLoader classLoader = this.classLoader;
if (classLoader == null) { if (classLoader == null) {
classLoader = Thread.currentThread().getContextClassLoader(); classLoader = Thread.currentThread().getContextClassLoader();
@ -94,6 +114,8 @@ class CompactObjectInputStream extends ObjectInputStream {
} else { } else {
clazz = Class.forName(className); clazz = Class.forName(className);
} }
classCache.put(className, clazz);
return clazz; return clazz;
} }
} }

View File

@ -167,6 +167,17 @@ public abstract class CIDR implements Comparable<CIDR>
*/ */
public abstract boolean contains(InetAddress inetAddress); public abstract boolean contains(InetAddress inetAddress);
/* (non-Javadoc)
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object arg0) {
if (!(arg0 instanceof CIDR)) {
return false;
}
return (this.compareTo((CIDR) arg0) == 0);
}
/** Convert an IPv4 or IPv6 textual representation into an /** Convert an IPv4 or IPv6 textual representation into an
* InetAddress. * InetAddress.
* @param addr * @param addr

View File

@ -329,6 +329,8 @@ public class SslHandler extends FrameDecoder
ChannelHandlerContext ctx = this.ctx; ChannelHandlerContext ctx = this.ctx;
Channel channel = ctx.getChannel(); Channel channel = ctx.getChannel();
ChannelFuture handshakeFuture; ChannelFuture handshakeFuture;
Exception exception = null;
synchronized (handshakeLock) { synchronized (handshakeLock) {
if (handshaking) { if (handshaking) {
return this.handshakeFuture; return this.handshakeFuture;
@ -338,17 +340,24 @@ public class SslHandler extends FrameDecoder
engine.beginHandshake(); engine.beginHandshake();
runDelegatedTasks(); runDelegatedTasks();
handshakeFuture = this.handshakeFuture = future(channel); handshakeFuture = this.handshakeFuture = future(channel);
} catch (SSLException e) { } catch (Exception e) {
handshakeFuture = this.handshakeFuture = failedFuture(channel, e); handshakeFuture = this.handshakeFuture = failedFuture(channel, e);
exception = e;
} }
} }
} }
if (exception == null) { // Began handshake successfully.
try { try {
wrapNonAppData(ctx, channel); wrapNonAppData(ctx, channel);
} catch (SSLException e) { } catch (SSLException e) {
fireExceptionCaught(ctx, e);
handshakeFuture.setFailure(e); handshakeFuture.setFailure(e);
} }
} else { // Failed to initiate handshake.
fireExceptionCaught(ctx, exception);
}
return handshakeFuture; return handshakeFuture;
} }
@ -363,6 +372,7 @@ public class SslHandler extends FrameDecoder
engine.closeOutbound(); engine.closeOutbound();
return wrapNonAppData(ctx, channel); return wrapNonAppData(ctx, channel);
} catch (SSLException e) { } catch (SSLException e) {
fireExceptionCaught(ctx, e);
return failedFuture(channel, e); return failedFuture(channel, e);
} }
} }
@ -659,6 +669,11 @@ public class SslHandler extends FrameDecoder
channel, future, msg, channel.getRemoteAddress()); channel, future, msg, channel.getRemoteAddress());
offerEncryptedWriteRequest(encryptedWrite); offerEncryptedWriteRequest(encryptedWrite);
offered = true; offered = true;
} else if (result.getStatus() == Status.CLOSED) {
// SSLEngine has been closed already.
// Any further write attempts should be denied.
success = false;
break;
} else { } else {
final HandshakeStatus handshakeStatus = result.getHandshakeStatus(); final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
handleRenegotiation(handshakeStatus); handleRenegotiation(handshakeStatus);
@ -1072,6 +1087,8 @@ public class SslHandler extends FrameDecoder
public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception { public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
if (!(closeNotifyFuture.getCause() instanceof ClosedChannelException)) { if (!(closeNotifyFuture.getCause() instanceof ClosedChannelException)) {
Channels.close(context, e.getFuture()); Channels.close(context, e.getFuture());
} else {
e.getFuture().setSuccess();
} }
} }
} }

View File

@ -23,6 +23,12 @@ import java.io.PushbackInputStream;
/** /**
* A {@link ChunkedInput} that fetches data from an {@link InputStream} chunk by * A {@link ChunkedInput} that fetches data from an {@link InputStream} chunk by
* chunk. * chunk.
* <p>
* Please note that the {@link InputStream} instance that feeds data into
* {@link ChunkedStream} must implement {@link InputStream#available()} as
* accurately as possible, rather than using the default implementation.
* Otherwise, {@link ChunkedStream} will generate many too small chunks or
* block unnecessarily often.
* *
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a> * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a> * @author <a href="http://gleamynode.net/">Trustin Lee</a>

View File

@ -17,6 +17,7 @@ package org.jboss.netty.handler.stream;
import static org.jboss.netty.channel.Channels.*; import static org.jboss.netty.channel.Channels.*;
import java.nio.channels.ClosedChannelException;
import java.util.Queue; import java.util.Queue;
import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.buffer.ChannelBuffers;
@ -150,7 +151,10 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
private synchronized void discard(ChannelHandlerContext ctx) { private void discard(ChannelHandlerContext ctx) {
ClosedChannelException cause = null;
boolean fireExceptionCaught = false;
synchronized (this) {
for (;;) { for (;;) {
if (currentEvent == null) { if (currentEvent == null) {
currentEvent = queue.poll(); currentEvent = queue.poll();
@ -166,19 +170,24 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
Object m = currentEvent.getMessage(); Object m = currentEvent.getMessage();
if (m instanceof ChunkedInput) { if (m instanceof ChunkedInput) {
closeInput((ChunkedInput) m); closeInput((ChunkedInput) m);
}
// Trigger a ClosedChannelException // Trigger a ClosedChannelException
Channels.write( if (cause == null) {
ctx, currentEvent.getFuture(), ChannelBuffers.EMPTY_BUFFER, cause = new ClosedChannelException();
currentEvent.getRemoteAddress());
} else {
// Trigger a ClosedChannelException
ctx.sendDownstream(currentEvent);
} }
currentEvent.getFuture().setFailure(cause);
fireExceptionCaught = true;
currentEvent = null; currentEvent = null;
} }
} }
if (fireExceptionCaught) {
Channels.fireExceptionCaught(ctx.getChannel(), cause);
}
}
private synchronized void flush(ChannelHandlerContext ctx) throws Exception { private synchronized void flush(ChannelHandlerContext ctx) throws Exception {
final Channel channel = ctx.getChannel(); final Channel channel = ctx.getChannel();
if (!channel.isConnected()) { if (!channel.isConnected()) {

View File

@ -21,6 +21,8 @@ import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelPipelineFactory;
@ -73,14 +75,16 @@ import org.jboss.netty.util.TimerTask;
* public class MyPipelineFactory implements {@link ChannelPipelineFactory} { * public class MyPipelineFactory implements {@link ChannelPipelineFactory} {
* *
* private final {@link Timer} timer; * private final {@link Timer} timer;
* private final {@link ChannelHandler} idleStateHandler;
* *
* public MyPipelineFactory({@link Timer} timer) { * public MyPipelineFactory({@link Timer} timer) {
* this.timer = timer; * this.timer = timer;
* this.idleStateHandler = <b>new {@link IdleStateHandler}(timer, 60, 30, 0), // timer must be shared.</b>
* } * }
* *
* public {@link ChannelPipeline} getPipeline() { * public {@link ChannelPipeline} getPipeline() {
* return {@link Channels}.pipeline( * return {@link Channels}.pipeline(
* <b>new {@link IdleStateHandler}(timer, 60, 30, 0), // timer must be shared.</b> * idleStateHandler,
* new MyHandler()); * new MyHandler());
* } * }
* } * }
@ -120,6 +124,7 @@ import org.jboss.netty.util.TimerTask;
* @apiviz.uses org.jboss.netty.util.HashedWheelTimer * @apiviz.uses org.jboss.netty.util.HashedWheelTimer
* @apiviz.has org.jboss.netty.handler.timeout.IdleStateEvent oneway - - triggers * @apiviz.has org.jboss.netty.handler.timeout.IdleStateEvent oneway - - triggers
*/ */
@Sharable
public class IdleStateHandler extends SimpleChannelUpstreamHandler public class IdleStateHandler extends SimpleChannelUpstreamHandler
implements LifeCycleAwareChannelHandler, implements LifeCycleAwareChannelHandler,
ExternalResourceReleasable { ExternalResourceReleasable {
@ -127,15 +132,8 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
final Timer timer; final Timer timer;
final long readerIdleTimeMillis; final long readerIdleTimeMillis;
volatile Timeout readerIdleTimeout;
volatile long lastReadTime;
final long writerIdleTimeMillis; final long writerIdleTimeMillis;
volatile Timeout writerIdleTimeout;
volatile long lastWriteTime;
final long allIdleTimeMillis; final long allIdleTimeMillis;
volatile Timeout allIdleTimeout;
/** /**
* Creates a new instance. * Creates a new instance.
@ -249,7 +247,7 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
@Override @Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception { public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
destroy(); destroy(ctx);
} }
@Override @Override
@ -270,14 +268,15 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
@Override @Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception { throws Exception {
destroy(); destroy(ctx);
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception { throws Exception {
lastReadTime = System.currentTimeMillis(); State state = (State) ctx.getAttachment();
state.lastReadTime = System.currentTimeMillis();
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
@ -285,42 +284,47 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e)
throws Exception { throws Exception {
if (e.getWrittenAmount() > 0) { if (e.getWrittenAmount() > 0) {
lastWriteTime = System.currentTimeMillis(); State state = (State) ctx.getAttachment();
state.lastWriteTime = System.currentTimeMillis();
} }
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
private void initialize(ChannelHandlerContext ctx) { private void initialize(ChannelHandlerContext ctx) {
lastReadTime = lastWriteTime = System.currentTimeMillis(); State state = new State();
ctx.setAttachment(state);
state.lastReadTime = state.lastWriteTime = System.currentTimeMillis();
if (readerIdleTimeMillis > 0) { if (readerIdleTimeMillis > 0) {
readerIdleTimeout = timer.newTimeout( state.readerIdleTimeout = timer.newTimeout(
new ReaderIdleTimeoutTask(ctx), new ReaderIdleTimeoutTask(ctx),
readerIdleTimeMillis, TimeUnit.MILLISECONDS); readerIdleTimeMillis, TimeUnit.MILLISECONDS);
} }
if (writerIdleTimeMillis > 0) { if (writerIdleTimeMillis > 0) {
writerIdleTimeout = timer.newTimeout( state.writerIdleTimeout = timer.newTimeout(
new WriterIdleTimeoutTask(ctx), new WriterIdleTimeoutTask(ctx),
writerIdleTimeMillis, TimeUnit.MILLISECONDS); writerIdleTimeMillis, TimeUnit.MILLISECONDS);
} }
if (allIdleTimeMillis > 0) { if (allIdleTimeMillis > 0) {
allIdleTimeout = timer.newTimeout( state.allIdleTimeout = timer.newTimeout(
new AllIdleTimeoutTask(ctx), new AllIdleTimeoutTask(ctx),
allIdleTimeMillis, TimeUnit.MILLISECONDS); allIdleTimeMillis, TimeUnit.MILLISECONDS);
} }
} }
private void destroy() { private void destroy(ChannelHandlerContext ctx) {
if (readerIdleTimeout != null) { State state = (State) ctx.getAttachment();
readerIdleTimeout.cancel(); if (state.readerIdleTimeout != null) {
readerIdleTimeout = null; state.readerIdleTimeout.cancel();
state.readerIdleTimeout = null;
} }
if (writerIdleTimeout != null) { if (state.writerIdleTimeout != null) {
writerIdleTimeout.cancel(); state.writerIdleTimeout.cancel();
writerIdleTimeout = null; state.writerIdleTimeout = null;
} }
if (allIdleTimeout != null) { if (state.allIdleTimeout != null) {
allIdleTimeout.cancel(); state.allIdleTimeout.cancel();
allIdleTimeout = null; state.allIdleTimeout = null;
} }
} }
@ -343,12 +347,13 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
return; return;
} }
State state = (State) ctx.getAttachment();
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
long lastReadTime = IdleStateHandler.this.lastReadTime; long lastReadTime = state.lastReadTime;
long nextDelay = readerIdleTimeMillis - (currentTime - lastReadTime); long nextDelay = readerIdleTimeMillis - (currentTime - lastReadTime);
if (nextDelay <= 0) { if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback. // Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = state.readerIdleTimeout =
timer.newTimeout(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS); timer.newTimeout(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS);
try { try {
channelIdle(ctx, IdleState.READER_IDLE, lastReadTime); channelIdle(ctx, IdleState.READER_IDLE, lastReadTime);
@ -357,7 +362,7 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
} }
} else { } else {
// Read occurred before the timeout - set a new timeout with shorter delay. // Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = state.readerIdleTimeout =
timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS); timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
} }
} }
@ -378,12 +383,13 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
return; return;
} }
State state = (State) ctx.getAttachment();
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
long lastWriteTime = IdleStateHandler.this.lastWriteTime; long lastWriteTime = state.lastWriteTime;
long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime); long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime);
if (nextDelay <= 0) { if (nextDelay <= 0) {
// Writer is idle - set a new timeout and notify the callback. // Writer is idle - set a new timeout and notify the callback.
writerIdleTimeout = state.writerIdleTimeout =
timer.newTimeout(this, writerIdleTimeMillis, TimeUnit.MILLISECONDS); timer.newTimeout(this, writerIdleTimeMillis, TimeUnit.MILLISECONDS);
try { try {
channelIdle(ctx, IdleState.WRITER_IDLE, lastWriteTime); channelIdle(ctx, IdleState.WRITER_IDLE, lastWriteTime);
@ -392,7 +398,7 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
} }
} else { } else {
// Write occurred before the timeout - set a new timeout with shorter delay. // Write occurred before the timeout - set a new timeout with shorter delay.
writerIdleTimeout = state.writerIdleTimeout =
timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS); timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
} }
} }
@ -412,13 +418,14 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
return; return;
} }
State state = (State) ctx.getAttachment();
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
long lastIoTime = Math.max(lastReadTime, lastWriteTime); long lastIoTime = Math.max(state.lastReadTime, state.lastWriteTime);
long nextDelay = allIdleTimeMillis - (currentTime - lastIoTime); long nextDelay = allIdleTimeMillis - (currentTime - lastIoTime);
if (nextDelay <= 0) { if (nextDelay <= 0) {
// Both reader and writer are idle - set a new timeout and // Both reader and writer are idle - set a new timeout and
// notify the callback. // notify the callback.
allIdleTimeout = state.allIdleTimeout =
timer.newTimeout(this, allIdleTimeMillis, TimeUnit.MILLISECONDS); timer.newTimeout(this, allIdleTimeMillis, TimeUnit.MILLISECONDS);
try { try {
channelIdle(ctx, IdleState.ALL_IDLE, lastIoTime); channelIdle(ctx, IdleState.ALL_IDLE, lastIoTime);
@ -428,9 +435,23 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
} else { } else {
// Either read or write occurred before the timeout - set a new // Either read or write occurred before the timeout - set a new
// timeout with shorter delay. // timeout with shorter delay.
allIdleTimeout = state.allIdleTimeout =
timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS); timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
} }
} }
} }
private static final class State {
State() {
super();
}
volatile Timeout readerIdleTimeout;
volatile long lastReadTime;
volatile Timeout writerIdleTimeout;
volatile long lastWriteTime;
volatile Timeout allIdleTimeout;
}
} }

View File

@ -20,6 +20,8 @@ import static org.jboss.netty.channel.Channels.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelPipelineFactory;
@ -42,15 +44,17 @@ import org.jboss.netty.util.TimerTask;
* public class MyPipelineFactory implements {@link ChannelPipelineFactory} { * public class MyPipelineFactory implements {@link ChannelPipelineFactory} {
* *
* private final {@link Timer} timer; * private final {@link Timer} timer;
* private final {@link ChannelHandler} timeoutHandler;
* *
* public MyPipelineFactory({@link Timer} timer) { * public MyPipelineFactory({@link Timer} timer) {
* this.timer = timer; * this.timer = timer;
* this.timeoutHandler = <b>new {@link ReadTimeoutHandler}(timer, 30), // timer must be shared.</b>
* } * }
* *
* public {@link ChannelPipeline} getPipeline() { * public {@link ChannelPipeline} getPipeline() {
* // An example configuration that implements 30-second read timeout: * // An example configuration that implements 30-second read timeout:
* return {@link Channels}.pipeline( * return {@link Channels}.pipeline(
* <b>new {@link ReadTimeoutHandler}(timer, 30), // timer must be shared.</b> * timeoutHandler,
* new MyHandler()); * new MyHandler());
* } * }
* } * }
@ -77,6 +81,7 @@ import org.jboss.netty.util.TimerTask;
* @apiviz.uses org.jboss.netty.util.HashedWheelTimer * @apiviz.uses org.jboss.netty.util.HashedWheelTimer
* @apiviz.has org.jboss.netty.handler.timeout.TimeoutException oneway - - raises * @apiviz.has org.jboss.netty.handler.timeout.TimeoutException oneway - - raises
*/ */
@Sharable
public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
implements LifeCycleAwareChannelHandler, implements LifeCycleAwareChannelHandler,
ExternalResourceReleasable { ExternalResourceReleasable {
@ -85,9 +90,6 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
final Timer timer; final Timer timer;
final long timeoutMillis; final long timeoutMillis;
volatile Timeout timeout;
private volatile ReadTimeoutTask task;
volatile long lastReadTime;
/** /**
* Creates a new instance. * Creates a new instance.
@ -159,7 +161,7 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
@Override @Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception { public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
destroy(); destroy(ctx);
} }
@Override @Override
@ -180,35 +182,32 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
@Override @Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception { throws Exception {
destroy(); destroy(ctx);
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception { throws Exception {
updateLastReadTime(); State state = (State) ctx.getAttachment();
state.lastReadTime = System.currentTimeMillis();
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
private void initialize(ChannelHandlerContext ctx) { private void initialize(ChannelHandlerContext ctx) {
updateLastReadTime(); State state = new State();
task = new ReadTimeoutTask(ctx); ctx.setAttachment(state);
if (timeoutMillis > 0) { if (timeoutMillis > 0) {
timeout = timer.newTimeout(task, timeoutMillis, TimeUnit.MILLISECONDS); state.timeout = timer.newTimeout(new ReadTimeoutTask(ctx), timeoutMillis, TimeUnit.MILLISECONDS);
} }
} }
private void updateLastReadTime() { private void destroy(ChannelHandlerContext ctx) {
lastReadTime = System.currentTimeMillis(); State state = (State) ctx.getAttachment();
if (state.timeout != null) {
state.timeout.cancel();
state.timeout = null;
} }
private void destroy() {
if (timeout != null) {
timeout.cancel();
}
timeout = null;
task = null;
} }
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
@ -233,22 +232,34 @@ public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
return; return;
} }
State state = (State) ctx.getAttachment();
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
long nextDelay = timeoutMillis - (currentTime - lastReadTime); long nextDelay = timeoutMillis - (currentTime - state.lastReadTime);
if (nextDelay <= 0) { if (nextDelay <= 0) {
// Read timed out - set a new timeout and notify the callback. // Read timed out - set a new timeout and notify the callback.
ReadTimeoutHandler.this.timeout = state.timeout =
timer.newTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS); timer.newTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS);
try { try {
// FIXME This should be called from an I/O thread.
// To be fixed in Netty 4.
readTimedOut(ctx); readTimedOut(ctx);
} catch (Throwable t) { } catch (Throwable t) {
fireExceptionCaught(ctx, t); fireExceptionCaught(ctx, t);
} }
} else { } else {
// Read occurred before the timeout - set a new timeout with shorter delay. // Read occurred before the timeout - set a new timeout with shorter delay.
ReadTimeoutHandler.this.timeout = state.timeout =
timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS); timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
} }
} }
} }
private static final class State {
volatile Timeout timeout;
volatile long lastReadTime = System.currentTimeMillis();
State() {
super();
}
}
} }

View File

@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -472,11 +473,15 @@ public class HashedWheelTimer implements Timer {
private final class HashedWheelTimeout implements Timeout { private final class HashedWheelTimeout implements Timeout {
private static final int ST_INIT = 0;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;
private final TimerTask task; private final TimerTask task;
final long deadline; final long deadline;
volatile int stopIndex; volatile int stopIndex;
volatile long remainingRounds; volatile long remainingRounds;
private volatile boolean cancelled; private final AtomicInteger state = new AtomicInteger(ST_INIT);
HashedWheelTimeout(TimerTask task, long deadline) { HashedWheelTimeout(TimerTask task, long deadline) {
this.task = task; this.task = task;
@ -495,28 +500,26 @@ public class HashedWheelTimer implements Timer {
@Override @Override
public void cancel() { public void cancel() {
if (isExpired()) { if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) {
// TODO return false
return; return;
} }
cancelled = true;
// Might be called more than once, but doesn't matter.
wheel[stopIndex].remove(this); wheel[stopIndex].remove(this);
} }
@Override @Override
public boolean isCancelled() { public boolean isCancelled() {
return cancelled; return state.get() == ST_CANCELLED;
} }
@Override @Override
public boolean isExpired() { public boolean isExpired() {
return cancelled || System.currentTimeMillis() > deadline; return state.get() != ST_INIT;
} }
public void expire() { public void expire() {
if (cancelled) { if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) {
return; return;
} }

View File

@ -200,6 +200,19 @@ public class ChannelBuffersTest {
assertSame(EMPTY_BUFFER, copiedBuffer(new ChannelBuffer[] { buffer(0), buffer(0) })); assertSame(EMPTY_BUFFER, copiedBuffer(new ChannelBuffer[] { buffer(0), buffer(0) }));
} }
@Test
public void testCompare2() {
assertTrue(ChannelBuffers.compare(
ChannelBuffers.wrappedBuffer(new byte[]{(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF}),
ChannelBuffers.wrappedBuffer(new byte[]{(byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00}))
> 0);
assertTrue(ChannelBuffers.compare(
ChannelBuffers.wrappedBuffer(new byte[]{(byte) 0xFF}),
ChannelBuffers.wrappedBuffer(new byte[]{(byte) 0x00}))
> 0);
}
@Test(expected = NullPointerException.class) @Test(expected = NullPointerException.class)
public void shouldDisallowNullEndian1() { public void shouldDisallowNullEndian1() {
buffer(null, 0); buffer(null, 0);

View File

@ -53,6 +53,7 @@ public class FakeChannelConfig implements SocketChannelConfig {
private ChannelPipelineFactory pipelineFactory = private ChannelPipelineFactory pipelineFactory =
new ChannelPipelineFactory() { new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(); return Channels.pipeline();
} }
@ -60,87 +61,108 @@ public class FakeChannelConfig implements SocketChannelConfig {
private int writeTimeout = 3000; private int writeTimeout = 3000;
@Override
public int getReceiveBufferSize() { public int getReceiveBufferSize() {
return receiveBufferSize; return receiveBufferSize;
} }
@Override
public void setReceiveBufferSize(int receiveBufferSize) { public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize; this.receiveBufferSize = receiveBufferSize;
} }
@Override
public int getSendBufferSize() { public int getSendBufferSize() {
return sendBufferSize; return sendBufferSize;
} }
@Override
public void setSendBufferSize(int sendBufferSize) { public void setSendBufferSize(int sendBufferSize) {
this.sendBufferSize = sendBufferSize; this.sendBufferSize = sendBufferSize;
} }
@Override
public int getSoLinger() { public int getSoLinger() {
return soLinger; return soLinger;
} }
@Override
public void setSoLinger(int soLinger) { public void setSoLinger(int soLinger) {
this.soLinger = soLinger; this.soLinger = soLinger;
} }
@Override
public int getTrafficClass() { public int getTrafficClass() {
return trafficClass; return trafficClass;
} }
@Override
public void setTrafficClass(int trafficClass) { public void setTrafficClass(int trafficClass) {
this.trafficClass = trafficClass; this.trafficClass = trafficClass;
} }
@Override
public boolean isKeepAlive() { public boolean isKeepAlive() {
return keepAlive; return keepAlive;
} }
@Override
public void setKeepAlive(boolean keepAlive) { public void setKeepAlive(boolean keepAlive) {
this.keepAlive = keepAlive; this.keepAlive = keepAlive;
} }
@Override
public boolean isReuseAddress() { public boolean isReuseAddress() {
return reuseAddress; return reuseAddress;
} }
@Override
public void setReuseAddress(boolean reuseAddress) { public void setReuseAddress(boolean reuseAddress) {
this.reuseAddress = reuseAddress; this.reuseAddress = reuseAddress;
} }
@Override
public boolean isTcpNoDelay() { public boolean isTcpNoDelay() {
return tcpNoDelay; return tcpNoDelay;
} }
@Override
public void setTcpNoDelay(boolean tcpNoDelay) { public void setTcpNoDelay(boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay; this.tcpNoDelay = tcpNoDelay;
} }
@Override
public void setPerformancePreferences(int connectionTime, int latency, public void setPerformancePreferences(int connectionTime, int latency,
int bandwidth) { int bandwidth) {
// do nothing // do nothing
} }
@Override
public ChannelBufferFactory getBufferFactory() { public ChannelBufferFactory getBufferFactory() {
return bufferFactory; return bufferFactory;
} }
@Override
public void setBufferFactory(ChannelBufferFactory bufferFactory) { public void setBufferFactory(ChannelBufferFactory bufferFactory) {
this.bufferFactory = bufferFactory; this.bufferFactory = bufferFactory;
} }
@Override
public int getConnectTimeoutMillis() { public int getConnectTimeoutMillis() {
return connectTimeout; return connectTimeout;
} }
@Override
public void setConnectTimeoutMillis(int connectTimeoutMillis) { public void setConnectTimeoutMillis(int connectTimeoutMillis) {
connectTimeout = connectTimeoutMillis; connectTimeout = connectTimeoutMillis;
} }
@Override
public ChannelPipelineFactory getPipelineFactory() { public ChannelPipelineFactory getPipelineFactory() {
return pipelineFactory; return pipelineFactory;
} }
@Override
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) { public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) {
this.pipelineFactory = pipelineFactory; this.pipelineFactory = pipelineFactory;
} }
@ -153,6 +175,7 @@ public class FakeChannelConfig implements SocketChannelConfig {
writeTimeout = writeTimeoutMillis; writeTimeout = writeTimeoutMillis;
} }
@Override
public boolean setOption(String key, Object value) { public boolean setOption(String key, Object value) {
if (key.equals("pipelineFactory")) { if (key.equals("pipelineFactory")) {
setPipelineFactory((ChannelPipelineFactory) value); setPipelineFactory((ChannelPipelineFactory) value);
@ -180,6 +203,7 @@ public class FakeChannelConfig implements SocketChannelConfig {
return true; return true;
} }
@Override
public void setOptions(Map<String, Object> options) { public void setOptions(Map<String, Object> options) {
for (Entry<String, Object> e: options.entrySet()) { for (Entry<String, Object> e: options.entrySet()) {
setOption(e.getKey(), e.getValue()); setOption(e.getKey(), e.getValue());

View File

@ -31,6 +31,7 @@ public class FakeChannelSink extends AbstractChannelSink {
public Queue<ChannelEvent> events = new LinkedList<ChannelEvent>(); public Queue<ChannelEvent> events = new LinkedList<ChannelEvent>();
@Override
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) public void eventSunk(ChannelPipeline pipeline, ChannelEvent e)
throws Exception { throws Exception {
events.add(e); events.add(e);

View File

@ -36,6 +36,7 @@ public class FakeClientSocketChannelFactory implements
createdChannels = new ArrayList<FakeSocketChannel>(); createdChannels = new ArrayList<FakeSocketChannel>();
} }
@Override
public SocketChannel newChannel(ChannelPipeline pipeline) { public SocketChannel newChannel(ChannelPipeline pipeline) {
FakeSocketChannel channel = FakeSocketChannel channel =
new FakeSocketChannel(null, this, pipeline, new FakeSocketChannel(null, this, pipeline,
@ -44,6 +45,7 @@ public class FakeClientSocketChannelFactory implements
return channel; return channel;
} }
@Override
public void releaseExternalResources() { public void releaseExternalResources() {
// nothing to do // nothing to do
} }

View File

@ -52,22 +52,27 @@ public class FakeServerSocketChannel extends AbstractChannel implements
super(null, factory, pipeline, sink); super(null, factory, pipeline, sink);
} }
@Override
public ServerSocketChannelConfig getConfig() { public ServerSocketChannelConfig getConfig() {
return config; return config;
} }
@Override
public InetSocketAddress getLocalAddress() { public InetSocketAddress getLocalAddress() {
return localAddress; return localAddress;
} }
@Override
public InetSocketAddress getRemoteAddress() { public InetSocketAddress getRemoteAddress() {
return remoteAddress; return remoteAddress;
} }
@Override
public boolean isBound() { public boolean isBound() {
return bound; return bound;
} }
@Override
public boolean isConnected() { public boolean isConnected() {
return connected; return connected;
} }

View File

@ -43,30 +43,37 @@ public class FakeServerSocketChannelConfig extends DefaultChannelConfig
public ChannelBufferFactory bufferFactory = new HeapChannelBufferFactory(); public ChannelBufferFactory bufferFactory = new HeapChannelBufferFactory();
@Override
public int getBacklog() { public int getBacklog() {
return backlog; return backlog;
} }
@Override
public void setBacklog(int backlog) { public void setBacklog(int backlog) {
this.backlog = backlog; this.backlog = backlog;
} }
@Override
public int getReceiveBufferSize() { public int getReceiveBufferSize() {
return receiveBufferSize; return receiveBufferSize;
} }
@Override
public void setReceiveBufferSize(int receiveBufferSize) { public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize; this.receiveBufferSize = receiveBufferSize;
} }
@Override
public boolean isReuseAddress() { public boolean isReuseAddress() {
return reuseAddress; return reuseAddress;
} }
@Override
public void setReuseAddress(boolean reuseAddress) { public void setReuseAddress(boolean reuseAddress) {
this.reuseAddress = reuseAddress; this.reuseAddress = reuseAddress;
} }
@Override
public void setPerformancePreferences(int connectionTime, int latency, public void setPerformancePreferences(int connectionTime, int latency,
int bandwidth) { int bandwidth) {
// ignore // ignore

View File

@ -32,11 +32,13 @@ public class FakeServerSocketChannelFactory implements
public FakeServerSocketChannel createdChannel; public FakeServerSocketChannel createdChannel;
@Override
public ServerSocketChannel newChannel(ChannelPipeline pipeline) { public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
createdChannel = new FakeServerSocketChannel(this, pipeline, sink); createdChannel = new FakeServerSocketChannel(this, pipeline, sink);
return createdChannel; return createdChannel;
} }
@Override
public void releaseExternalResources() { public void releaseExternalResources() {
// nothing to do // nothing to do
} }

View File

@ -52,22 +52,27 @@ public class FakeSocketChannel extends AbstractChannel implements SocketChannel
this.sink = sink; this.sink = sink;
} }
@Override
public InetSocketAddress getLocalAddress() { public InetSocketAddress getLocalAddress() {
return localAddress; return localAddress;
} }
@Override
public SocketChannelConfig getConfig() { public SocketChannelConfig getConfig() {
return config; return config;
} }
@Override
public InetSocketAddress getRemoteAddress() { public InetSocketAddress getRemoteAddress() {
return remoteAddress; return remoteAddress;
} }
@Override
public boolean isBound() { public boolean isBound() {
return bound; return bound;
} }
@Override
public boolean isConnected() { public boolean isConnected() {
return connected; return connected;
} }

View File

@ -73,7 +73,7 @@ public class HttpTunnelSoakTester {
final ChannelGroup channels; final ChannelGroup channels;
private final ExecutorService executor; final ExecutorService executor;
final ScheduledExecutorService scheduledExecutor; final ScheduledExecutorService scheduledExecutor;
@ -81,9 +81,9 @@ public class HttpTunnelSoakTester {
final DataSender s2cDataSender = new DataSender("S2C"); final DataSender s2cDataSender = new DataSender("S2C");
private DataVerifier c2sVerifier = new DataVerifier("C2S-Verifier"); final DataVerifier c2sVerifier = new DataVerifier("C2S-Verifier");
private DataVerifier s2cVerifier = new DataVerifier("S2C-Verifier"); final DataVerifier s2cVerifier = new DataVerifier("S2C-Verifier");
private static byte[] SEND_STREAM; private static byte[] SEND_STREAM;
@ -160,6 +160,7 @@ public class HttpTunnelSoakTester {
protected ChannelPipelineFactory createClientPipelineFactory() { protected ChannelPipelineFactory createClientPipelineFactory() {
return new ChannelPipelineFactory() { return new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline(); ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("s2cVerifier", s2cVerifier); pipeline.addLast("s2cVerifier", s2cVerifier);
@ -173,6 +174,7 @@ public class HttpTunnelSoakTester {
protected ChannelPipelineFactory createServerPipelineFactory() { protected ChannelPipelineFactory createServerPipelineFactory() {
return new ChannelPipelineFactory() { return new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline(); ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("c2sVerifier", c2sVerifier); pipeline.addLast("c2sVerifier", c2sVerifier);
@ -180,6 +182,7 @@ public class HttpTunnelSoakTester {
s2cDataSender)); s2cDataSender));
pipeline.addLast("sendStarter", pipeline.addLast("sendStarter",
new SimpleChannelUpstreamHandler() { new SimpleChannelUpstreamHandler() {
@Override
public void channelConnected( public void channelConnected(
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception { ChannelStateEvent e) throws Exception {
@ -187,7 +190,7 @@ public class HttpTunnelSoakTester {
channels.add(childChannel); channels.add(childChannel);
s2cDataSender.setChannel(childChannel); s2cDataSender.setChannel(childChannel);
executor.execute(s2cDataSender); executor.execute(s2cDataSender);
}; }
}); });
return pipeline; return pipeline;
} }
@ -271,15 +274,15 @@ public class HttpTunnelSoakTester {
} }
HttpTunnelClientChannelConfig config = HttpTunnelClientChannelConfig config =
((HttpTunnelClientChannelConfig) clientChannelFuture (HttpTunnelClientChannelConfig) clientChannelFuture
.getChannel().getConfig()); .getChannel().getConfig();
config.setWriteBufferHighWaterMark(2 * 1024 * 1024); config.setWriteBufferHighWaterMark(2 * 1024 * 1024);
config.setWriteBufferLowWaterMark(1024 * 1024); config.setWriteBufferLowWaterMark(1024 * 1024);
return (SocketChannel) clientChannelFuture.getChannel(); return (SocketChannel) clientChannelFuture.getChannel();
} }
private ChannelBuffer createRandomSizeBuffer(AtomicInteger nextWriteByte) { ChannelBuffer createRandomSizeBuffer(AtomicInteger nextWriteByte) {
Random random = new Random(); Random random = new Random();
int arraySize = random.nextInt(MAX_WRITE_SIZE) + 1; int arraySize = random.nextInt(MAX_WRITE_SIZE) + 1;
@ -309,13 +312,13 @@ public class HttpTunnelSoakTester {
} }
private class DataVerifier extends SimpleChannelUpstreamHandler { private class DataVerifier extends SimpleChannelUpstreamHandler {
private String name; private final String name;
private int expectedNext = 0; private int expectedNext = 0;
private int verifiedBytes = 0; private int verifiedBytes = 0;
private CountDownLatch completionLatch = new CountDownLatch(1); private final CountDownLatch completionLatch = new CountDownLatch(1);
public DataVerifier(String name) { public DataVerifier(String name) {
this.name = name; this.name = name;
@ -376,7 +379,7 @@ public class HttpTunnelSoakTester {
private class DataSender implements Runnable { private class DataSender implements Runnable {
private AtomicReference<Channel> channel = private final AtomicReference<Channel> channel =
new AtomicReference<Channel>(); new AtomicReference<Channel>();
private long totalBytesSent = 0; private long totalBytesSent = 0;
@ -387,15 +390,15 @@ public class HttpTunnelSoakTester {
private boolean firstRun = true; private boolean firstRun = true;
private AtomicBoolean writeEnabled = new AtomicBoolean(true); private final AtomicBoolean writeEnabled = new AtomicBoolean(true);
private AtomicBoolean running = new AtomicBoolean(false); private final AtomicBoolean running = new AtomicBoolean(false);
private CountDownLatch finishLatch = new CountDownLatch(1); private final CountDownLatch finishLatch = new CountDownLatch(1);
private String name; private final String name;
private AtomicInteger nextWriteByte = new AtomicInteger(0); private final AtomicInteger nextWriteByte = new AtomicInteger(0);
public DataSender(String name) { public DataSender(String name) {
this.name = name; this.name = name;
@ -407,7 +410,7 @@ public class HttpTunnelSoakTester {
public void setWriteEnabled(boolean enabled) { public void setWriteEnabled(boolean enabled) {
writeEnabled.set(enabled); writeEnabled.set(enabled);
if (enabled && !this.isRunning() && finishLatch.getCount() > 0) { if (enabled && !isRunning() && finishLatch.getCount() > 0) {
executor.execute(this); executor.execute(this);
} }
} }

View File

@ -102,6 +102,7 @@ public class HttpTunnelTest {
clientCaptureHandler = new ClientEndHandler(); clientCaptureHandler = new ClientEndHandler();
clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() { clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline(); ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("clientCapture", clientCaptureHandler); pipeline.addLast("clientCapture", clientCaptureHandler);
@ -117,6 +118,7 @@ public class HttpTunnelTest {
connectionCaptureHandler = new ServerEndHandler(); connectionCaptureHandler = new ServerEndHandler();
serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline(); ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("capture", connectionCaptureHandler); pipeline.addLast("capture", connectionCaptureHandler);

View File

@ -37,23 +37,28 @@ public class MockChannelStateListener implements HttpTunnelClientWorkerOwner {
public String serverHostName = null; public String serverHostName = null;
@Override
public void fullyEstablished() { public void fullyEstablished() {
fullyEstablished = true; fullyEstablished = true;
} }
@Override
public void onConnectRequest(ChannelFuture connectFuture, public void onConnectRequest(ChannelFuture connectFuture,
InetSocketAddress remoteAddress) { InetSocketAddress remoteAddress) {
// not relevant for test // not relevant for test
} }
@Override
public void onMessageReceived(ChannelBuffer content) { public void onMessageReceived(ChannelBuffer content) {
messages.add(content); messages.add(content);
} }
@Override
public void onTunnelOpened(String tunnelId) { public void onTunnelOpened(String tunnelId) {
this.tunnelId = tunnelId; this.tunnelId = tunnelId;
} }
@Override
public String getServerHostName() { public String getServerHostName() {
return serverHostName; return serverHostName;
} }

View File

@ -28,11 +28,13 @@ import org.jboss.netty.channel.ChannelUpstreamHandler;
public class NullChannelHandler implements ChannelUpstreamHandler, public class NullChannelHandler implements ChannelUpstreamHandler,
ChannelDownstreamHandler { ChannelDownstreamHandler {
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception { throws Exception {
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception { throws Exception {
ctx.sendDownstream(e); ctx.sendDownstream(e);

View File

@ -16,7 +16,7 @@
package org.jboss.netty.channel.socket.http; package org.jboss.netty.channel.socket.http;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.*;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -55,7 +55,7 @@ public class ServerMessageSwitchTest {
private FakeSocketChannel requesterChannel; private FakeSocketChannel requesterChannel;
private HttpTunnelAcceptedChannelReceiver htunAcceptedChannel; HttpTunnelAcceptedChannelReceiver htunAcceptedChannel;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {

View File

@ -33,6 +33,7 @@ public class UpstreamEventCatcher implements ChannelUpstreamHandler {
public Queue<ChannelEvent> events = new LinkedList<ChannelEvent>(); public Queue<ChannelEvent> events = new LinkedList<ChannelEvent>();
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception { throws Exception {
events.add(e); events.add(e);

View File

@ -0,0 +1,49 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.frame;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.embedder.CodecEmbedderException;
import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
import org.jboss.netty.util.CharsetUtil;
import org.junit.Assert;
import org.junit.Test;
/**
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*/
public class DelimiterBasedFrameDecoderTest {
@Test
public void testTooLongFrameRecovery() throws Exception {
DecoderEmbedder<ChannelBuffer> embedder = new DecoderEmbedder<ChannelBuffer>(
new DelimiterBasedFrameDecoder(1, Delimiters.nulDelimiter()));
for (int i = 0; i < 2; i ++) {
try {
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2, 0 }));
Assert.fail(CodecEmbedderException.class.getSimpleName() + " must be raised.");
} catch (CodecEmbedderException e) {
Assert.assertTrue(e.getCause() instanceof TooLongFrameException);
// Expected
}
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 'A', 0 }));
ChannelBuffer buf = embedder.poll();
Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
}
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.frame;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.embedder.CodecEmbedderException;
import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
import org.jboss.netty.util.CharsetUtil;
import org.junit.Assert;
import org.junit.Test;
/**
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*/
public class LengthFieldBasedFrameDecoderTest {
@Test
public void testTooLongFrameRecovery() throws Exception {
DecoderEmbedder<ChannelBuffer> embedder = new DecoderEmbedder<ChannelBuffer>(
new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4));
for (int i = 0; i < 2; i ++) {
try {
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2, 0, 0 }));
Assert.fail(CodecEmbedderException.class.getSimpleName() + " must be raised.");
} catch (CodecEmbedderException e) {
Assert.assertTrue(e.getCause() instanceof TooLongFrameException);
// Expected
}
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 1, 'A' }));
ChannelBuffer buf = embedder.poll();
Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
}
}
}

View File

@ -15,7 +15,12 @@
*/ */
package org.jboss.netty.handler.codec.http; package org.jboss.netty.handler.codec.http;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Date; import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
@ -33,7 +38,7 @@ public class CookieDecoderTest {
@Test @Test
public void testDecodingSingleCookieV0() { public void testDecodingSingleCookieV0() {
String cookieString = "myCookie=myValue;expires=XXX;path=/apathsomewhere;domain=.adomainsomewhere;secure;"; String cookieString = "myCookie=myValue;expires=XXX;path=/apathsomewhere;domain=.adomainsomewhere;secure;";
cookieString = cookieString.replace("XXX", new CookieDateFormat().format(new Date(System.currentTimeMillis() + 50000))); cookieString = cookieString.replace("XXX", new HttpHeaderDateFormat().format(new Date(System.currentTimeMillis() + 50000)));
CookieDecoder cookieDecoder = new CookieDecoder(); CookieDecoder cookieDecoder = new CookieDecoder();
Set<Cookie> cookies = cookieDecoder.decode(cookieString); Set<Cookie> cookies = cookieDecoder.decode(cookieString);

View File

@ -15,7 +15,9 @@
*/ */
package org.jboss.netty.handler.codec.http; package org.jboss.netty.handler.codec.http;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertNotNull;
import java.text.DateFormat; import java.text.DateFormat;
import java.util.Date; import java.util.Date;
@ -33,7 +35,7 @@ public class CookieEncoderTest {
@Test @Test
public void testEncodingSingleCookieV0() { public void testEncodingSingleCookieV0() {
String result = "myCookie=myValue;Expires=XXX;Path=/apathsomewhere;Domain=.adomainsomewhere;Secure"; String result = "myCookie=myValue;Expires=XXX;Path=/apathsomewhere;Domain=.adomainsomewhere;Secure";
DateFormat df = new CookieDateFormat(); DateFormat df = new HttpHeaderDateFormat();
Cookie cookie = new DefaultCookie("myCookie", "myValue"); Cookie cookie = new DefaultCookie("myCookie", "myValue");
CookieEncoder encoder = new CookieEncoder(true); CookieEncoder encoder = new CookieEncoder(true);
encoder.addCookie(cookie); encoder.addCookie(cookie);
@ -131,4 +133,16 @@ public class CookieEncoderTest {
String encodedCookie = encoder.encode(); String encodedCookie = encoder.encode();
assertEquals(c1 + c2 + c3, encodedCookie); assertEquals(c1 + c2 + c3, encodedCookie);
} }
@Test
public void testEncodingWithNoCookies() {
CookieEncoder encoderForServer = new CookieEncoder(true);
String encodedCookie1 = encoderForServer.encode();
CookieEncoder encoderForClient = new CookieEncoder(false);
String encodedCookie2 = encoderForClient.encode();
assertNotNull(encodedCookie1);
assertNotNull(encodedCookie2);
}
} }

View File

@ -0,0 +1,58 @@
package org.jboss.netty.handler.codec.http;
import java.text.ParseException;
import java.util.Date;
import junit.framework.Assert;
import org.junit.Test;
public class HttpHeaderDateFormatTest {
/**
* This date is set at "06 Nov 1994 08:49:37 GMT" (same used in example in
* RFC documentation)
* <p>
* http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html
*/
private static final Date DATE = new Date(784111777000L);
@Test
public void testParse() throws ParseException {
HttpHeaderDateFormat format = new HttpHeaderDateFormat();
{
final Date parsed = format.parse("Sun, 6 Nov 1994 08:49:37 GMT");
Assert.assertNotNull(parsed);
Assert.assertEquals(DATE, parsed);
}
{
final Date parsed = format.parse("Sun, 06 Nov 1994 08:49:37 GMT");
Assert.assertNotNull(parsed);
Assert.assertEquals(DATE, parsed);
}
{
final Date parsed = format.parse("Sunday, 06-Nov-94 08:49:37 GMT");
Assert.assertNotNull(parsed);
Assert.assertEquals(DATE, parsed);
}
{
final Date parsed = format.parse("Sunday, 6-Nov-94 08:49:37 GMT");
Assert.assertNotNull(parsed);
Assert.assertEquals(DATE, parsed);
}
{
final Date parsed = format.parse("Sun Nov 6 08:49:37 1994");
Assert.assertNotNull(parsed);
Assert.assertEquals(DATE, parsed);
}
}
@Test
public void testFormat() {
HttpHeaderDateFormat format = new HttpHeaderDateFormat();
final String formatted = format.format(DATE);
Assert.assertNotNull(formatted);
Assert.assertEquals("Sun, 06 Nov 1994 08:49:37 GMT", formatted);
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.serialization;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
/**
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*/
public class CompactObjectSerializationTest {
@Test
public void testInterfaceSerialization() throws Exception {
PipedOutputStream pipeOut = new PipedOutputStream();
PipedInputStream pipeIn = new PipedInputStream(pipeOut);
CompactObjectOutputStream out = new CompactObjectOutputStream(pipeOut);
CompactObjectInputStream in = new CompactObjectInputStream(pipeIn);
out.writeObject(List.class);
Assert.assertSame(List.class, in.readObject());
}
}