diff --git a/example/src/main/java/io/netty/example/echo/EchoClient.java b/example/src/main/java/io/netty/example/echo/EchoClient.java index 2c5240baa9..e6997e45a7 100644 --- a/example/src/main/java/io/netty/example/echo/EchoClient.java +++ b/example/src/main/java/io/netty/example/echo/EchoClient.java @@ -15,12 +15,6 @@ */ package io.netty.example.echo; -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBuffers; -import io.netty.channel.ChannelBufferHolder; -import io.netty.channel.ChannelBufferHolders; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInboundHandlerContext; import io.netty.channel.EventLoop; import io.netty.channel.MultithreadEventLoop; import io.netty.channel.socket.SocketChannel; @@ -30,7 +24,6 @@ import io.netty.handler.logging.LoggingHandler; import io.netty.logging.InternalLogLevel; import java.net.InetSocketAddress; -import java.util.concurrent.atomic.AtomicLong; /** * Sends one message when a connection is open and echoes back any received @@ -43,7 +36,6 @@ public class EchoClient { private final String host; private final int port; private final int firstMessageSize; - private final AtomicLong transferredBytes = new AtomicLong(); public EchoClient(String host, int port, int firstMessageSize) { this.host = host; @@ -52,53 +44,23 @@ public class EchoClient { } public void run() throws Exception { - EventLoop loop = new MultithreadEventLoop(SelectorEventLoop.FACTORY); + // Create a new socket and configure it. SocketChannel s = new NioSocketChannel(); s.config().setTcpNoDelay(true); s.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO)); - s.pipeline().addLast("echoer", new ChannelInboundHandlerAdapter() { + s.pipeline().addLast("echoer", new EchoClientHandler(firstMessageSize)); - private final ChannelBuffer firstMessage; - { - if (firstMessageSize <= 0) { - throw new IllegalArgumentException( - "firstMessageSize: " + firstMessageSize); - } - firstMessage = ChannelBuffers.buffer(firstMessageSize); - for (int i = 0; i < firstMessage.capacity(); i ++) { - firstMessage.writeByte((byte) i); - } - } - - @Override - public ChannelBufferHolder newInboundBuffer(ChannelInboundHandlerContext ctx) { - return ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer()); - } - - @Override - public void channelActive(ChannelInboundHandlerContext ctx) - throws Exception { - ctx.write(firstMessage); - } - - @Override - public void inboundBufferUpdated( - ChannelInboundHandlerContext ctx) throws Exception { - ChannelBuffer in = ctx.in().byteBuffer(); - ChannelBuffer out = ctx.out().byteBuffer(); - transferredBytes.addAndGet(in.readableBytes()); - - out.discardReadBytes(); - out.writeBytes(in); - in.clear(); - ctx.flush(); - } - }); + // Begin the communication by registering the channel to an event loop and connecting + // to the peer. + EventLoop loop = new MultithreadEventLoop(SelectorEventLoop.FACTORY); loop.register(s).awaitUninterruptibly().rethrowIfFailed(); - s.connect(new InetSocketAddress(host, port)).awaitUninterruptibly().rethrowIfFailed(); + s.connect(new InetSocketAddress(host, port)); - // FIXME: Wait until the connection is closed or the connection attempt fails. - // FIXME: Show how to shut down. + // Wait until the connection is closed. + s.closeFuture().awaitUninterruptibly(); + + // Terminate the event loop. + loop.shutdown(); } public static void main(String[] args) throws Exception { diff --git a/example/src/main/java/io/netty/example/echo/EchoClientHandler.java b/example/src/main/java/io/netty/example/echo/EchoClientHandler.java index 2cfb9b2809..2fdaab679b 100644 --- a/example/src/main/java/io/netty/example/echo/EchoClientHandler.java +++ b/example/src/main/java/io/netty/example/echo/EchoClientHandler.java @@ -15,38 +15,34 @@ */ package io.netty.example.echo; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; - import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.ExceptionEvent; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundHandlerContext; + +import java.util.logging.Level; +import java.util.logging.Logger; /** * Handler implementation for the echo client. It initiates the ping-pong * traffic between the echo client and server by sending the first message to * the server. */ -public class EchoClientHandler extends SimpleChannelUpstreamHandler { +public class EchoClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = Logger.getLogger( EchoClientHandler.class.getName()); private final ChannelBuffer firstMessage; - private final AtomicLong transferredBytes = new AtomicLong(); /** * Creates a client-side handler. */ public EchoClientHandler(int firstMessageSize) { if (firstMessageSize <= 0) { - throw new IllegalArgumentException( - "firstMessageSize: " + firstMessageSize); + throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize); } firstMessage = ChannelBuffers.buffer(firstMessageSize); for (int i = 0; i < firstMessage.capacity(); i ++) { @@ -54,34 +50,31 @@ public class EchoClientHandler extends SimpleChannelUpstreamHandler { } } - public long getTransferredBytes() { - return transferredBytes.get(); + @Override + public ChannelBufferHolder newInboundBuffer(ChannelInboundHandlerContext ctx) { + return ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer()); } @Override - public void channelConnected( - ChannelHandlerContext ctx, ChannelStateEvent e) { - // Send the first message. Server will not send anything here - // because the firstMessage's capacity is 0. - e.channel().write(firstMessage); + public void channelActive(ChannelInboundHandlerContext ctx) { + ctx.write(firstMessage); } @Override - public void messageReceived( - ChannelHandlerContext ctx, MessageEvent e) { - // Send back the received message to the remote peer. - transferredBytes.addAndGet(((ChannelBuffer) e.getMessage()).readableBytes()); - e.channel().write(e.getMessage()); + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) { + ChannelBuffer in = ctx.in().byteBuffer(); + ChannelBuffer out = ctx.out().byteBuffer(); + out.discardReadBytes(); + out.writeBytes(in); + in.discardReadBytes(); + ctx.flush(); } @Override public void exceptionCaught( - ChannelHandlerContext ctx, ExceptionEvent e) { + ChannelInboundHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. - logger.log( - Level.WARNING, - "Unexpected exception from downstream.", - e.cause()); - e.channel().close(); + logger.log(Level.WARNING, "Unexpected exception from downstream.", cause); + ctx.close(); } } diff --git a/example/src/main/java/io/netty/example/echo/EchoServer.java b/example/src/main/java/io/netty/example/echo/EchoServer.java index fa4788d434..aa3812f8fa 100644 --- a/example/src/main/java/io/netty/example/echo/EchoServer.java +++ b/example/src/main/java/io/netty/example/echo/EchoServer.java @@ -15,8 +15,6 @@ */ package io.netty.example.echo; -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBuffers; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -33,7 +31,6 @@ import io.netty.logging.InternalLogLevel; import java.net.InetSocketAddress; import java.util.ArrayDeque; import java.util.Queue; -import java.util.concurrent.atomic.AtomicLong; /** * Echoes back any received data from a client. @@ -41,7 +38,6 @@ import java.util.concurrent.atomic.AtomicLong; public class EchoServer { private final int port; - private final AtomicLong transferredBytes = new AtomicLong(); public EchoServer(int port) { this.port = port; @@ -73,24 +69,7 @@ public class EchoServer { } s.config().setTcpNoDelay(true); s.pipeline().addLast("logger", new LoggingHandler(InternalLogLevel.INFO)); - s.pipeline().addLast("echoer", new ChannelInboundHandlerAdapter() { - @Override - public ChannelBufferHolder newInboundBuffer(ChannelInboundHandlerContext ctx) { - return ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer()); - } - - @Override - public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) { - ChannelBuffer in = ctx.in().byteBuffer(); - ChannelBuffer out = ctx.out().byteBuffer(); - transferredBytes.addAndGet(in.readableBytes()); - - out.discardReadBytes(); - out.writeBytes(in); - in.clear(); - ctx.flush(); - } - }); + s.pipeline().addLast("echoer", new EchoServerHandler()); loop.register(s); } } diff --git a/example/src/main/java/io/netty/example/echo/EchoServerHandler.java b/example/src/main/java/io/netty/example/echo/EchoServerHandler.java index 5828fd0be4..e91beac377 100644 --- a/example/src/main/java/io/netty/example/echo/EchoServerHandler.java +++ b/example/src/main/java/io/netty/example/echo/EchoServerHandler.java @@ -15,46 +15,43 @@ */ package io.netty.example.echo; -import java.util.concurrent.atomic.AtomicLong; +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBuffers; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInboundHandlerContext; + import java.util.logging.Level; import java.util.logging.Logger; -import io.netty.buffer.ChannelBuffer; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ExceptionEvent; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; - /** * Handler implementation for the echo server. */ -public class EchoServerHandler extends SimpleChannelUpstreamHandler { +public class EchoServerHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = Logger.getLogger( EchoServerHandler.class.getName()); - private final AtomicLong transferredBytes = new AtomicLong(); - - public long getTransferredBytes() { - return transferredBytes.get(); + @Override + public ChannelBufferHolder newInboundBuffer(ChannelInboundHandlerContext ctx) { + return ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer()); } @Override - public void messageReceived( - ChannelHandlerContext ctx, MessageEvent e) { - // Send back the received message to the remote peer. - transferredBytes.addAndGet(((ChannelBuffer) e.getMessage()).readableBytes()); - e.channel().write(e.getMessage()); + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) { + ChannelBuffer in = ctx.in().byteBuffer(); + ChannelBuffer out = ctx.out().byteBuffer(); + out.discardReadBytes(); + out.writeBytes(in); + in.discardReadBytes(); + ctx.flush(); } @Override - public void exceptionCaught( - ChannelHandlerContext ctx, ExceptionEvent e) { + public void exceptionCaught(ChannelInboundHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. - logger.log( - Level.WARNING, - "Unexpected exception from downstream.", - e.cause()); - e.channel().close(); + logger.log(Level.WARNING, "Unexpected exception from downstream.", cause); + ctx.close(); } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 970639a7a8..759d266638 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -400,6 +400,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha future.setFailure(t); pipeline().fireExceptionCaught(t); + closeFuture().setSuccess(); } } @@ -538,7 +539,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public void close(final ChannelFuture future) { if (eventLoop().inEventLoop()) { - if (isOpen()) { + if (closeFuture.setClosed()) { boolean wasActive = isActive(); try { doClose(); @@ -557,7 +558,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha pipeline().fireChannelInactive(); } - closeFuture.setClosed(); deregister(voidFuture()); } else { // Closed already. @@ -791,9 +791,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha throw new IllegalStateException(); } - void setClosed() { + boolean setClosed() { boolean set = super.setSuccess(); assert set; + return set; } }