From 65fd9c0b120a6705e79647274fa3226356d2c254 Mon Sep 17 00:00:00 2001 From: bk1te Date: Tue, 13 Aug 2013 23:02:21 +0400 Subject: [PATCH] use Promise instead of CallbackNotifier --- .../example/socksproxy/CallbackNotifier.java | 23 -------- .../socksproxy/DirectClientHandler.java | 11 ++-- .../socksproxy/DirectClientInitializer.java | 9 +-- .../socksproxy/SocksServerConnectHandler.java | 59 ++++++++++++------- 4 files changed, 49 insertions(+), 53 deletions(-) delete mode 100644 example/src/main/java/io/netty/example/socksproxy/CallbackNotifier.java diff --git a/example/src/main/java/io/netty/example/socksproxy/CallbackNotifier.java b/example/src/main/java/io/netty/example/socksproxy/CallbackNotifier.java deleted file mode 100644 index 728144e45f..0000000000 --- a/example/src/main/java/io/netty/example/socksproxy/CallbackNotifier.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.example.socksproxy; - -import io.netty.channel.ChannelHandlerContext; - -public interface CallbackNotifier { - void onSuccess(ChannelHandlerContext outboundCtx); - void onFailure(ChannelHandlerContext outboundCtx, Throwable cause); -} diff --git a/example/src/main/java/io/netty/example/socksproxy/DirectClientHandler.java b/example/src/main/java/io/netty/example/socksproxy/DirectClientHandler.java index 3cd1f7843f..4c41916ed1 100644 --- a/example/src/main/java/io/netty/example/socksproxy/DirectClientHandler.java +++ b/example/src/main/java/io/netty/example/socksproxy/DirectClientHandler.java @@ -17,6 +17,7 @@ package io.netty.example.socksproxy; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.concurrent.Promise; public final class DirectClientHandler extends ChannelInboundHandlerAdapter { @@ -25,20 +26,20 @@ public final class DirectClientHandler extends ChannelInboundHandlerAdapter { public static String getName() { return name; } - private final CallbackNotifier cb; + private final Promise promise; - public DirectClientHandler(CallbackNotifier cb) { - this.cb = cb; + public DirectClientHandler(Promise promise) { + this.promise = promise; } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.pipeline().remove(this); - cb.onSuccess(ctx); + promise.setSuccess(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) throws Exception { - cb.onFailure(ctx, throwable); + promise.setFailure(throwable); } } diff --git a/example/src/main/java/io/netty/example/socksproxy/DirectClientInitializer.java b/example/src/main/java/io/netty/example/socksproxy/DirectClientInitializer.java index 678a7d715a..c4d1bc8186 100644 --- a/example/src/main/java/io/netty/example/socksproxy/DirectClientInitializer.java +++ b/example/src/main/java/io/netty/example/socksproxy/DirectClientInitializer.java @@ -18,19 +18,20 @@ package io.netty.example.socksproxy; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; +import io.netty.util.concurrent.Promise; public final class DirectClientInitializer extends ChannelInitializer { - private final CallbackNotifier callbackNotifier; + private final Promise promise; - public DirectClientInitializer(CallbackNotifier callbackNotifier) { - this.callbackNotifier = callbackNotifier; + public DirectClientInitializer(Promise promise) { + this.promise = promise; } @Override public void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline channelPipeline = socketChannel.pipeline(); - channelPipeline.addLast(DirectClientHandler.getName(), new DirectClientHandler(callbackNotifier)); + channelPipeline.addLast(DirectClientHandler.getName(), new DirectClientHandler(promise)); } } diff --git a/example/src/main/java/io/netty/example/socksproxy/SocksServerConnectHandler.java b/example/src/main/java/io/netty/example/socksproxy/SocksServerConnectHandler.java index c78f30aff8..263f98f371 100644 --- a/example/src/main/java/io/netty/example/socksproxy/SocksServerConnectHandler.java +++ b/example/src/main/java/io/netty/example/socksproxy/SocksServerConnectHandler.java @@ -27,6 +27,9 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.socks.SocksCmdRequest; import io.netty.handler.codec.socks.SocksCmdResponse; import io.netty.handler.codec.socks.SocksCmdStatus; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Promise; @ChannelHandler.Sharable public final class SocksServerConnectHandler extends SimpleChannelInboundHandler { @@ -40,35 +43,49 @@ public final class SocksServerConnectHandler extends SimpleChannelInboundHandler @Override public void channelRead0(final ChannelHandlerContext ctx, final SocksCmdRequest request) throws Exception { - CallbackNotifier cb = new CallbackNotifier() { + Promise promise = ctx.executor().newPromise(); + promise.addListener( + new GenericFutureListener>() { @Override - public void onSuccess(final ChannelHandlerContext outboundCtx) { - ctx.channel().writeAndFlush(new SocksCmdResponse(SocksCmdStatus.SUCCESS, request.addressType())) - .addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - ctx.pipeline().remove(getName()); - outboundCtx.channel().pipeline().addLast(new RelayHandler(ctx.channel())); - ctx.channel().pipeline().addLast(new RelayHandler(outboundCtx.channel())); - } - }); + public void operationComplete(final Future future) throws Exception { + final Channel outboundChannel = future.getNow(); + if (future.isSuccess()) { + ctx.channel().writeAndFlush(new SocksCmdResponse(SocksCmdStatus.SUCCESS, request.addressType())) + .addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + ctx.pipeline().remove(getName()); + outboundChannel.pipeline().addLast(new RelayHandler(ctx.channel())); + ctx.channel().pipeline().addLast(new RelayHandler(outboundChannel)); + } + }); + } else { + ctx.channel().writeAndFlush(new SocksCmdResponse(SocksCmdStatus.FAILURE, request.addressType())); + SocksServerUtils.closeOnFlush(ctx.channel()); + } } - - @Override - public void onFailure(ChannelHandlerContext outboundCtx, Throwable cause) { - ctx.channel().writeAndFlush(new SocksCmdResponse(SocksCmdStatus.FAILURE, request.addressType())); - SocksServerUtils.closeOnFlush(ctx.channel()); - } - }; + }); final Channel inboundChannel = ctx.channel(); b.group(inboundChannel.eventLoop()) .channel(NioSocketChannel.class) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) .option(ChannelOption.SO_KEEPALIVE, true) - .handler(new DirectClientInitializer(cb)); - - b.connect(request.host(), request.port()); + .handler(new DirectClientInitializer(promise)); + b.connect(request.host(), request.port()) + .addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + // Connection established use handler provided results + } else { + // Close the connection if the connection attempt has failed. + ctx.channel().writeAndFlush( + new SocksCmdResponse(SocksCmdStatus.FAILURE, request.addressType())); + SocksServerUtils.closeOnFlush(ctx.channel()); + } + } + }); } @Override