Fixed various compiler warnings

* Missing @Override annotations
* Indirect access to a private member from inner classes
* Incorrect @see javadoc tags (should not use the @link tag)
* ..
This commit is contained in:
Trustin Lee 2011-05-03 11:05:06 +09:00
parent a722f64991
commit e1869db913
28 changed files with 208 additions and 53 deletions

View File

@ -41,6 +41,7 @@ class AcceptedServerChannelPipelineFactory implements ChannelPipelineFactory {
this.messageSwitch = messageSwitch;
}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();

View File

@ -47,6 +47,7 @@ class ChannelFutureAggregator implements ChannelFutureListener {
future.addListener(this);
}
@Override
public synchronized void operationComplete(ChannelFuture future)
throws Exception {
if (future.isCancelled()) {

View File

@ -39,6 +39,7 @@ public class DefaultTunnelIdGenerator implements TunnelIdGenerator {
this.generator = generator;
}
@Override
public synchronized String generateId() {
// synchronized to ensure that this code is thread safe. The Sun
// standard implementations seem to be synchronized or lock free

View File

@ -62,36 +62,44 @@ class HttpTunnelAcceptedChannel extends AbstractChannel implements
fireChannelConnected(this, getRemoteAddress());
}
@Override
public SocketChannelConfig getConfig() {
return config;
}
@Override
public InetSocketAddress getLocalAddress() {
return ((HttpTunnelServerChannel) getParent()).getLocalAddress();
}
@Override
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public boolean isBound() {
return sink.isActive();
}
@Override
public boolean isConnected() {
return sink.isActive();
}
@Override
public void clientClosed() {
this.setClosed();
Channels.fireChannelClosed(this);
}
@Override
public void dataReceived(ChannelBuffer data) {
Channels.fireMessageReceived(this, data);
}
@Override
public void updateInterestOps(SaturationStateChange transition) {
switch (transition) {
case SATURATED:

View File

@ -39,15 +39,15 @@ import org.jboss.netty.channel.MessageEvent;
*/
class HttpTunnelAcceptedChannelSink extends AbstractChannelSink {
private final SaturationManager saturationManager;
final SaturationManager saturationManager;
private final ServerMessageSwitchDownstreamInterface messageSwitch;
private final String tunnelId;
private AtomicBoolean active = new AtomicBoolean(false);
private final AtomicBoolean active = new AtomicBoolean(false);
private HttpTunnelAcceptedChannelConfig config;
private final HttpTunnelAcceptedChannelConfig config;
public HttpTunnelAcceptedChannelSink(
ServerMessageSwitchDownstreamInterface messageSwitch,
@ -55,11 +55,12 @@ class HttpTunnelAcceptedChannelSink extends AbstractChannelSink {
this.messageSwitch = messageSwitch;
this.tunnelId = tunnelId;
this.config = config;
this.saturationManager =
saturationManager =
new SaturationManager(config.getWriteBufferLowWaterMark(),
config.getWriteBufferHighWaterMark());
}
@Override
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e)
throws Exception {
if (e instanceof MessageEvent) {

View File

@ -17,6 +17,7 @@ package org.jboss.netty.channel.socket.http;
import org.jboss.netty.channel.DefaultChannelConfig;
import org.jboss.netty.channel.socket.SocketChannelConfig;
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
/**
* Configuration for HTTP tunnels. Where possible, properties set on this configuration will
@ -80,18 +81,18 @@ public abstract class HttpTunnelChannelConfig extends DefaultChannelConfig
}
/**
* Similarly to {@link org.jboss.netty.channel.socket.nio.NioSocketChannelConfig#setWriteBufferHighWaterMark(int)
* Similarly to {@link org.jboss.netty.channel.socket.nio.NioSocketChannelConfig#setWriteBufferHighWaterMark(int)
* NioSocketChannelConfig.setWriteBufferHighWaterMark()},
* the high water mark refers to the buffer size at which a user of the channel should stop writing. When the
* number of queued bytes exceeds the high water mark, {@link org.jboss.netty.channel.Channel#isWritable() Channel.isWritable()} will
* return false. Once the number of queued bytes falls below the {@link #setWriteBufferLowWaterMark(int) low water mark},
* {@link org.jboss.netty.channel.Channel#isWritable() Channel.isWritable()} will return true again, indicating that the client
* {@link org.jboss.netty.channel.Channel#isWritable() Channel.isWritable()} will return true again, indicating that the client
* can begin to send more data.
*
*
* @param level the number of queued bytes required to flip {@link org.jboss.netty.channel.Channel#isWritable()} to
* false.
*
* @see {@link org.jboss.netty.channel.socket.nio.NioSocketChannelConfig#setWriteBufferHighWaterMark(int) NioSocketChannelConfig.setWriteBufferHighWaterMark()}
*
* @see NioSocketChannelConfig#setWriteBufferHighWaterMark(int)
*/
public void setWriteBufferHighWaterMark(int level) {
if (level <= writeBufferLowWaterMark) {
@ -117,11 +118,11 @@ public abstract class HttpTunnelChannelConfig extends DefaultChannelConfig
/**
* The low water mark refers to the "safe" size of the queued byte buffer at which more data can be enqueued. When
* the {@link #setWriteBufferHighWaterMark(int) high water mark} is exceeded, {@link org.jboss.netty.channel.Channel#isWritable() Channel.isWriteable()}
* the {@link #setWriteBufferHighWaterMark(int) high water mark} is exceeded, {@link org.jboss.netty.channel.Channel#isWritable() Channel.isWriteable()}
* will return false until the buffer drops below this level. By creating a sufficient gap between the high and low
* water marks, rapid oscillation between "write enabled" and "write disabled" can be avoided.
*
* @see {@link org.jboss.netty.channel.socket.nio.NioSocketChannelConfig#setWriteBufferLowWaterMark(int) NioSocketChannelConfig.setWriteBufferLowWaterMark()}
*
* @see org.jboss.netty.channel.socket.nio.NioSocketChannelConfig#setWriteBufferLowWaterMark(int)
*/
public void setWriteBufferLowWaterMark(int level) {
if (level >= writeBufferHighWaterMark) {

View File

@ -48,33 +48,33 @@ import org.jboss.netty.logging.InternalLoggerFactory;
public class HttpTunnelClientChannel extends AbstractChannel implements
SocketChannel {
private static final InternalLogger LOG = InternalLoggerFactory
static final InternalLogger LOG = InternalLoggerFactory
.getInstance(HttpTunnelClientChannel.class);
private final HttpTunnelClientChannelConfig config;
private final SocketChannel sendChannel;
final SocketChannel sendChannel;
private final SocketChannel pollChannel;
final SocketChannel pollChannel;
private volatile String tunnelId;
volatile String tunnelId;
private volatile ChannelFuture connectFuture;
volatile ChannelFuture connectFuture;
private volatile boolean connected;
volatile boolean connected;
private volatile boolean bound;
volatile boolean bound;
volatile InetSocketAddress serverAddress;
private volatile String serverHostName;
volatile String serverHostName;
private final WorkerCallbacks callbackProxy;
private final SaturationManager saturationManager;
/**
* @see {@link HttpTunnelClientChannelFactory#newChannel(ChannelPipeline)}
* @see HttpTunnelClientChannelFactory#newChannel(ChannelPipeline)
*/
protected HttpTunnelClientChannel(ChannelFactory factory,
ChannelPipeline pipeline, HttpTunnelClientChannelSink sink,
@ -82,7 +82,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
ChannelGroup realConnections) {
super(null, factory, pipeline, sink);
this.callbackProxy = new WorkerCallbacks();
callbackProxy = new WorkerCallbacks();
sendChannel = outboundFactory.newChannel(createSendPipeline());
pollChannel = outboundFactory.newChannel(createPollPipeline());
@ -100,26 +100,36 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
Channels.fireChannelOpen(this);
}
@Override
public HttpTunnelClientChannelConfig getConfig() {
return config;
}
@Override
public boolean isBound() {
return bound;
}
@Override
public boolean isConnected() {
return connected;
}
@Override
public InetSocketAddress getLocalAddress() {
return sendChannel.getLocalAddress();
}
@Override
public InetSocketAddress getRemoteAddress() {
return serverAddress;
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
void onConnectRequest(ChannelFuture connectFuture,
InetSocketAddress remoteAddress) {
this.connectFuture = connectFuture;
@ -146,6 +156,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
pollChannel.disconnect().addListener(disconnectListener);
disconnectFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
serverAddress = null;
@ -214,7 +225,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
return pipeline;
}
private void setTunnelIdForPollChannel() {
void setTunnelIdForPollChannel() {
HttpTunnelClientPollHandler pollHandler =
pollChannel.getPipeline()
.get(HttpTunnelClientPollHandler.class);
@ -230,6 +241,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
updateSaturationStatus(messageSize);
Channels.write(sendChannel, e.getMessage()).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (future.isSuccess()) {
@ -242,7 +254,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
});
}
private void updateSaturationStatus(int queueSizeDelta) {
void updateSaturationStatus(int queueSizeDelta) {
SaturationStateChange transition =
saturationManager.queueSizeChanged(queueSizeDelta);
switch (transition) {
@ -279,6 +291,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
eventsLeft = new AtomicInteger(numToConsolidate);
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
futureFailed(future);
@ -315,7 +328,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
protected void futureFailed(ChannelFuture future) {
LOG.warn("Failed to close one of the child channels of tunnel " +
tunnelId);
HttpTunnelClientChannel.this.setClosed();
setClosed();
}
@Override
@ -323,7 +336,7 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
if (LOG.isDebugEnabled()) {
LOG.debug("Tunnel " + tunnelId + " closed");
}
HttpTunnelClientChannel.this.setClosed();
setClosed();
}
}
@ -332,20 +345,23 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
* Contains the implementing methods of HttpTunnelClientWorkerOwner, so that these are hidden
* from the public API.
*/
private class WorkerCallbacks implements HttpTunnelClientWorkerOwner {
class WorkerCallbacks implements HttpTunnelClientWorkerOwner {
@Override
public void onConnectRequest(ChannelFuture connectFuture,
InetSocketAddress remoteAddress) {
HttpTunnelClientChannel.this.onConnectRequest(connectFuture,
remoteAddress);
}
@Override
public void onTunnelOpened(String tunnelId) {
HttpTunnelClientChannel.this.tunnelId = tunnelId;
setTunnelIdForPollChannel();
Channels.connect(pollChannel, sendChannel.getRemoteAddress());
}
@Override
public void fullyEstablished() {
if (!bound) {
bound = true;
@ -359,10 +375,12 @@ public class HttpTunnelClientChannel extends AbstractChannel implements
getRemoteAddress());
}
@Override
public void onMessageReceived(ChannelBuffer content) {
Channels.fireMessageReceived(HttpTunnelClientChannel.this, content);
}
@Override
public String getServerHostName() {
if (serverHostName == null) {
serverHostName =

View File

@ -31,8 +31,8 @@ import org.jboss.netty.channel.socket.SocketChannelConfig;
* <th>Name</th><th>Associated setter method</th>
* </tr>
* <tr><td>{@code "proxyAddress"}</td><td>{@link #setProxyAddress(SocketAddress)}</td></tr>
* <tr><td>{@code "writeBufferHighWaterMark"}</td><td>{@link #setWriteBufferHighWaterMark(long)}</td></tr>
* <tr><td>{@code "writeBufferLowWaterMark"}</td><td>{@link #setWriteBufferLowWaterMark(long)}</td></tr>
* <tr><td>{@code "writeBufferHighWaterMark"}</td><td>{@link #setWriteBufferHighWaterMark(int)}</td></tr>
* <tr><td>{@code "writeBufferLowWaterMark"}</td><td>{@link #setWriteBufferLowWaterMark(int)}</td></tr>
* </table>
*
* @author The Netty Project (netty-dev@lists.jboss.org)
@ -88,39 +88,48 @@ public class HttpTunnelClientChannelConfig extends HttpTunnelChannelConfig {
/* GENERIC SOCKET CHANNEL CONFIGURATION */
@Override
public int getReceiveBufferSize() {
return pollChannelConfig.getReceiveBufferSize();
}
@Override
public int getSendBufferSize() {
return pollChannelConfig.getSendBufferSize();
}
@Override
public int getSoLinger() {
return pollChannelConfig.getSoLinger();
}
@Override
public int getTrafficClass() {
return pollChannelConfig.getTrafficClass();
}
@Override
public boolean isKeepAlive() {
return pollChannelConfig.isKeepAlive();
}
@Override
public boolean isReuseAddress() {
return pollChannelConfig.isReuseAddress();
}
@Override
public boolean isTcpNoDelay() {
return pollChannelConfig.isTcpNoDelay();
}
@Override
public void setKeepAlive(boolean keepAlive) {
pollChannelConfig.setKeepAlive(keepAlive);
sendChannelConfig.setKeepAlive(keepAlive);
}
@Override
public void setPerformancePreferences(int connectionTime, int latency,
int bandwidth) {
pollChannelConfig.setPerformancePreferences(connectionTime, latency,
@ -129,31 +138,37 @@ public class HttpTunnelClientChannelConfig extends HttpTunnelChannelConfig {
bandwidth);
}
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
pollChannelConfig.setReceiveBufferSize(receiveBufferSize);
sendChannelConfig.setReceiveBufferSize(receiveBufferSize);
}
@Override
public void setReuseAddress(boolean reuseAddress) {
pollChannelConfig.setReuseAddress(reuseAddress);
sendChannelConfig.setReuseAddress(reuseAddress);
}
@Override
public void setSendBufferSize(int sendBufferSize) {
pollChannelConfig.setSendBufferSize(sendBufferSize);
sendChannelConfig.setSendBufferSize(sendBufferSize);
}
@Override
public void setSoLinger(int soLinger) {
pollChannelConfig.setSoLinger(soLinger);
sendChannelConfig.setSoLinger(soLinger);
}
@Override
public void setTcpNoDelay(boolean tcpNoDelay) {
pollChannelConfig.setTcpNoDelay(true);
sendChannelConfig.setTcpNoDelay(true);
}
@Override
public void setTrafficClass(int trafficClass) {
pollChannelConfig.setTrafficClass(1);
sendChannelConfig.setTrafficClass(1);

View File

@ -41,11 +41,13 @@ public class HttpTunnelClientChannelFactory implements
this.factory = factory;
}
@Override
public HttpTunnelClientChannel newChannel(ChannelPipeline pipeline) {
return new HttpTunnelClientChannel(this, pipeline,
new HttpTunnelClientChannelSink(), factory, realConnections);
}
@Override
public void releaseExternalResources() {
realConnections.close().awaitUninterruptibly();
factory.releaseExternalResources();

View File

@ -33,6 +33,7 @@ import org.jboss.netty.channel.MessageEvent;
*/
class HttpTunnelClientChannelSink extends AbstractChannelSink {
@Override
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {

View File

@ -36,12 +36,13 @@ public class HttpTunnelServerChannel extends AbstractServerChannel implements
private final ServerSocketChannel realChannel;
private final HttpTunnelServerChannelConfig config;
final HttpTunnelServerChannelConfig config;
private final ServerMessageSwitch messageSwitch;
final ServerMessageSwitch messageSwitch;
private final ChannelFutureListener CLOSE_FUTURE_PROXY =
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
HttpTunnelServerChannel.this.setClosed();
@ -62,31 +63,45 @@ public class HttpTunnelServerChannel extends AbstractServerChannel implements
Channels.fireChannelOpen(this);
}
@Override
public ServerSocketChannelConfig getConfig() {
return config;
}
@Override
public InetSocketAddress getLocalAddress() {
return realChannel.getLocalAddress();
}
@Override
public InetSocketAddress getRemoteAddress() {
// server channels never have a remote address
return null;
}
@Override
public boolean isBound() {
return realChannel.isBound();
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
/**
* Used to hide the newChannel method from the public API.
*/
private final class TunnelCreator implements
HttpTunnelAcceptedChannelFactory {
public HttpTunnelAcceptedChannelReceiver newChannel(String newTunnelId,
InetSocketAddress remoteAddress) {
TunnelCreator() {
super();
}
@Override
public HttpTunnelAcceptedChannelReceiver newChannel(
String newTunnelId, InetSocketAddress remoteAddress) {
ChannelPipeline childPipeline = null;
try {
childPipeline = getConfig().getPipelineFactory().getPipeline();
@ -103,6 +118,7 @@ public class HttpTunnelServerChannel extends AbstractServerChannel implements
getFactory(), childPipeline, sink, remoteAddress, config);
}
@Override
public String generateTunnelId() {
return config.getTunnelIdGenerator().generateId();
}

View File

@ -45,56 +45,69 @@ public class HttpTunnelServerChannelConfig implements ServerSocketChannelConfig
return realChannel.getConfig();
}
@Override
public int getBacklog() {
return getWrappedConfig().getBacklog();
}
@Override
public int getReceiveBufferSize() {
return getWrappedConfig().getReceiveBufferSize();
}
@Override
public boolean isReuseAddress() {
return getWrappedConfig().isReuseAddress();
}
@Override
public void setBacklog(int backlog) {
getWrappedConfig().setBacklog(backlog);
}
@Override
public void setPerformancePreferences(int connectionTime, int latency,
int bandwidth) {
getWrappedConfig().setPerformancePreferences(connectionTime, latency,
bandwidth);
}
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
getWrappedConfig().setReceiveBufferSize(receiveBufferSize);
}
@Override
public void setReuseAddress(boolean reuseAddress) {
getWrappedConfig().setReuseAddress(reuseAddress);
}
@Override
public ChannelBufferFactory getBufferFactory() {
return getWrappedConfig().getBufferFactory();
}
@Override
public int getConnectTimeoutMillis() {
return getWrappedConfig().getConnectTimeoutMillis();
}
@Override
public ChannelPipelineFactory getPipelineFactory() {
return pipelineFactory;
}
@Override
public void setBufferFactory(ChannelBufferFactory bufferFactory) {
getWrappedConfig().setBufferFactory(bufferFactory);
}
@Override
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
getWrappedConfig().setConnectTimeoutMillis(connectTimeoutMillis);
}
@Override
public boolean setOption(String name, Object value) {
if (name.equals("pipelineFactory")) {
setPipelineFactory((ChannelPipelineFactory) value);
@ -107,12 +120,14 @@ public class HttpTunnelServerChannelConfig implements ServerSocketChannelConfig
}
}
@Override
public void setOptions(Map<String, Object> options) {
for (Entry<String, Object> e: options.entrySet()) {
setOption(e.getKey(), e.getValue());
}
}
@Override
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) {
this.pipelineFactory = pipelineFactory;
}

View File

@ -40,6 +40,7 @@ public class HttpTunnelServerChannelFactory implements
realConnections = new DefaultChannelGroup();
}
@Override
public HttpTunnelServerChannel newChannel(ChannelPipeline pipeline) {
return new HttpTunnelServerChannel(this, pipeline);
}
@ -58,6 +59,7 @@ public class HttpTunnelServerChannelFactory implements
return newChannel;
}
@Override
public void releaseExternalResources() {
realConnections.close().awaitUninterruptibly();
realConnectionFactory.releaseExternalResources();

View File

@ -36,6 +36,7 @@ class HttpTunnelServerChannelSink extends AbstractChannelSink {
private ServerSocketChannel realChannel;
@Override
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e)
throws Exception {
@ -67,6 +68,7 @@ class HttpTunnelServerChannelSink extends AbstractChannelSink {
this.upstreamFuture = upstreamFuture;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
upstreamFuture.setSuccess();

View File

@ -38,7 +38,7 @@ import org.jboss.netty.logging.InternalLoggerFactory;
* ends of the http tunnel and the virtual server accepted tunnel. As a tunnel can last for longer than
* the lifetime of the client channels that are used to service it, this layer of abstraction is
* necessary.
*
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Iain McGinniss (iain.mcginniss@onedrum.com)
* @author OneDrum Ltd.
@ -62,6 +62,7 @@ class ServerMessageSwitch implements ServerMessageSwitchUpstreamInterface,
tunnelsById = new ConcurrentHashMap<String, TunnelInfo>();
}
@Override
public String createTunnel(InetSocketAddress remoteAddress) {
String newTunnelId =
String.format("%s_%s", tunnelIdPrefix,
@ -74,11 +75,13 @@ class ServerMessageSwitch implements ServerMessageSwitchUpstreamInterface,
return newTunnelId;
}
@Override
public boolean isOpenTunnel(String tunnelId) {
TunnelInfo tunnel = tunnelsById.get(tunnelId);
return tunnel != null;
}
@Override
public void pollOutboundData(String tunnelId, Channel channel) {
TunnelInfo tunnel = tunnelsById.get(tunnelId);
if (tunnel == null) {
@ -136,6 +139,7 @@ class ServerMessageSwitch implements ServerMessageSwitchUpstreamInterface,
new RelayedChannelFutureListener(originalFuture));
}
@Override
public TunnelStatus routeInboundData(String tunnelId,
ChannelBuffer inboundData) {
TunnelInfo tunnel = tunnelsById.get(tunnelId);
@ -156,12 +160,14 @@ class ServerMessageSwitch implements ServerMessageSwitchUpstreamInterface,
return TunnelStatus.ALIVE;
}
@Override
public void clientCloseTunnel(String tunnelId) {
TunnelInfo tunnel = tunnelsById.get(tunnelId);
tunnel.localChannel.clientClosed();
tunnelsById.remove(tunnelId);
}
@Override
public void serverCloseTunnel(String tunnelId) {
TunnelInfo tunnel = tunnelsById.get(tunnelId);
tunnel.closing.set(true);
@ -179,6 +185,7 @@ class ServerMessageSwitch implements ServerMessageSwitchUpstreamInterface,
tunnelsById.remove(tunnelId);
}
@Override
public void routeOutboundData(String tunnelId, ChannelBuffer data,
ChannelFuture writeFuture) {
TunnelInfo tunnel = tunnelsById.get(tunnelId);
@ -217,10 +224,11 @@ class ServerMessageSwitch implements ServerMessageSwitchUpstreamInterface,
ChannelFutureListener {
private final ChannelFuture originalFuture;
private RelayedChannelFutureListener(ChannelFuture originalFuture) {
RelayedChannelFutureListener(ChannelFuture originalFuture) {
this.originalFuture = originalFuture;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
originalFuture.setSuccess();
@ -231,6 +239,10 @@ class ServerMessageSwitch implements ServerMessageSwitchUpstreamInterface,
}
private static final class TunnelInfo {
TunnelInfo() {
super();
}
public String tunnelId;
public HttpTunnelAcceptedChannelReceiver localChannel;

View File

@ -53,6 +53,7 @@ public class FakeChannelConfig implements SocketChannelConfig {
private ChannelPipelineFactory pipelineFactory =
new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline();
}
@ -60,87 +61,108 @@ public class FakeChannelConfig implements SocketChannelConfig {
private int writeTimeout = 3000;
@Override
public int getReceiveBufferSize() {
return receiveBufferSize;
}
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
}
@Override
public int getSendBufferSize() {
return sendBufferSize;
}
@Override
public void setSendBufferSize(int sendBufferSize) {
this.sendBufferSize = sendBufferSize;
}
@Override
public int getSoLinger() {
return soLinger;
}
@Override
public void setSoLinger(int soLinger) {
this.soLinger = soLinger;
}
@Override
public int getTrafficClass() {
return trafficClass;
}
@Override
public void setTrafficClass(int trafficClass) {
this.trafficClass = trafficClass;
}
@Override
public boolean isKeepAlive() {
return keepAlive;
}
@Override
public void setKeepAlive(boolean keepAlive) {
this.keepAlive = keepAlive;
}
@Override
public boolean isReuseAddress() {
return reuseAddress;
}
@Override
public void setReuseAddress(boolean reuseAddress) {
this.reuseAddress = reuseAddress;
}
@Override
public boolean isTcpNoDelay() {
return tcpNoDelay;
}
@Override
public void setTcpNoDelay(boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
}
@Override
public void setPerformancePreferences(int connectionTime, int latency,
int bandwidth) {
// do nothing
}
@Override
public ChannelBufferFactory getBufferFactory() {
return bufferFactory;
}
@Override
public void setBufferFactory(ChannelBufferFactory bufferFactory) {
this.bufferFactory = bufferFactory;
}
@Override
public int getConnectTimeoutMillis() {
return connectTimeout;
}
@Override
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
connectTimeout = connectTimeoutMillis;
}
@Override
public ChannelPipelineFactory getPipelineFactory() {
return pipelineFactory;
}
@Override
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) {
this.pipelineFactory = pipelineFactory;
}
@ -153,6 +175,7 @@ public class FakeChannelConfig implements SocketChannelConfig {
writeTimeout = writeTimeoutMillis;
}
@Override
public boolean setOption(String key, Object value) {
if (key.equals("pipelineFactory")) {
setPipelineFactory((ChannelPipelineFactory) value);
@ -180,6 +203,7 @@ public class FakeChannelConfig implements SocketChannelConfig {
return true;
}
@Override
public void setOptions(Map<String, Object> options) {
for (Entry<String, Object> e: options.entrySet()) {
setOption(e.getKey(), e.getValue());

View File

@ -31,6 +31,7 @@ public class FakeChannelSink extends AbstractChannelSink {
public Queue<ChannelEvent> events = new LinkedList<ChannelEvent>();
@Override
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e)
throws Exception {
events.add(e);

View File

@ -36,6 +36,7 @@ public class FakeClientSocketChannelFactory implements
createdChannels = new ArrayList<FakeSocketChannel>();
}
@Override
public SocketChannel newChannel(ChannelPipeline pipeline) {
FakeSocketChannel channel =
new FakeSocketChannel(null, this, pipeline,
@ -44,6 +45,7 @@ public class FakeClientSocketChannelFactory implements
return channel;
}
@Override
public void releaseExternalResources() {
// nothing to do
}

View File

@ -52,22 +52,27 @@ public class FakeServerSocketChannel extends AbstractChannel implements
super(null, factory, pipeline, sink);
}
@Override
public ServerSocketChannelConfig getConfig() {
return config;
}
@Override
public InetSocketAddress getLocalAddress() {
return localAddress;
}
@Override
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public boolean isBound() {
return bound;
}
@Override
public boolean isConnected() {
return connected;
}

View File

@ -43,30 +43,37 @@ public class FakeServerSocketChannelConfig extends DefaultChannelConfig
public ChannelBufferFactory bufferFactory = new HeapChannelBufferFactory();
@Override
public int getBacklog() {
return backlog;
}
@Override
public void setBacklog(int backlog) {
this.backlog = backlog;
}
@Override
public int getReceiveBufferSize() {
return receiveBufferSize;
}
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
}
@Override
public boolean isReuseAddress() {
return reuseAddress;
}
@Override
public void setReuseAddress(boolean reuseAddress) {
this.reuseAddress = reuseAddress;
}
@Override
public void setPerformancePreferences(int connectionTime, int latency,
int bandwidth) {
// ignore

View File

@ -32,11 +32,13 @@ public class FakeServerSocketChannelFactory implements
public FakeServerSocketChannel createdChannel;
@Override
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
createdChannel = new FakeServerSocketChannel(this, pipeline, sink);
return createdChannel;
}
@Override
public void releaseExternalResources() {
// nothing to do
}

View File

@ -52,22 +52,27 @@ public class FakeSocketChannel extends AbstractChannel implements SocketChannel
this.sink = sink;
}
@Override
public InetSocketAddress getLocalAddress() {
return localAddress;
}
@Override
public SocketChannelConfig getConfig() {
return config;
}
@Override
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public boolean isBound() {
return bound;
}
@Override
public boolean isConnected() {
return connected;
}

View File

@ -73,7 +73,7 @@ public class HttpTunnelSoakTester {
final ChannelGroup channels;
private final ExecutorService executor;
final ExecutorService executor;
final ScheduledExecutorService scheduledExecutor;
@ -81,9 +81,9 @@ public class HttpTunnelSoakTester {
final DataSender s2cDataSender = new DataSender("S2C");
private DataVerifier c2sVerifier = new DataVerifier("C2S-Verifier");
final DataVerifier c2sVerifier = new DataVerifier("C2S-Verifier");
private DataVerifier s2cVerifier = new DataVerifier("S2C-Verifier");
final DataVerifier s2cVerifier = new DataVerifier("S2C-Verifier");
private static byte[] SEND_STREAM;
@ -160,6 +160,7 @@ public class HttpTunnelSoakTester {
protected ChannelPipelineFactory createClientPipelineFactory() {
return new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("s2cVerifier", s2cVerifier);
@ -173,6 +174,7 @@ public class HttpTunnelSoakTester {
protected ChannelPipelineFactory createServerPipelineFactory() {
return new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("c2sVerifier", c2sVerifier);
@ -180,6 +182,7 @@ public class HttpTunnelSoakTester {
s2cDataSender));
pipeline.addLast("sendStarter",
new SimpleChannelUpstreamHandler() {
@Override
public void channelConnected(
ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
@ -187,7 +190,7 @@ public class HttpTunnelSoakTester {
channels.add(childChannel);
s2cDataSender.setChannel(childChannel);
executor.execute(s2cDataSender);
};
}
});
return pipeline;
}
@ -271,15 +274,15 @@ public class HttpTunnelSoakTester {
}
HttpTunnelClientChannelConfig config =
((HttpTunnelClientChannelConfig) clientChannelFuture
.getChannel().getConfig());
(HttpTunnelClientChannelConfig) clientChannelFuture
.getChannel().getConfig();
config.setWriteBufferHighWaterMark(2 * 1024 * 1024);
config.setWriteBufferLowWaterMark(1024 * 1024);
return (SocketChannel) clientChannelFuture.getChannel();
}
private ChannelBuffer createRandomSizeBuffer(AtomicInteger nextWriteByte) {
ChannelBuffer createRandomSizeBuffer(AtomicInteger nextWriteByte) {
Random random = new Random();
int arraySize = random.nextInt(MAX_WRITE_SIZE) + 1;
@ -309,13 +312,13 @@ public class HttpTunnelSoakTester {
}
private class DataVerifier extends SimpleChannelUpstreamHandler {
private String name;
private final String name;
private int expectedNext = 0;
private int verifiedBytes = 0;
private CountDownLatch completionLatch = new CountDownLatch(1);
private final CountDownLatch completionLatch = new CountDownLatch(1);
public DataVerifier(String name) {
this.name = name;
@ -376,7 +379,7 @@ public class HttpTunnelSoakTester {
private class DataSender implements Runnable {
private AtomicReference<Channel> channel =
private final AtomicReference<Channel> channel =
new AtomicReference<Channel>();
private long totalBytesSent = 0;
@ -387,15 +390,15 @@ public class HttpTunnelSoakTester {
private boolean firstRun = true;
private AtomicBoolean writeEnabled = new AtomicBoolean(true);
private final AtomicBoolean writeEnabled = new AtomicBoolean(true);
private AtomicBoolean running = new AtomicBoolean(false);
private final AtomicBoolean running = new AtomicBoolean(false);
private CountDownLatch finishLatch = new CountDownLatch(1);
private final CountDownLatch finishLatch = new CountDownLatch(1);
private String name;
private final String name;
private AtomicInteger nextWriteByte = new AtomicInteger(0);
private final AtomicInteger nextWriteByte = new AtomicInteger(0);
public DataSender(String name) {
this.name = name;
@ -407,7 +410,7 @@ public class HttpTunnelSoakTester {
public void setWriteEnabled(boolean enabled) {
writeEnabled.set(enabled);
if (enabled && !this.isRunning() && finishLatch.getCount() > 0) {
if (enabled && !isRunning() && finishLatch.getCount() > 0) {
executor.execute(this);
}
}

View File

@ -102,6 +102,7 @@ public class HttpTunnelTest {
clientCaptureHandler = new ClientEndHandler();
clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("clientCapture", clientCaptureHandler);
@ -117,6 +118,7 @@ public class HttpTunnelTest {
connectionCaptureHandler = new ServerEndHandler();
serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("capture", connectionCaptureHandler);

View File

@ -37,23 +37,28 @@ public class MockChannelStateListener implements HttpTunnelClientWorkerOwner {
public String serverHostName = null;
@Override
public void fullyEstablished() {
fullyEstablished = true;
}
@Override
public void onConnectRequest(ChannelFuture connectFuture,
InetSocketAddress remoteAddress) {
// not relevant for test
}
@Override
public void onMessageReceived(ChannelBuffer content) {
messages.add(content);
}
@Override
public void onTunnelOpened(String tunnelId) {
this.tunnelId = tunnelId;
}
@Override
public String getServerHostName() {
return serverHostName;
}

View File

@ -28,11 +28,13 @@ import org.jboss.netty.channel.ChannelUpstreamHandler;
public class NullChannelHandler implements ChannelUpstreamHandler,
ChannelDownstreamHandler {
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
ctx.sendUpstream(e);
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
ctx.sendDownstream(e);

View File

@ -16,7 +16,7 @@
package org.jboss.netty.channel.socket.http;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
import java.net.InetSocketAddress;
@ -55,7 +55,7 @@ public class ServerMessageSwitchTest {
private FakeSocketChannel requesterChannel;
private HttpTunnelAcceptedChannelReceiver htunAcceptedChannel;
HttpTunnelAcceptedChannelReceiver htunAcceptedChannel;
@Before
public void setUp() throws Exception {

View File

@ -33,6 +33,7 @@ public class UpstreamEventCatcher implements ChannelUpstreamHandler {
public Queue<ChannelEvent> events = new LinkedList<ChannelEvent>();
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
events.add(e);