Add proxy support for client socket connections

Related issue: #1133

Motivation:

There is no support for client socket connections via a proxy server in
Netty.

Modifications:

- Add a new module 'handler-proxy'
- Add ProxyHandler and its subclasses to support SOCKS 4a/5 and HTTP(S)
  proxy connections
- Add a full parameterized test for most scenarios
- Clean up pom.xml

Result:

A user can make an outgoing connection via proxy servers with only
trivial effort.
This commit is contained in:
Trustin Lee 2014-08-25 17:50:37 +09:00
parent e4ab108a10
commit de9c81bf6e
26 changed files with 2648 additions and 27 deletions

View File

@ -189,6 +189,13 @@
<scope>compile</scope> <scope>compile</scope>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-handler-proxy</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency> <dependency>
<groupId>${project.groupId}</groupId> <groupId>${project.groupId}</groupId>
<artifactId>netty-transport</artifactId> <artifactId>netty-transport</artifactId>

View File

@ -34,11 +34,6 @@
<artifactId>netty-codec</artifactId> <artifactId>netty-codec</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-handler</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.directory.server</groupId> <groupId>org.apache.directory.server</groupId>

View File

@ -38,6 +38,7 @@
<groupId>${project.groupId}</groupId> <groupId>${project.groupId}</groupId>
<artifactId>netty-handler</artifactId> <artifactId>netty-handler</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
<optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.jcraft</groupId> <groupId>com.jcraft</groupId>

View File

@ -34,11 +34,6 @@
<artifactId>netty-codec</artifactId> <artifactId>netty-codec</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-handler</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -34,11 +34,7 @@
<artifactId>netty-codec</artifactId> <artifactId>netty-codec</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-handler</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.mockito</groupId> <groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId> <artifactId>mockito-all</artifactId>

View File

@ -34,11 +34,6 @@
<artifactId>netty-codec</artifactId> <artifactId>netty-codec</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-handler</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -57,7 +57,6 @@ public class Socks4CmdResponseDecoder extends ReplayingDecoder<State> {
msg = new Socks4CmdResponse(cmdStatus, host, port); msg = new Socks4CmdResponse(cmdStatus, host, port);
} }
} }
ctx.pipeline().remove(this);
out.add(msg); out.add(msg);
} }

View File

@ -83,7 +83,6 @@ public class Socks5CmdResponseDecoder extends ReplayingDecoder<State> {
} }
} }
} }
ctx.pipeline().remove(this);
out.add(msg); out.add(msg);
} }

View File

@ -34,11 +34,6 @@
<artifactId>netty-codec</artifactId> <artifactId>netty-codec</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-handler</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -44,6 +44,11 @@
<artifactId>netty-handler</artifactId> <artifactId>netty-handler</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-handler-proxy</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>${project.groupId}</groupId> <groupId>${project.groupId}</groupId>
<artifactId>netty-codec-http</artifactId> <artifactId>netty-codec-http</artifactId>

55
handler-proxy/pom.xml Normal file
View File

@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2014 The Netty Project
~
~ The Netty Project licenses this file to you under the Apache License,
~ version 2.0 (the "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at:
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>5.0.0.Alpha2-SNAPSHOT</version>
</parent>
<artifactId>netty-handler-proxy</artifactId>
<packaging>jar</packaging>
<name>Netty/Handler/Proxy</name>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-codec-socks</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-handler</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,161 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.proxy;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
public final class HttpProxyHandler extends ProxyHandler {
private static final String PROTOCOL = "http";
private static final String AUTH_BASIC = "basic";
private final HttpClientCodec codec = new HttpClientCodec();
private final String username;
private final String password;
private final CharSequence authorization;
private HttpResponseStatus status;
public HttpProxyHandler(SocketAddress proxyAddress) {
super(proxyAddress);
username = null;
password = null;
authorization = null;
}
public HttpProxyHandler(SocketAddress proxyAddress, String username, String password) {
super(proxyAddress);
if (username == null) {
throw new NullPointerException("username");
}
if (password == null) {
throw new NullPointerException("password");
}
this.username = username;
this.password = password;
ByteBuf authz = Unpooled.copiedBuffer(username + ':' + password, CharsetUtil.UTF_8);
ByteBuf authzBase64 = Base64.encode(authz, false);
authorization = new AsciiString(authzBase64.toString(CharsetUtil.US_ASCII));
authz.release();
authzBase64.release();
}
@Override
public String protocol() {
return PROTOCOL;
}
@Override
public String authScheme() {
return authorization != null? AUTH_BASIC : AUTH_NONE;
}
public String username() {
return username;
}
public String password() {
return password;
}
@Override
protected void addCodec(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline p = ctx.pipeline();
String name = ctx.name();
p.addBefore(name, null, codec);
}
@Override
protected void removeEncoder(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().remove(codec.encoder());
}
@Override
protected void removeDecoder(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().remove(codec.decoder());
}
@Override
protected Object newInitialMessage(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress raddr = destinationAddress();
String rhost;
if (raddr.isUnresolved()) {
rhost = raddr.getHostString();
} else {
rhost = raddr.getAddress().getHostAddress();
}
FullHttpRequest req = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_0, HttpMethod.CONNECT,
rhost + ':' + raddr.getPort(),
Unpooled.EMPTY_BUFFER, false);
SocketAddress proxyAddress = proxyAddress();
if (proxyAddress instanceof InetSocketAddress) {
InetSocketAddress hostAddr = (InetSocketAddress) proxyAddress;
req.headers().set(Names.HOST, hostAddr.getHostString() + ':' + hostAddr.getPort());
}
if (authorization != null) {
req.headers().set(Names.AUTHORIZATION, authorization);
}
return req;
}
@Override
protected boolean handleResponse(ChannelHandlerContext ctx, Object response) throws Exception {
if (response instanceof HttpResponse) {
if (status != null) {
throw new ProxyConnectException(exceptionMessage("too many responses"));
}
status = ((HttpResponse) response).status();
}
boolean finished = response instanceof LastHttpContent;
if (finished) {
if (status == null) {
throw new ProxyConnectException(exceptionMessage("missing response"));
}
if (status.code() != 200) {
throw new ProxyConnectException(exceptionMessage("status: " + status));
}
}
return finished;
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.proxy;
import java.net.ConnectException;
public class ProxyConnectException extends ConnectException {
private static final long serialVersionUID = 5211364632246265538L;
public ProxyConnectException() { }
public ProxyConnectException(String msg) {
super(msg);
}
public ProxyConnectException(Throwable cause) {
initCause(cause);
}
public ProxyConnectException(String msg, Throwable cause) {
super(msg);
initCause(cause);
}
}

View File

@ -0,0 +1,105 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.proxy;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
public final class ProxyConnectionEvent {
private final String protocol;
private final String authScheme;
private final SocketAddress proxyAddress;
private final SocketAddress destinationAddress;
private String strVal;
/**
* Creates a new event that indicates a successful connection attempt to the destination address.
*/
public ProxyConnectionEvent(
String protocol, String authScheme, SocketAddress proxyAddress, SocketAddress destinationAddress) {
if (protocol == null) {
throw new NullPointerException("protocol");
}
if (authScheme == null) {
throw new NullPointerException("authScheme");
}
if (proxyAddress == null) {
throw new NullPointerException("proxyAddress");
}
if (destinationAddress == null) {
throw new NullPointerException("destinationAddress");
}
this.protocol = protocol;
this.authScheme = authScheme;
this.proxyAddress = proxyAddress;
this.destinationAddress = destinationAddress;
}
/**
* Returns the name of the proxy protocol in use.
*/
public String protocol() {
return protocol;
}
/**
* Returns the name of the authentication scheme in use.
*/
public String authScheme() {
return authScheme;
}
/**
* Returns the address of the proxy server.
*/
@SuppressWarnings("unchecked")
public <T extends SocketAddress> T proxyAddress() {
return (T) proxyAddress;
}
/**
* Returns the address of the destination.
*/
@SuppressWarnings("unchecked")
public <T extends SocketAddress> T destinationAddress() {
return (T) destinationAddress;
}
@Override
public String toString() {
if (strVal != null) {
return strVal;
}
StringBuilder buf = new StringBuilder(128);
buf.append(StringUtil.simpleClassName(this));
buf.append('(');
buf.append(protocol);
buf.append(", ");
buf.append(authScheme);
buf.append(", ");
buf.append(proxyAddress);
buf.append(" => ");
buf.append(destinationAddress);
buf.append(')');
return strVal = buf.toString();
}
}

View File

@ -0,0 +1,449 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.proxy;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.PendingWriteQueue;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.SocketAddress;
import java.nio.channels.ConnectionPendingException;
import java.util.concurrent.TimeUnit;
public abstract class ProxyHandler extends ChannelDuplexHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ProxyHandler.class);
/**
* The default connect timeout: 10 seconds.
*/
private static final long DEFAULT_CONNECT_TIMEOUT_MILLIS = 10000;
/**
* A string that signifies 'no authentication' or 'anonymous'.
*/
static final String AUTH_NONE = "none";
private final SocketAddress proxyAddress;
private volatile SocketAddress destinationAddress;
private volatile long connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT_MILLIS;
private volatile ChannelHandlerContext ctx;
private PendingWriteQueue pendingWrites;
private boolean finished;
private boolean suppressChannelReadComplete;
private boolean flushedPrematurely;
private final LazyChannelPromise connectPromise = new LazyChannelPromise();
private ScheduledFuture<?> connectTimeoutFuture;
private final ChannelFutureListener writeListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
setConnectFailure(future.cause());
}
}
};
protected ProxyHandler(SocketAddress proxyAddress) {
if (proxyAddress == null) {
throw new NullPointerException("proxyAddress");
}
this.proxyAddress = proxyAddress;
}
/**
* Returns the name of the proxy protocol in use.
*/
public abstract String protocol();
/**
* Returns the name of the authentication scheme in use.
*/
public abstract String authScheme();
/**
* Returns the address of the proxy server.
*/
@SuppressWarnings("unchecked")
public final <T extends SocketAddress> T proxyAddress() {
return (T) proxyAddress;
}
/**
* Returns the address of the destination to connect to via the proxy server.
*/
@SuppressWarnings("unchecked")
public final <T extends SocketAddress> T destinationAddress() {
return (T) destinationAddress;
}
/**
* Rerutns {@code true} if and only if the connection to the destination has been established successfully.
*/
public final boolean isConnected() {
return connectPromise.isSuccess();
}
/**
* Returns a {@link Future} that is notified when the connection to the destination has been established
* or the connection attempt has failed.
*/
public final Future<Channel> connectFuture() {
return connectPromise;
}
/**
* Returns the connect timeout in millis. If the connection attempt to the destination does not finish within
* the timeout, the connection attempt will be failed.
*/
public final long connectTimeoutMillis() {
return connectTimeoutMillis;
}
/**
* Sets the connect timeout in millis. If the connection attempt to the destination does not finish within
* the timeout, the connection attempt will be failed.
*/
public final void setConnectTimeoutMillis(long connectTimeoutMillis) {
if (connectTimeoutMillis <= 0) {
connectTimeoutMillis = 0;
}
this.connectTimeoutMillis = connectTimeoutMillis;
}
@Override
public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
addCodec(ctx);
if (ctx.channel().isActive()) {
// channelActive() event has been fired already, which means this.channelActive() will
// not be invoked. We have to initialize here instead.
sendInitialMessage(ctx);
} else {
// channelActive() event has not been fired yet. this.channelOpen() will be invoked
// and initialization will occur there.
}
}
/**
* Adds the codec handlers required to communicate with the proxy server.
*/
protected abstract void addCodec(ChannelHandlerContext ctx) throws Exception;
/**
* Removes the encoders added in {@link #addCodec(ChannelHandlerContext)}.
*/
protected abstract void removeEncoder(ChannelHandlerContext ctx) throws Exception;
/**
* Removes the decoders added in {@link #addCodec(ChannelHandlerContext)}.
*/
protected abstract void removeDecoder(ChannelHandlerContext ctx) throws Exception;
@Override
public final void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
if (destinationAddress != null) {
promise.setFailure(new ConnectionPendingException());
return;
}
destinationAddress = remoteAddress;
ctx.connect(proxyAddress, localAddress, promise);
}
@Override
public final void channelActive(ChannelHandlerContext ctx) throws Exception {
sendInitialMessage(ctx);
ctx.fireChannelActive();
}
/**
* Sends the initial message to be sent to the proxy server. This method also starts a timeout task which marks
* the {@link #connectPromise} as failure if the connection attempt does not success within the timeout.
*/
private void sendInitialMessage(final ChannelHandlerContext ctx) throws Exception {
final long connectTimeoutMillis = this.connectTimeoutMillis;
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = ctx.executor().schedule(new OneTimeTask() {
@Override
public void run() {
if (!connectPromise.isDone()) {
setConnectFailure(new ProxyConnectException(exceptionMessage("timeout")));
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
final Object initialMessage = newInitialMessage(ctx);
if (initialMessage != null) {
sendToProxyServer(initialMessage);
}
}
/**
* Returns a new message that is sent at first time when the connection to the proxy server has been established.
*
* @return the initial message, or {@code null} if the proxy server is expected to send the first message instead
*/
protected abstract Object newInitialMessage(ChannelHandlerContext ctx) throws Exception;
/**
* Sends the specified message to the proxy server. Use this method to send a response to the proxy server in
* {@link #handleResponse(ChannelHandlerContext, Object)}.
*/
protected final void sendToProxyServer(Object msg) {
ctx.writeAndFlush(msg).addListener(writeListener);
}
@Override
public final void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (finished) {
ctx.fireChannelInactive();
} else {
// Disconnected before connected to the destination.
setConnectFailure(new ProxyConnectException(exceptionMessage("disconnected")));
}
}
@Override
public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (finished) {
ctx.fireExceptionCaught(cause);
} else {
// Exception was raised before the connection attempt is finished.
setConnectFailure(cause);
}
}
@Override
public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (finished) {
// Received a message after the connection has been established; pass through.
suppressChannelReadComplete = false;
ctx.fireChannelRead(msg);
} else {
suppressChannelReadComplete = true;
Throwable cause = null;
try {
boolean done = handleResponse(ctx, msg);
if (done) {
setConnectSuccess();
}
} catch (Throwable t) {
cause = t;
} finally {
ReferenceCountUtil.release(msg);
if (cause != null) {
setConnectFailure(cause);
}
}
}
}
/**
* Handles the message received from the proxy server.
*
* @return {@code true} if the connection to the destination has been established,
* {@code false} if the connection to the destination has not been established and more messages are
* expected from the proxy server
*/
protected abstract boolean handleResponse(ChannelHandlerContext ctx, Object response) throws Exception;
private void setConnectSuccess() {
finished = true;
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
if (connectPromise.trySuccess(ctx.channel())) {
boolean removedCodec = true;
removedCodec &= safeRemoveEncoder();
ctx.fireUserEventTriggered(
new ProxyConnectionEvent(protocol(), authScheme(), proxyAddress, destinationAddress));
removedCodec &= safeRemoveDecoder();
if (removedCodec) {
writePendingWrites();
if (flushedPrematurely) {
ctx.flush();
}
} else {
// We are at inconsistent state because we failed to remove all codec handlers.
Exception cause = new ProxyConnectException(
"failed to remove all codec handlers added by the proxy handler; bug?");
failPendingWrites(cause);
ctx.fireExceptionCaught(cause);
ctx.close();
}
}
}
private boolean safeRemoveDecoder() {
try {
removeDecoder(ctx);
return true;
} catch (Exception e) {
logger.warn("Failed to remove proxy decoders:", e);
}
return false;
}
private boolean safeRemoveEncoder() {
try {
removeEncoder(ctx);
return true;
} catch (Exception e) {
logger.warn("Failed to remove proxy encoders:", e);
}
return false;
}
private void setConnectFailure(Throwable cause) {
finished = true;
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
if (!(cause instanceof ProxyConnectException)) {
cause = new ProxyConnectException(
exceptionMessage(cause.toString()), cause);
}
if (connectPromise.tryFailure(cause)) {
safeRemoveDecoder();
safeRemoveEncoder();
failPendingWrites(cause);
ctx.fireExceptionCaught(cause);
ctx.close();
}
}
/**
* Decorates the specified exception message with the common information such as the current protocol,
* authentication scheme, proxy address, and destination address.
*/
protected final String exceptionMessage(String msg) {
if (msg == null) {
msg = "";
}
StringBuilder buf = new StringBuilder(128 + msg.length());
buf.append(protocol());
buf.append(", ");
buf.append(authScheme());
buf.append(", ");
buf.append(proxyAddress);
buf.append(" => ");
buf.append(destinationAddress);
if (msg.length() > 0) {
buf.append(", ");
buf.append(msg);
}
return buf.toString();
}
@Override
public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (suppressChannelReadComplete) {
suppressChannelReadComplete = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
} else {
ctx.fireChannelReadComplete();
}
}
@Override
public final void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (finished) {
writePendingWrites();
ctx.write(msg, promise);
} else {
addPendingWrite(ctx, msg, promise);
}
}
@Override
public final void flush(ChannelHandlerContext ctx) throws Exception {
if (finished) {
writePendingWrites();
ctx.flush();
} else {
flushedPrematurely = true;
}
}
private void writePendingWrites() {
if (pendingWrites != null) {
pendingWrites.removeAndWriteAll();
pendingWrites = null;
}
}
private void failPendingWrites(Throwable cause) {
if (pendingWrites != null) {
pendingWrites.removeAndFailAll(cause);
pendingWrites = null;
}
}
private void addPendingWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
PendingWriteQueue pendingWrites = this.pendingWrites;
if (pendingWrites == null) {
this.pendingWrites = pendingWrites = new PendingWriteQueue(ctx);
}
pendingWrites.add(msg, promise);
}
private final class LazyChannelPromise extends DefaultPromise<Channel> {
@Override
protected EventExecutor executor() {
if (ctx == null) {
throw new IllegalStateException();
}
return ctx.executor();
}
}
}

View File

@ -0,0 +1,116 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.proxy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.socksx.v4.Socks4CmdRequest;
import io.netty.handler.codec.socksx.v4.Socks4CmdResponse;
import io.netty.handler.codec.socksx.v4.Socks4CmdResponseDecoder;
import io.netty.handler.codec.socksx.v4.Socks4CmdStatus;
import io.netty.handler.codec.socksx.v4.Socks4CmdType;
import io.netty.handler.codec.socksx.v4.Socks4MessageEncoder;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
public final class Socks4ProxyHandler extends ProxyHandler {
private static final String PROTOCOL = "socks4";
private static final String AUTH_USERNAME = "username";
private final String username;
private String decoderName;
private String encoderName;
public Socks4ProxyHandler(SocketAddress proxyAddress) {
this(proxyAddress, null);
}
public Socks4ProxyHandler(SocketAddress proxyAddress, String username) {
super(proxyAddress);
if (username != null && username.length() == 0) {
username = null;
}
this.username = username;
}
@Override
public String protocol() {
return PROTOCOL;
}
@Override
public String authScheme() {
return username != null? AUTH_USERNAME : AUTH_NONE;
}
public String username() {
return username;
}
@Override
protected void addCodec(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline p = ctx.pipeline();
String name = ctx.name();
Socks4CmdResponseDecoder decoder = new Socks4CmdResponseDecoder();
p.addBefore(name, null, decoder);
decoderName = p.context(decoder).name();
encoderName = decoderName + ".encoder";
p.addBefore(name, encoderName, Socks4MessageEncoder.INSTANCE);
}
@Override
protected void removeEncoder(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline p = ctx.pipeline();
p.remove(encoderName);
}
@Override
protected void removeDecoder(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline p = ctx.pipeline();
p.remove(decoderName);
}
@Override
protected Object newInitialMessage(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress raddr = destinationAddress();
String rhost;
if (raddr.isUnresolved()) {
rhost = raddr.getHostString();
} else {
rhost = raddr.getAddress().getHostAddress();
}
return new Socks4CmdRequest(
username != null? username : "", Socks4CmdType.CONNECT, rhost, raddr.getPort());
}
@Override
protected boolean handleResponse(ChannelHandlerContext ctx, Object response) throws Exception {
final Socks4CmdResponse res = (Socks4CmdResponse) response;
final Socks4CmdStatus status = res.cmdStatus();
if (status == Socks4CmdStatus.SUCCESS) {
return true;
}
throw new ProxyConnectException(exceptionMessage("cmdStatus: " + status));
}
}

View File

@ -0,0 +1,208 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.proxy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.socksx.v5.Socks5AddressType;
import io.netty.handler.codec.socksx.v5.Socks5AuthRequest;
import io.netty.handler.codec.socksx.v5.Socks5AuthResponse;
import io.netty.handler.codec.socksx.v5.Socks5AuthResponseDecoder;
import io.netty.handler.codec.socksx.v5.Socks5AuthScheme;
import io.netty.handler.codec.socksx.v5.Socks5AuthStatus;
import io.netty.handler.codec.socksx.v5.Socks5CmdRequest;
import io.netty.handler.codec.socksx.v5.Socks5CmdResponse;
import io.netty.handler.codec.socksx.v5.Socks5CmdResponseDecoder;
import io.netty.handler.codec.socksx.v5.Socks5CmdStatus;
import io.netty.handler.codec.socksx.v5.Socks5CmdType;
import io.netty.handler.codec.socksx.v5.Socks5InitRequest;
import io.netty.handler.codec.socksx.v5.Socks5InitResponse;
import io.netty.handler.codec.socksx.v5.Socks5InitResponseDecoder;
import io.netty.handler.codec.socksx.v5.Socks5MessageEncoder;
import io.netty.util.NetUtil;
import io.netty.util.internal.StringUtil;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.Collections;
public final class Socks5ProxyHandler extends ProxyHandler {
private static final String PROTOCOL = "socks5";
private static final String AUTH_PASSWORD = "password";
private static final Socks5InitRequest INIT_REQUEST_NO_AUTH =
new Socks5InitRequest(Collections.singletonList(Socks5AuthScheme.NO_AUTH));
private static final Socks5InitRequest INIT_REQUEST_PASSWORD =
new Socks5InitRequest(Arrays.asList(Socks5AuthScheme.NO_AUTH, Socks5AuthScheme.AUTH_PASSWORD));
private final String username;
private final String password;
private String decoderName;
private String encoderName;
public Socks5ProxyHandler(SocketAddress proxyAddress) {
this(proxyAddress, null, null);
}
public Socks5ProxyHandler(SocketAddress proxyAddress, String username, String password) {
super(proxyAddress);
if (username != null && username.length() == 0) {
username = null;
}
if (password != null && password.length() == 0) {
password = null;
}
this.username = username;
this.password = password;
}
@Override
public String protocol() {
return PROTOCOL;
}
@Override
public String authScheme() {
return socksAuthScheme() == Socks5AuthScheme.AUTH_PASSWORD? AUTH_PASSWORD : AUTH_NONE;
}
public String username() {
return username;
}
public String password() {
return password;
}
@Override
protected void addCodec(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline p = ctx.pipeline();
String name = ctx.name();
Socks5InitResponseDecoder decoder = new Socks5InitResponseDecoder();
p.addBefore(name, null, decoder);
decoderName = p.context(decoder).name();
encoderName = decoderName + ".encoder";
p.addBefore(name, encoderName, Socks5MessageEncoder.INSTANCE);
}
@Override
protected void removeEncoder(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().remove(encoderName);
}
@Override
protected void removeDecoder(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline p = ctx.pipeline();
if (p.context(decoderName) != null) {
p.remove(decoderName);
}
}
@Override
protected Object newInitialMessage(ChannelHandlerContext ctx) throws Exception {
return socksAuthScheme() == Socks5AuthScheme.AUTH_PASSWORD? INIT_REQUEST_PASSWORD : INIT_REQUEST_NO_AUTH;
}
@Override
protected boolean handleResponse(ChannelHandlerContext ctx, Object response) throws Exception {
if (response instanceof Socks5InitResponse) {
Socks5InitResponse res = (Socks5InitResponse) response;
Socks5AuthScheme authScheme = socksAuthScheme();
if (res.authScheme() != Socks5AuthScheme.NO_AUTH && authScheme != res.authScheme()) {
// Server did not allow unauthenticated access nor accept the requested authentication scheme.
throw new ProxyConnectException(exceptionMessage("unexpected authScheme: " + res.authScheme()));
}
switch (authScheme) {
case NO_AUTH:
sendConnectCommand(ctx);
break;
case AUTH_PASSWORD:
// In case of password authentication, send an authentication request.
ctx.pipeline().addBefore(encoderName, decoderName, new Socks5AuthResponseDecoder());
sendToProxyServer(
new Socks5AuthRequest(username != null? username : "", password != null? password : ""));
break;
default:
// Should never reach here.
throw new Error();
}
return false;
}
if (response instanceof Socks5AuthResponse) {
// Received an authentication response from the server.
Socks5AuthResponse res = (Socks5AuthResponse) response;
if (res.authStatus() != Socks5AuthStatus.SUCCESS) {
throw new ProxyConnectException(exceptionMessage("authStatus: " + res.authStatus()));
}
sendConnectCommand(ctx);
return false;
}
// This should be the last message from the server.
Socks5CmdResponse res = (Socks5CmdResponse) response;
if (res.cmdStatus() != Socks5CmdStatus.SUCCESS) {
throw new ProxyConnectException(exceptionMessage("cmdStatus: " + res.cmdStatus()));
}
return true;
}
private Socks5AuthScheme socksAuthScheme() {
Socks5AuthScheme authScheme;
if (username == null && password == null) {
authScheme = Socks5AuthScheme.NO_AUTH;
} else {
authScheme = Socks5AuthScheme.AUTH_PASSWORD;
}
return authScheme;
}
private void sendConnectCommand(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress raddr = destinationAddress();
Socks5AddressType addrType;
String rhost;
if (raddr.isUnresolved()) {
addrType = Socks5AddressType.DOMAIN;
rhost = raddr.getHostString();
} else {
rhost = raddr.getAddress().getHostAddress();
if (NetUtil.isValidIpV4Address(rhost)) {
addrType = Socks5AddressType.IPv4;
} else if (NetUtil.isValidIpV6Address(rhost)) {
addrType = Socks5AddressType.IPv6;
} else {
throw new ProxyConnectException(
exceptionMessage("unknown address type: " + StringUtil.simpleClassName(rhost)));
}
}
ctx.pipeline().addBefore(encoderName, decoderName, new Socks5CmdResponseDecoder());
sendToProxyServer(new Socks5CmdRequest(Socks5CmdType.CONNECT, addrType, rhost, raddr.getPort()));
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Adds support for client connections via proxy protocols such as
* <a href="http://en.wikipedia.org/wiki/SOCKS">SOCKS</a> and
* <a href="http://en.wikipedia.org/wiki/HTTP_tunnel#HTTP_CONNECT_tunneling">HTTP CONNECT tunneling</a>
*/
package io.netty.handler.proxy;

View File

@ -0,0 +1,166 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.proxy;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
final class HttpProxyServer extends ProxyServer {
HttpProxyServer(boolean useSsl, TestMode testMode, InetSocketAddress destination) {
super(useSsl, testMode, destination);
}
HttpProxyServer(
boolean useSsl, TestMode testMode, InetSocketAddress destination, String username, String password) {
super(useSsl, testMode, destination, username, password);
}
@Override
protected void configure(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
switch (testMode) {
case INTERMEDIARY:
p.addLast(new HttpServerCodec());
p.addLast(new HttpObjectAggregator(1));
p.addLast(new HttpIntermediaryHandler());
break;
case TERMINAL:
p.addLast(new HttpServerCodec());
p.addLast(new HttpObjectAggregator(1));
p.addLast(new HttpTerminalHandler());
break;
case UNRESPONSIVE:
p.addLast(UnresponsiveHandler.INSTANCE);
break;
}
}
private boolean authenticate(ChannelHandlerContext ctx, FullHttpRequest req) {
assertThat(req.method(), is(HttpMethod.CONNECT));
if (testMode != TestMode.INTERMEDIARY) {
ctx.pipeline().addBefore(ctx.name(), "lineDecoder", new LineBasedFrameDecoder(64, false, true));
}
ctx.pipeline().remove(HttpObjectAggregator.class);
ctx.pipeline().remove(HttpRequestDecoder.class);
boolean authzSuccess = false;
if (username != null) {
String authz = req.headers().get(Names.AUTHORIZATION);
if (authz != null) {
ByteBuf authzBuf64 = Unpooled.copiedBuffer(authz, CharsetUtil.US_ASCII);
ByteBuf authzBuf = Base64.decode(authzBuf64);
authz = authzBuf.toString(CharsetUtil.US_ASCII);
authzBuf64.release();
authzBuf.release();
String expectedAuthz = username + ':' + password;
authzSuccess = expectedAuthz.equals(authz);
}
} else {
authzSuccess = true;
}
return authzSuccess;
}
private final class HttpIntermediaryHandler extends IntermediaryHandler {
private SocketAddress intermediaryDestination;
@Override
protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpRequest req = (FullHttpRequest) msg;
FullHttpResponse res;
if (!authenticate(ctx, req)) {
res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED);
res.headers().set(Names.CONTENT_LENGTH, 0);
} else {
res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
String uri = req.uri();
int lastColonPos = uri.lastIndexOf(':');
assertThat(lastColonPos, is(greaterThan(0)));
intermediaryDestination = new InetSocketAddress(
uri.substring(0, lastColonPos), Integer.parseInt(uri.substring(lastColonPos + 1)));
}
ctx.write(res);
ctx.pipeline().remove(HttpResponseEncoder.class);
return true;
}
@Override
protected SocketAddress intermediaryDestination() {
return intermediaryDestination;
}
}
private final class HttpTerminalHandler extends TerminalHandler {
@Override
protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpRequest req = (FullHttpRequest) msg;
FullHttpResponse res;
boolean sendGreeting = false;
if (!authenticate(ctx, req)) {
res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED);
res.headers().set(Names.CONTENT_LENGTH, 0);
} else if (!req.uri().equals(destination.getHostString() + ':' + destination.getPort())) {
res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN);
res.headers().set(Names.CONTENT_LENGTH, 0);
} else {
res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
sendGreeting = true;
}
ctx.write(res);
ctx.pipeline().remove(HttpResponseEncoder.class);
if (sendGreeting) {
ctx.write(Unpooled.copiedBuffer("0\n", CharsetUtil.US_ASCII));
}
return true;
}
}
}

View File

@ -0,0 +1,641 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.proxy;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
@RunWith(Parameterized.class)
public class ProxyHandlerTest {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ProxyHandlerTest.class);
private static final InetSocketAddress DESTINATION = InetSocketAddress.createUnresolved("destination.com", 42);
private static final InetSocketAddress BAD_DESTINATION = new InetSocketAddress("1.2.3.4", 5);
private static final String USERNAME = "testUser";
private static final String PASSWORD = "testPassword";
private static final String BAD_USERNAME = "badUser";
private static final String BAD_PASSWORD = "badPassword";
static final EventLoopGroup group = new NioEventLoopGroup(3, new DefaultThreadFactory("proxy", true));
static final SslContext serverSslCtx;
static final SslContext clientSslCtx;
static {
SslContext sctx;
SslContext cctx;
try {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sctx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
cctx = SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE);
} catch (Exception e) {
throw new Error(e);
}
serverSslCtx = sctx;
clientSslCtx = cctx;
}
static final ProxyServer deadHttpProxy = new HttpProxyServer(false, TestMode.UNRESPONSIVE, null);
static final ProxyServer interHttpProxy = new HttpProxyServer(false, TestMode.INTERMEDIARY, null);
static final ProxyServer anonHttpProxy = new HttpProxyServer(false, TestMode.TERMINAL, DESTINATION);
static final ProxyServer httpProxy =
new HttpProxyServer(false, TestMode.TERMINAL, DESTINATION, USERNAME, PASSWORD);
static final ProxyServer deadHttpsProxy = new HttpProxyServer(true, TestMode.UNRESPONSIVE, null);
static final ProxyServer interHttpsProxy = new HttpProxyServer(true, TestMode.INTERMEDIARY, null);
static final ProxyServer anonHttpsProxy = new HttpProxyServer(true, TestMode.TERMINAL, DESTINATION);
static final ProxyServer httpsProxy =
new HttpProxyServer(true, TestMode.TERMINAL, DESTINATION, USERNAME, PASSWORD);
static final ProxyServer deadSocks4Proxy = new Socks4ProxyServer(false, TestMode.UNRESPONSIVE, null);
static final ProxyServer interSocks4Proxy = new Socks4ProxyServer(false, TestMode.INTERMEDIARY, null);
static final ProxyServer anonSocks4Proxy = new Socks4ProxyServer(false, TestMode.TERMINAL, DESTINATION);
static final ProxyServer socks4Proxy = new Socks4ProxyServer(false, TestMode.TERMINAL, DESTINATION, USERNAME);
static final ProxyServer deadSocks5Proxy = new Socks5ProxyServer(false, TestMode.UNRESPONSIVE, null);
static final ProxyServer interSocks5Proxy = new Socks5ProxyServer(false, TestMode.INTERMEDIARY, null);
static final ProxyServer anonSocks5Proxy = new Socks5ProxyServer(false, TestMode.TERMINAL, DESTINATION);
static final ProxyServer socks5Proxy =
new Socks5ProxyServer(false, TestMode.TERMINAL, DESTINATION, USERNAME, PASSWORD);
private static final Collection<ProxyServer> allProxies = Arrays.asList(
deadHttpProxy, interHttpProxy, anonHttpProxy, httpProxy,
deadHttpsProxy, interHttpsProxy, anonHttpsProxy, httpsProxy,
deadSocks4Proxy, interSocks4Proxy, anonSocks4Proxy, socks4Proxy,
deadSocks5Proxy, interSocks5Proxy, anonSocks5Proxy, socks5Proxy
);
@Parameters(name = "{index}: {0}")
public static List<Object[]> testItems() {
List<TestItem> items = Arrays.asList(
// HTTP -------------------------------------------------------
new SuccessTestItem(
"Anonymous HTTP proxy: successful connection",
DESTINATION,
new HttpProxyHandler(anonHttpProxy.address())),
new FailureTestItem(
"Anonymous HTTP proxy: rejected connection",
BAD_DESTINATION, "status: 403",
new HttpProxyHandler(anonHttpProxy.address())),
new FailureTestItem(
"HTTP proxy: rejected anonymous connection",
DESTINATION, "status: 401",
new HttpProxyHandler(httpProxy.address())),
new SuccessTestItem(
"HTTP proxy: successful connection",
DESTINATION,
new HttpProxyHandler(httpProxy.address(), USERNAME, PASSWORD)),
new FailureTestItem(
"HTTP proxy: rejected connection",
BAD_DESTINATION, "status: 403",
new HttpProxyHandler(httpProxy.address(), USERNAME, PASSWORD)),
new FailureTestItem(
"HTTP proxy: authentication failure",
DESTINATION, "status: 401",
new HttpProxyHandler(httpProxy.address(), BAD_USERNAME, BAD_PASSWORD)),
new TimeoutTestItem(
"HTTP proxy: timeout",
new HttpProxyHandler(deadHttpProxy.address())),
// HTTPS ------------------------------------------------------
new SuccessTestItem(
"Anonymous HTTPS proxy: successful connection",
DESTINATION,
clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT),
new HttpProxyHandler(anonHttpsProxy.address())),
new FailureTestItem(
"Anonymous HTTPS proxy: rejected connection",
BAD_DESTINATION, "status: 403",
clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT),
new HttpProxyHandler(anonHttpsProxy.address())),
new FailureTestItem(
"HTTPS proxy: rejected anonymous connection",
DESTINATION, "status: 401",
clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT),
new HttpProxyHandler(httpsProxy.address())),
new SuccessTestItem(
"HTTPS proxy: successful connection",
DESTINATION,
clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT),
new HttpProxyHandler(httpsProxy.address(), USERNAME, PASSWORD)),
new FailureTestItem(
"HTTPS proxy: rejected connection",
BAD_DESTINATION, "status: 403",
clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT),
new HttpProxyHandler(httpsProxy.address(), USERNAME, PASSWORD)),
new FailureTestItem(
"HTTPS proxy: authentication failure",
DESTINATION, "status: 401",
clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT),
new HttpProxyHandler(httpsProxy.address(), BAD_USERNAME, BAD_PASSWORD)),
new TimeoutTestItem(
"HTTPS proxy: timeout",
clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT),
new HttpProxyHandler(deadHttpsProxy.address())),
// SOCKS4 -----------------------------------------------------
new SuccessTestItem(
"Anonymous SOCKS4: successful connection",
DESTINATION,
new Socks4ProxyHandler(anonSocks4Proxy.address())),
new FailureTestItem(
"Anonymous SOCKS4: rejected connection",
BAD_DESTINATION, "cmdStatus: REJECTED_OR_FAILED",
new Socks4ProxyHandler(anonSocks4Proxy.address())),
new FailureTestItem(
"SOCKS4: rejected anonymous connection",
DESTINATION, "cmdStatus: IDENTD_AUTH_FAILURE",
new Socks4ProxyHandler(socks4Proxy.address())),
new SuccessTestItem(
"SOCKS4: successful connection",
DESTINATION,
new Socks4ProxyHandler(socks4Proxy.address(), USERNAME)),
new FailureTestItem(
"SOCKS4: rejected connection",
BAD_DESTINATION, "cmdStatus: REJECTED_OR_FAILED",
new Socks4ProxyHandler(socks4Proxy.address(), USERNAME)),
new FailureTestItem(
"SOCKS4: authentication failure",
DESTINATION, "cmdStatus: IDENTD_AUTH_FAILURE",
new Socks4ProxyHandler(socks4Proxy.address(), BAD_USERNAME)),
new TimeoutTestItem(
"SOCKS4: timeout",
new Socks4ProxyHandler(deadSocks4Proxy.address())),
// SOCKS5 -----------------------------------------------------
new SuccessTestItem(
"Anonymous SOCKS5: successful connection",
DESTINATION,
new Socks5ProxyHandler(anonSocks5Proxy.address())),
new FailureTestItem(
"Anonymous SOCKS5: rejected connection",
BAD_DESTINATION, "cmdStatus: FORBIDDEN",
new Socks5ProxyHandler(anonSocks5Proxy.address())),
new FailureTestItem(
"SOCKS5: rejected anonymous connection",
DESTINATION, "unexpected authScheme: AUTH_PASSWORD",
new Socks5ProxyHandler(socks5Proxy.address())),
new SuccessTestItem(
"SOCKS5: successful connection",
DESTINATION,
new Socks5ProxyHandler(socks5Proxy.address(), USERNAME, PASSWORD)),
new FailureTestItem(
"SOCKS5: rejected connection",
BAD_DESTINATION, "cmdStatus: FORBIDDEN",
new Socks5ProxyHandler(socks5Proxy.address(), USERNAME, PASSWORD)),
new FailureTestItem(
"SOCKS5: authentication failure",
DESTINATION, "authStatus: FAILURE",
new Socks5ProxyHandler(socks5Proxy.address(), BAD_USERNAME, BAD_PASSWORD)),
new TimeoutTestItem(
"SOCKS5: timeout",
new Socks5ProxyHandler(deadSocks5Proxy.address())),
// HTTP + HTTPS + SOCKS4 + SOCKS5
new SuccessTestItem(
"Single-chain: successful connection",
DESTINATION,
new Socks5ProxyHandler(interSocks5Proxy.address()), // SOCKS5
new Socks4ProxyHandler(interSocks4Proxy.address()), // SOCKS4
clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT),
new HttpProxyHandler(interHttpsProxy.address()), // HTTPS
new HttpProxyHandler(interHttpProxy.address()), // HTTP
new HttpProxyHandler(anonHttpProxy.address())),
// (HTTP + HTTPS + SOCKS4 + SOCKS5) * 2
new SuccessTestItem(
"Double-chain: successful connection",
DESTINATION,
new Socks5ProxyHandler(interSocks5Proxy.address()), // SOCKS5
new Socks4ProxyHandler(interSocks4Proxy.address()), // SOCKS4
clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT),
new HttpProxyHandler(interHttpsProxy.address()), // HTTPS
new HttpProxyHandler(interHttpProxy.address()), // HTTP
new Socks5ProxyHandler(interSocks5Proxy.address()), // SOCKS5
new Socks4ProxyHandler(interSocks4Proxy.address()), // SOCKS4
clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT),
new HttpProxyHandler(interHttpsProxy.address()), // HTTPS
new HttpProxyHandler(interHttpProxy.address()), // HTTP
new HttpProxyHandler(anonHttpProxy.address()))
);
// Convert the test items to the list of constructor parameters.
List<Object[]> params = new ArrayList<Object[]>(items.size());
for (Object i: items) {
params.add(new Object[] { i });
}
// Randomize the execution order to increase the possibility of exposing failure dependencies.
Collections.shuffle(params);
return params;
}
@AfterClass
public static void stopServers() {
for (ProxyServer p: allProxies) {
p.stop();
}
}
private final TestItem testItem;
public ProxyHandlerTest(TestItem testItem) {
this.testItem = testItem;
}
@Before
public void clearServerExceptions() throws Exception {
for (ProxyServer p: allProxies) {
p.clearExceptions();
}
}
@Test
public void test() throws Exception {
testItem.test();
}
@After
public void checkServerExceptions() throws Exception {
for (ProxyServer p: allProxies) {
p.checkExceptions();
}
}
private static final class SuccessTestHandler extends SimpleChannelInboundHandler<Object> {
final Queue<String> received = new LinkedBlockingQueue<String>();
final Queue<Throwable> exceptions = new LinkedBlockingQueue<Throwable>();
volatile int eventCount;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("A\n", CharsetUtil.US_ASCII));
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ProxyConnectionEvent) {
eventCount ++;
if (eventCount == 1) {
// Note that ProxyConnectionEvent can be triggered multiple times when there are multiple
// ProxyHandlers in the pipeline. Therefore, we send the 'B' message only on the first event.
ctx.writeAndFlush(Unpooled.copiedBuffer("B\n", CharsetUtil.US_ASCII));
}
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
String str = ((ByteBuf) msg).toString(CharsetUtil.US_ASCII);
received.add(str);
if ("2".equals(str)) {
ctx.writeAndFlush(Unpooled.copiedBuffer("C\n", CharsetUtil.US_ASCII));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
exceptions.add(cause);
ctx.close();
}
}
private static final class FailureTestHandler extends SimpleChannelInboundHandler<Object> {
final Queue<Throwable> exceptions = new LinkedBlockingQueue<Throwable>();
/**
* A latch that counts down when:
* - a pending write attempt in {@link #channelActive(ChannelHandlerContext)} finishes, or
* - the channel is closed.
* By waiting until the latch goes down to 0, we can make sure all assertion failures related with all write
* attempts have been recorded.
*/
final CountDownLatch latch = new CountDownLatch(2);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("A\n", CharsetUtil.US_ASCII)).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
latch.countDown();
if (!(future.cause() instanceof ProxyConnectException)) {
exceptions.add(new AssertionError(
"Unexpected failure cause for initial write: " + future.cause()));
}
}
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ProxyConnectionEvent) {
fail("Unexpected event: " + evt);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
fail("Unexpected message: " + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
exceptions.add(cause);
ctx.close();
}
}
private abstract static class TestItem {
final String name;
final InetSocketAddress destination;
final ChannelHandler[] clientHandlers;
protected TestItem(String name, InetSocketAddress destination, ChannelHandler... clientHandlers) {
this.name = name;
this.destination = destination;
this.clientHandlers = clientHandlers;
}
abstract void test() throws Exception;
protected void assertProxyHandlers(boolean success) {
for (ChannelHandler h: clientHandlers) {
if (h instanceof ProxyHandler) {
ProxyHandler ph = (ProxyHandler) h;
String type = StringUtil.simpleClassName(ph);
Future<Channel> f = ph.connectFuture();
if (!f.isDone()) {
logger.warn("{}: not done", type);
} else if (f.isSuccess()) {
if (success) {
logger.debug("{}: success", type);
} else {
logger.warn("{}: success", type);
}
} else {
if (success) {
logger.warn("{}: failure", type, f.cause());
} else {
logger.debug("{}: failure", type, f.cause());
}
}
}
}
for (ChannelHandler h: clientHandlers) {
if (h instanceof ProxyHandler) {
ProxyHandler ph = (ProxyHandler) h;
assertThat(ph.connectFuture().isDone(), is(true));
assertThat(ph.connectFuture().isSuccess(), is(success));
}
}
}
@Override
public String toString() {
return name;
}
}
private static final class SuccessTestItem extends TestItem {
private final int expectedEventCount;
SuccessTestItem(String name, InetSocketAddress destination, ChannelHandler... clientHandlers) {
super(name, destination, clientHandlers);
int expectedEventCount = 0;
for (ChannelHandler h: clientHandlers) {
if (h instanceof ProxyHandler) {
expectedEventCount++;
}
}
this.expectedEventCount = expectedEventCount;
}
@Override
protected void test() throws Exception {
final SuccessTestHandler testHandler = new SuccessTestHandler();
Bootstrap b = new Bootstrap();
b.group(group);
b.channel(NioSocketChannel.class);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(clientHandlers);
p.addLast(new LineBasedFrameDecoder(64));
p.addLast(testHandler);
}
});
boolean finished = b.connect(destination).channel().closeFuture().await(10, TimeUnit.SECONDS);
logger.debug("Received messages: {}", testHandler.received);
if (testHandler.exceptions.isEmpty()) {
logger.debug("No recorded exceptions on the client side.");
} else {
for (Throwable t : testHandler.exceptions) {
logger.debug("Recorded exception on the client side: {}", t);
}
}
assertProxyHandlers(true);
assertThat(testHandler.received.toArray(), is(new Object[] { "0", "1", "2", "3" }));
assertThat(testHandler.exceptions.toArray(), is(EmptyArrays.EMPTY_OBJECTS));
assertThat(testHandler.eventCount, is(expectedEventCount));
assertThat(finished, is(true));
}
}
private static final class FailureTestItem extends TestItem {
private final String expectedMessage;
FailureTestItem(
String name, InetSocketAddress destination, String expectedMessage, ChannelHandler... clientHandlers) {
super(name, destination, clientHandlers);
this.expectedMessage = expectedMessage;
}
@Override
protected void test() throws Exception {
final FailureTestHandler testHandler = new FailureTestHandler();
Bootstrap b = new Bootstrap();
b.group(group);
b.channel(NioSocketChannel.class);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(clientHandlers);
p.addLast(new LineBasedFrameDecoder(64));
p.addLast(testHandler);
}
});
boolean finished = b.connect(destination).channel().closeFuture().await(10, TimeUnit.SECONDS);
finished &= testHandler.latch.await(10, TimeUnit.SECONDS);
logger.debug("Recorded exceptions: {}", testHandler.exceptions);
assertProxyHandlers(false);
assertThat(testHandler.exceptions.size(), is(1));
Throwable e = testHandler.exceptions.poll();
assertThat(e, is(instanceOf(ProxyConnectException.class)));
assertThat(String.valueOf(e), containsString(expectedMessage));
assertThat(finished, is(true));
}
}
private static final class TimeoutTestItem extends TestItem {
TimeoutTestItem(String name, ChannelHandler... clientHandlers) {
super(name, null, clientHandlers);
}
@Override
protected void test() throws Exception {
final long TIMEOUT = 2000;
for (ChannelHandler h: clientHandlers) {
if (h instanceof ProxyHandler) {
((ProxyHandler) h).setConnectTimeoutMillis(TIMEOUT);
}
}
final FailureTestHandler testHandler = new FailureTestHandler();
Bootstrap b = new Bootstrap();
b.group(group);
b.channel(NioSocketChannel.class);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(clientHandlers);
p.addLast(new LineBasedFrameDecoder(64));
p.addLast(testHandler);
}
});
ChannelFuture cf = b.connect(DESTINATION).channel().closeFuture();
boolean finished = cf.await(TIMEOUT * 2, TimeUnit.MILLISECONDS);
finished &= testHandler.latch.await(TIMEOUT * 2, TimeUnit.MILLISECONDS);
logger.debug("Recorded exceptions: {}", testHandler.exceptions);
assertProxyHandlers(false);
assertThat(testHandler.exceptions.size(), is(1));
Throwable e = testHandler.exceptions.poll();
assertThat(e, is(instanceOf(ProxyConnectException.class)));
assertThat(String.valueOf(e), containsString("timeout"));
assertThat(finished, is(true));
}
}
}

View File

@ -0,0 +1,306 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.proxy;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
abstract class ProxyServer {
protected final InternalLogger logger = InternalLoggerFactory.getInstance(getClass());
private final ServerSocketChannel ch;
private final Queue<Throwable> recordedExceptions = new LinkedBlockingQueue<Throwable>();
protected final TestMode testMode;
protected final String username;
protected final String password;
protected final InetSocketAddress destination;
/**
* Starts a new proxy server with disabled authentication for testing purpose.
*
* @param useSsl {@code true} if and only if implicit SSL is enabled
* @param testMode the test mode
* @param destination the expected destination. If the client requests proxying to a different destination, this
* server will reject the connection request.
*/
protected ProxyServer(boolean useSsl, TestMode testMode, InetSocketAddress destination) {
this(useSsl, testMode, destination, null, null);
}
/**
* Starts a new proxy server with disabled authentication for testing purpose.
*
* @param useSsl {@code true} if and only if implicit SSL is enabled
* @param testMode the test mode
* @param username the expected username. If the client tries to authenticate with a different username, this server
* will fail the authentication request.
* @param password the expected password. If the client tries to authenticate with a different password, this server
* will fail the authentication request.
* @param destination the expected destination. If the client requests proxying to a different destination, this
* server will reject the connection request.
*/
protected ProxyServer(
final boolean useSsl, TestMode testMode,
InetSocketAddress destination, String username, String password) {
this.testMode = testMode;
this.destination = destination;
this.username = username;
this.password = password;
ServerBootstrap b = new ServerBootstrap();
b.channel(NioServerSocketChannel.class);
b.group(ProxyHandlerTest.group);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (useSsl) {
p.addLast(ProxyHandlerTest.serverSslCtx.newHandler(ch.alloc()));
}
configure(ch);
}
});
ch = (ServerSocketChannel) b.bind(NetUtil.LOCALHOST, 0).syncUninterruptibly().channel();
}
public final InetSocketAddress address() {
return new InetSocketAddress(NetUtil.LOCALHOST, ch.localAddress().getPort());
}
protected abstract void configure(SocketChannel ch) throws Exception;
final void recordException(Throwable t) {
logger.warn("Unexpected exception from proxy server:", t);
recordedExceptions.add(t);
}
/**
* Clears all recorded exceptions.
*/
public final void clearExceptions() {
recordedExceptions.clear();
}
/**
* Logs all recorded exceptions and raises the last one so that the caller can fail.
*/
public final void checkExceptions() {
Throwable t;
for (;;) {
t = recordedExceptions.poll();
if (t == null) {
break;
}
logger.warn("Unexpected exception:", t);
}
if (t != null) {
PlatformDependent.throwException(t);
}
}
public final void stop() {
ch.close();
}
protected abstract class IntermediaryHandler extends SimpleChannelInboundHandler<Object> {
private final Queue<Object> received = new ArrayDeque<Object>();
private boolean finished;
private Channel backend;
@Override
protected final void channelRead0(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (finished) {
received.add(ReferenceCountUtil.retain(msg));
flush();
return;
}
boolean finished = handleProxyProtocol(ctx, msg);
if (finished) {
this.finished = true;
ChannelFuture f = connectToDestination(ctx.channel().eventLoop(), new BackendHandler(ctx));
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
recordException(future.cause());
ctx.close();
} else {
backend = future.channel();
flush();
}
}
});
}
}
private void flush() {
if (backend != null) {
boolean wrote = false;
for (;;) {
Object msg = received.poll();
if (msg == null) {
break;
}
backend.write(msg);
wrote = true;
}
if (wrote) {
backend.flush();
}
}
}
protected abstract boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception;
protected abstract SocketAddress intermediaryDestination();
private ChannelFuture connectToDestination(EventLoop loop, ChannelHandler handler) {
Bootstrap b = new Bootstrap();
b.channel(NioSocketChannel.class);
b.group(loop);
b.handler(handler);
return b.connect(intermediaryDestination());
}
@Override
public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (backend != null) {
backend.close();
}
}
@Override
public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
recordException(cause);
ctx.close();
}
private final class BackendHandler extends ChannelInboundHandlerAdapter {
private final ChannelHandlerContext frontend;
BackendHandler(ChannelHandlerContext frontend) {
this.frontend = frontend;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
frontend.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
frontend.flush();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
frontend.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
recordException(cause);
ctx.close();
}
}
}
protected abstract class TerminalHandler extends SimpleChannelInboundHandler<Object> {
private boolean finished;
@Override
protected final void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (finished) {
String str = ((ByteBuf) msg).toString(CharsetUtil.US_ASCII);
if ("A\n".equals(str)) {
ctx.write(Unpooled.copiedBuffer("1\n", CharsetUtil.US_ASCII));
} else if ("B\n".equals(str)) {
ctx.write(Unpooled.copiedBuffer("2\n", CharsetUtil.US_ASCII));
} else if ("C\n".equals(str)) {
ctx.write(Unpooled.copiedBuffer("3\n", CharsetUtil.US_ASCII))
.addListener(ChannelFutureListener.CLOSE);
} else {
throw new IllegalStateException("unexpected message: " + str);
}
return;
}
boolean finished = handleProxyProtocol(ctx, msg);
if (finished) {
this.finished = finished;
}
}
protected abstract boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception;
@Override
public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
recordException(cause);
ctx.close();
}
}
}

View File

@ -0,0 +1,140 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.proxy;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.socksx.v4.Socks4CmdRequest;
import io.netty.handler.codec.socksx.v4.Socks4CmdRequestDecoder;
import io.netty.handler.codec.socksx.v4.Socks4CmdResponse;
import io.netty.handler.codec.socksx.v4.Socks4CmdStatus;
import io.netty.handler.codec.socksx.v4.Socks4CmdType;
import io.netty.handler.codec.socksx.v4.Socks4MessageEncoder;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
final class Socks4ProxyServer extends ProxyServer {
Socks4ProxyServer(boolean useSsl, TestMode testMode, InetSocketAddress destination) {
super(useSsl, testMode, destination);
}
Socks4ProxyServer(boolean useSsl, TestMode testMode, InetSocketAddress destination, String username) {
super(useSsl, testMode, destination, username, null);
}
@Override
protected void configure(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
switch (testMode) {
case INTERMEDIARY:
p.addLast(new Socks4CmdRequestDecoder());
p.addLast(Socks4MessageEncoder.INSTANCE);
p.addLast(new Socks4IntermediaryHandler());
break;
case TERMINAL:
p.addLast(new Socks4CmdRequestDecoder());
p.addLast(Socks4MessageEncoder.INSTANCE);
p.addLast(new Socks4TerminalHandler());
break;
case UNRESPONSIVE:
p.addLast(UnresponsiveHandler.INSTANCE);
break;
}
}
private boolean authenticate(ChannelHandlerContext ctx, Socks4CmdRequest req) {
assertThat(req.cmdType(), is(Socks4CmdType.CONNECT));
if (testMode != TestMode.INTERMEDIARY) {
ctx.pipeline().addBefore(ctx.name(), "lineDecoder", new LineBasedFrameDecoder(64, false, true));
}
boolean authzSuccess;
if (username != null) {
authzSuccess = username.equals(req.userId());
} else {
authzSuccess = true;
}
return authzSuccess;
}
private final class Socks4IntermediaryHandler extends IntermediaryHandler {
private SocketAddress intermediaryDestination;
@Override
protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception {
Socks4CmdRequest req = (Socks4CmdRequest) msg;
Socks4CmdResponse res;
if (!authenticate(ctx, req)) {
res = new Socks4CmdResponse(Socks4CmdStatus.IDENTD_AUTH_FAILURE);
} else {
res = new Socks4CmdResponse(Socks4CmdStatus.SUCCESS);
intermediaryDestination = new InetSocketAddress(req.host(), req.port());
}
ctx.write(res);
ctx.pipeline().remove(Socks4MessageEncoder.class);
return true;
}
@Override
protected SocketAddress intermediaryDestination() {
return intermediaryDestination;
}
}
private final class Socks4TerminalHandler extends TerminalHandler {
@Override
protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception {
Socks4CmdRequest req = (Socks4CmdRequest) msg;
boolean authzSuccess = authenticate(ctx, req);
Socks4CmdResponse res;
boolean sendGreeting = false;
if (!authzSuccess) {
res = new Socks4CmdResponse(Socks4CmdStatus.IDENTD_AUTH_FAILURE);
} else if (!req.host().equals(destination.getHostString()) ||
req.port() != destination.getPort()) {
res = new Socks4CmdResponse(Socks4CmdStatus.REJECTED_OR_FAILED);
} else {
res = new Socks4CmdResponse(Socks4CmdStatus.SUCCESS);
sendGreeting = true;
}
ctx.write(res);
ctx.pipeline().remove(Socks4MessageEncoder.class);
if (sendGreeting) {
ctx.write(Unpooled.copiedBuffer("0\n", CharsetUtil.US_ASCII));
}
return true;
}
}
}

View File

@ -0,0 +1,170 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.proxy;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.socksx.v5.Socks5AddressType;
import io.netty.handler.codec.socksx.v5.Socks5AuthRequest;
import io.netty.handler.codec.socksx.v5.Socks5AuthRequestDecoder;
import io.netty.handler.codec.socksx.v5.Socks5AuthResponse;
import io.netty.handler.codec.socksx.v5.Socks5AuthScheme;
import io.netty.handler.codec.socksx.v5.Socks5AuthStatus;
import io.netty.handler.codec.socksx.v5.Socks5CmdRequest;
import io.netty.handler.codec.socksx.v5.Socks5CmdRequestDecoder;
import io.netty.handler.codec.socksx.v5.Socks5CmdResponse;
import io.netty.handler.codec.socksx.v5.Socks5CmdStatus;
import io.netty.handler.codec.socksx.v5.Socks5CmdType;
import io.netty.handler.codec.socksx.v5.Socks5InitRequest;
import io.netty.handler.codec.socksx.v5.Socks5InitRequestDecoder;
import io.netty.handler.codec.socksx.v5.Socks5InitResponse;
import io.netty.handler.codec.socksx.v5.Socks5MessageEncoder;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
final class Socks5ProxyServer extends ProxyServer {
Socks5ProxyServer(boolean useSsl, TestMode testMode, InetSocketAddress destination) {
super(useSsl, testMode, destination);
}
Socks5ProxyServer(
boolean useSsl, TestMode testMode, InetSocketAddress destination, String username, String password) {
super(useSsl, testMode, destination, username, password);
}
@Override
protected void configure(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
switch (testMode) {
case INTERMEDIARY:
p.addLast("decoder", new Socks5InitRequestDecoder());
p.addLast("encoder", Socks5MessageEncoder.INSTANCE);
p.addLast(new Socks5IntermediaryHandler());
break;
case TERMINAL:
p.addLast("decoder", new Socks5InitRequestDecoder());
p.addLast("encoder", Socks5MessageEncoder.INSTANCE);
p.addLast(new Socks5TerminalHandler());
break;
case UNRESPONSIVE:
p.addLast(UnresponsiveHandler.INSTANCE);
break;
}
}
private boolean authenticate(ChannelHandlerContext ctx, Object msg) {
if (username == null) {
ctx.pipeline().addBefore("encoder", "decoder", new Socks5CmdRequestDecoder());
ctx.write(new Socks5InitResponse(Socks5AuthScheme.NO_AUTH));
return true;
}
if (msg instanceof Socks5InitRequest) {
ctx.pipeline().addBefore("encoder", "decoder", new Socks5AuthRequestDecoder());
ctx.write(new Socks5InitResponse(Socks5AuthScheme.AUTH_PASSWORD));
return false;
}
Socks5AuthRequest req = (Socks5AuthRequest) msg;
if (req.username().equals(username) && req.password().equals(password)) {
ctx.pipeline().addBefore("encoder", "decoder", new Socks5CmdRequestDecoder());
ctx.write(new Socks5AuthResponse(Socks5AuthStatus.SUCCESS));
return true;
}
ctx.pipeline().addBefore("encoder", "decoder", new Socks5AuthRequestDecoder());
ctx.write(new Socks5AuthResponse(Socks5AuthStatus.FAILURE));
return false;
}
private final class Socks5IntermediaryHandler extends IntermediaryHandler {
private boolean authenticated;
private SocketAddress intermediaryDestination;
@Override
protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!authenticated) {
authenticated = authenticate(ctx, msg);
return false;
}
Socks5CmdRequest req = (Socks5CmdRequest) msg;
assertThat(req.cmdType(), is(Socks5CmdType.CONNECT));
Socks5CmdResponse res;
res = new Socks5CmdResponse(Socks5CmdStatus.SUCCESS, Socks5AddressType.IPv4);
intermediaryDestination = new InetSocketAddress(req.host(), req.port());
ctx.write(res);
ctx.pipeline().remove(Socks5MessageEncoder.class);
return true;
}
@Override
protected SocketAddress intermediaryDestination() {
return intermediaryDestination;
}
}
private final class Socks5TerminalHandler extends TerminalHandler {
private boolean authenticated;
@Override
protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!authenticated) {
authenticated = authenticate(ctx, msg);
return false;
}
Socks5CmdRequest req = (Socks5CmdRequest) msg;
assertThat(req.cmdType(), is(Socks5CmdType.CONNECT));
ctx.pipeline().addBefore(ctx.name(), "lineDecoder", new LineBasedFrameDecoder(64, false, true));
Socks5CmdResponse res;
boolean sendGreeting = false;
if (!req.host().equals(destination.getHostString()) ||
req.port() != destination.getPort()) {
res = new Socks5CmdResponse(Socks5CmdStatus.FORBIDDEN, Socks5AddressType.IPv4);
} else {
res = new Socks5CmdResponse(Socks5CmdStatus.SUCCESS, Socks5AddressType.IPv4);
sendGreeting = true;
}
ctx.write(res);
ctx.pipeline().remove(Socks5MessageEncoder.class);
if (sendGreeting) {
ctx.write(Unpooled.copiedBuffer("0\n", CharsetUtil.US_ASCII));
}
return true;
}
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.proxy;
enum TestMode {
INTERMEDIARY,
TERMINAL,
UNRESPONSIVE,
}

View File

@ -0,0 +1,34 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.proxy;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@Sharable
final class UnresponsiveHandler extends SimpleChannelInboundHandler<Object> {
static final UnresponsiveHandler INSTANCE = new UnresponsiveHandler();
private UnresponsiveHandler() { }
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// Ignore
}
}

View File

@ -431,6 +431,7 @@
<module>transport-sctp</module> <module>transport-sctp</module>
<module>transport-udt</module> <module>transport-udt</module>
<module>handler</module> <module>handler</module>
<module>handler-proxy</module>
<module>example</module> <module>example</module>
<module>testsuite</module> <module>testsuite</module>
<module>microbench</module> <module>microbench</module>