diff --git a/example/src/main/java/io/netty/example/echo/EchoClient.java b/example/src/main/java/io/netty/example/echo/EchoClient.java index 6520460dcf..a90b07c7ed 100644 --- a/example/src/main/java/io/netty/example/echo/EchoClient.java +++ b/example/src/main/java/io/netty/example/echo/EchoClient.java @@ -15,15 +15,22 @@ */ package io.netty.example.echo; -import java.net.InetSocketAddress; -import java.util.concurrent.Executors; +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBuffers; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.EventLoop; +import io.netty.channel.MultithreadEventLoop; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.socket.nio.SelectorEventLoop; +import io.netty.handler.logging.LoggingHandler; +import io.netty.logging.InternalLogLevel; -import io.netty.bootstrap.ClientBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPipelineFactory; -import io.netty.channel.Channels; -import io.netty.channel.socket.nio.NioClientSocketChannelFactory; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicLong; /** * Sends one message when a connection is open and echoes back any received @@ -36,6 +43,7 @@ public class EchoClient { private final String host; private final int port; private final int firstMessageSize; + private final AtomicLong transferredBytes = new AtomicLong(); public EchoClient(String host, int port, int firstMessageSize) { this.host = host; @@ -43,28 +51,54 @@ public class EchoClient { this.firstMessageSize = firstMessageSize; } - public void run() { - // Configure the client. - ClientBootstrap bootstrap = new ClientBootstrap( - new NioClientSocketChannelFactory( - Executors.newCachedThreadPool())); + public void run() throws Exception { + EventLoop loop = new MultithreadEventLoop(SelectorEventLoop.class); + SocketChannel s = new NioSocketChannel(); + s.config().setTcpNoDelay(true); + s.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO)); + s.pipeline().addLast("echoer", new ChannelInboundHandlerAdapter() { - // Set up the pipeline factory. - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline( - new EchoClientHandler(firstMessageSize)); + private final ChannelBuffer firstMessage; + { + if (firstMessageSize <= 0) { + throw new IllegalArgumentException( + "firstMessageSize: " + firstMessageSize); + } + firstMessage = ChannelBuffers.buffer(firstMessageSize); + for (int i = 0; i < firstMessage.capacity(); i ++) { + firstMessage.writeByte((byte) i); + } + } + + @Override + public ChannelBufferHolder newInboundBuffer(ChannelInboundHandlerContext ctx) { + return ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer()); + } + + @Override + public void channelActive(ChannelInboundHandlerContext ctx) + throws Exception { + ctx.write(firstMessage); + } + + @Override + public void inboundBufferUpdated( + ChannelInboundHandlerContext ctx) throws Exception { + ChannelBuffer in = ctx.in().byteBuffer(); + ChannelBuffer out = ctx.out().byteBuffer(); + transferredBytes.addAndGet(in.readableBytes()); + + out.discardReadBytes(); + out.writeBytes(in); + in.clear(); + ctx.flush(); } }); + loop.register(s).awaitUninterruptibly().rethrowIfFailed(); + s.connect(new InetSocketAddress(host, port)).awaitUninterruptibly().rethrowIfFailed(); - // Start the connection attempt. - ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); - - // Wait until the connection is closed or the connection attempt fails. - future.channel().getCloseFuture().awaitUninterruptibly(); - - // Shut down thread pools to exit. - bootstrap.releaseExternalResources(); + // FIXME: Wait until the connection is closed or the connection attempt fails. + // FIXME: Show how to shut down. } public static void main(String[] args) throws Exception { diff --git a/example/src/main/java/io/netty/example/echo/EchoServer.java b/example/src/main/java/io/netty/example/echo/EchoServer.java index b318c3f8fc..d7ec3f48d1 100644 --- a/example/src/main/java/io/netty/example/echo/EchoServer.java +++ b/example/src/main/java/io/netty/example/echo/EchoServer.java @@ -33,6 +33,7 @@ import io.netty.logging.InternalLogLevel; import java.net.InetSocketAddress; import java.util.ArrayDeque; import java.util.Queue; +import java.util.concurrent.atomic.AtomicLong; /** * Echoes back any received data from a client. @@ -40,6 +41,7 @@ import java.util.Queue; public class EchoServer { private final int port; + private final AtomicLong transferredBytes = new AtomicLong(); public EchoServer(int port) { this.port = port; @@ -69,6 +71,7 @@ public class EchoServer { if (s == null) { break; } + s.config().setTcpNoDelay(true); s.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO)); s.pipeline().addLast("echoer", new ChannelInboundHandlerAdapter() { @Override @@ -80,6 +83,8 @@ public class EchoServer { public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) { ChannelBuffer in = ctx.in().byteBuffer(); ChannelBuffer out = ctx.out().byteBuffer(); + transferredBytes.addAndGet(in.readableBytes()); + out.discardReadBytes(); out.writeBytes(in); in.clear(); diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index f6c78b6f7e..27dffbd34c 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -546,8 +546,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha assert eventLoop().inEventLoop(); assert connectFuture != null; try { + boolean wasActive = isActive(); doFinishConnect(); connectFuture.setSuccess(); + if (!wasActive && isActive()) { + pipeline().fireChannelActive(); + } } catch (Throwable t) { connectFuture.setFailure(t); pipeline().fireExceptionCaught(t); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index b5ee1869d9..11a8304166 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -87,6 +87,6 @@ public abstract class AbstractNioChannel extends AbstractChannel { } SelectorEventLoop loop = (SelectorEventLoop) eventLoop(); - selectionKey = javaChannel().register(loop.selector, SelectionKey.OP_READ, this); + selectionKey = javaChannel().register(loop.selector, isActive()? SelectionKey.OP_READ : 0, this); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index f6130d4d00..4b1e5c83ac 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -116,8 +116,10 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha boolean success = false; try { boolean connected = javaChannel().connect(remoteAddress); - if (!connected) { - selectionKey().interestOps(selectionKey().interestOps() | SelectionKey.OP_CONNECT); + if (connected) { + selectionKey().interestOps(SelectionKey.OP_READ); + } else { + selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; @@ -133,6 +135,7 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha if (!javaChannel().finishConnect()) { throw new Error(); } + selectionKey().interestOps(SelectionKey.OP_READ); } @Override